-rw-r--r--  .gitlab-ci.d/buildtest.yml    |   9
-rw-r--r--  accel/kvm/kvm-all.c           |  10
-rw-r--r--  accel/stubs/kvm-stub.c        |   5
-rw-r--r--  include/sysemu/kvm.h          |   6
-rw-r--r--  migration/migration.c         |  48
-rw-r--r--  migration/multifd-zlib.c      |  11
-rw-r--r--  migration/multifd-zstd.c      |  11
-rw-r--r--  migration/multifd.c           | 778
-rw-r--r--  migration/multifd.h           |  59
-rw-r--r--  migration/ram.c               |   2
-rw-r--r--  migration/trace-events        |   2
-rw-r--r--  tests/qtest/migration-test.c  |   2
12 files changed, 547 insertions(+), 396 deletions(-)
diff --git a/.gitlab-ci.d/buildtest.yml b/.gitlab-ci.d/buildtest.yml
index 79bbc85..f56df59 100644
--- a/.gitlab-ci.d/buildtest.yml
+++ b/.gitlab-ci.d/buildtest.yml
@@ -189,6 +189,8 @@ build-previous-qemu:
TARGETS: x86_64-softmmu aarch64-softmmu
before_script:
- export QEMU_PREV_VERSION="$(sed 's/\([0-9.]*\)\.[0-9]*/v\1.0/' VERSION)"
+ - git remote add upstream https://gitlab.com/qemu-project/qemu
+ - git fetch upstream $QEMU_PREV_VERSION
- git checkout $QEMU_PREV_VERSION
after_script:
- mv build build-previous
@@ -217,9 +219,10 @@ build-previous-qemu:
- QTEST_QEMU_BINARY_DST=./qemu-system-${TARGET}
QTEST_QEMU_BINARY=../build/qemu-system-${TARGET} ./tests/qtest/migration-test
-# This job is disabled until we release 9.0. The existing
-# migration-test in 8.2 is broken on aarch64. The fix was already
-# commited, but it will only take effect once 9.0 is out.
+# This job needs to be disabled until we can have an aarch64 CPU model that
+# will (1) support both KVM and TCG, and (2) provide a stable ABI.
+# Currently only "-cpu max" can provide (1), but it doesn't guarantee
+# (2). Mark this test skipped until later.
migration-compat-aarch64:
extends: .migration-compat-common
variables:
diff --git a/accel/kvm/kvm-all.c b/accel/kvm/kvm-all.c
index 49e755e..a8cecd0 100644
--- a/accel/kvm/kvm-all.c
+++ b/accel/kvm/kvm-all.c
@@ -1119,6 +1119,11 @@ int kvm_vm_check_extension(KVMState *s, unsigned int extension)
return ret;
}
+/*
+ * We track the poisoned pages to be able to:
+ * - replace them on VM reset
+ * - block a migration for a VM with a poisoned page
+ */
typedef struct HWPoisonPage {
ram_addr_t ram_addr;
QLIST_ENTRY(HWPoisonPage) list;
@@ -1152,6 +1157,11 @@ void kvm_hwpoison_page_add(ram_addr_t ram_addr)
QLIST_INSERT_HEAD(&hwpoison_page_list, page, list);
}
+bool kvm_hwpoisoned_mem(void)
+{
+ return !QLIST_EMPTY(&hwpoison_page_list);
+}
+
static uint32_t adjust_ioeventfd_endianness(uint32_t val, uint32_t size)
{
#if HOST_BIG_ENDIAN != TARGET_BIG_ENDIAN
diff --git a/accel/stubs/kvm-stub.c b/accel/stubs/kvm-stub.c
index 1b37d9a..ca38172 100644
--- a/accel/stubs/kvm-stub.c
+++ b/accel/stubs/kvm-stub.c
@@ -124,3 +124,8 @@ uint32_t kvm_dirty_ring_size(void)
{
return 0;
}
+
+bool kvm_hwpoisoned_mem(void)
+{
+ return false;
+}
diff --git a/include/sysemu/kvm.h b/include/sysemu/kvm.h
index d614878..fad9a7e 100644
--- a/include/sysemu/kvm.h
+++ b/include/sysemu/kvm.h
@@ -538,4 +538,10 @@ bool kvm_arch_cpu_check_are_resettable(void);
bool kvm_dirty_ring_enabled(void);
uint32_t kvm_dirty_ring_size(void);
+
+/**
+ * kvm_hwpoisoned_mem - indicate whether there is any hwpoisoned page
+ * reported for the VM.
+ */
+bool kvm_hwpoisoned_mem(void);
#endif
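
The three hunks above implement a single mechanism: kvm-all.c records every
guest page the kernel reports as hwpoisoned, include/sysemu/kvm.h exposes a
predicate over that list, and the stub keeps the predicate trivially false
for non-KVM builds. Below is a minimal self-contained sketch of the same
tracking pattern, using <sys/queue.h> LIST macros in place of QEMU's QLIST;
the names PoisonPage, poison_page_add and poisoned_mem are illustrative,
not QEMU API:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <sys/queue.h>

    typedef struct PoisonPage {
        uint64_t ram_addr;                  /* page-aligned guest RAM offset */
        LIST_ENTRY(PoisonPage) list;
    } PoisonPage;

    static LIST_HEAD(, PoisonPage) poison_list =
        LIST_HEAD_INITIALIZER(poison_list);

    /* Record a page reported as poisoned, skipping duplicates. */
    static void poison_page_add(uint64_t ram_addr)
    {
        PoisonPage *p;

        LIST_FOREACH(p, &poison_list, list) {
            if (p->ram_addr == ram_addr) {
                return;
            }
        }
        p = calloc(1, sizeof(*p));
        p->ram_addr = ram_addr;
        LIST_INSERT_HEAD(&poison_list, p, list);
    }

    /* True if any poisoned page has been recorded for this VM. */
    static bool poisoned_mem(void)
    {
        return !LIST_EMPTY(&poison_list);
    }

With the list in place, the migration gate added to migrate_prepare() below
reduces to this cheap emptiness check.
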
diff --git a/migration/migration.c b/migration/migration.c
index d5f705c..ab21de2 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -67,6 +67,7 @@
#include "options.h"
#include "sysemu/dirtylimit.h"
#include "qemu/sockets.h"
+#include "sysemu/kvm.h"
static NotifierList migration_state_notifiers =
NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -128,11 +129,17 @@ static bool migration_needs_multiple_sockets(void)
return migrate_multifd() || migrate_postcopy_preempt();
}
-static bool transport_supports_multi_channels(SocketAddress *saddr)
+static bool transport_supports_multi_channels(MigrationAddress *addr)
{
- return saddr->type == SOCKET_ADDRESS_TYPE_INET ||
- saddr->type == SOCKET_ADDRESS_TYPE_UNIX ||
- saddr->type == SOCKET_ADDRESS_TYPE_VSOCK;
+ if (addr->transport == MIGRATION_ADDRESS_TYPE_SOCKET) {
+ SocketAddress *saddr = &addr->u.socket;
+
+ return saddr->type == SOCKET_ADDRESS_TYPE_INET ||
+ saddr->type == SOCKET_ADDRESS_TYPE_UNIX ||
+ saddr->type == SOCKET_ADDRESS_TYPE_VSOCK;
+ }
+
+ return false;
}
static bool
@@ -140,8 +147,7 @@ migration_channels_and_transport_compatible(MigrationAddress *addr,
Error **errp)
{
if (migration_needs_multiple_sockets() &&
- (addr->transport == MIGRATION_ADDRESS_TYPE_SOCKET) &&
- !transport_supports_multi_channels(&addr->u.socket)) {
+ !transport_supports_multi_channels(addr)) {
error_setg(errp, "Migration requires multi-channel URIs (e.g. tcp)");
return false;
}
@@ -311,7 +317,7 @@ void migration_incoming_state_destroy(void)
{
struct MigrationIncomingState *mis = migration_incoming_get_current();
- multifd_load_cleanup();
+ multifd_recv_cleanup();
compress_threads_load_cleanup();
if (mis->to_src_file) {
@@ -662,7 +668,7 @@ static void process_incoming_migration_bh(void *opaque)
trace_vmstate_downtime_checkpoint("dst-precopy-bh-announced");
- multifd_load_shutdown();
+ multifd_recv_shutdown();
dirty_bitmap_mig_before_vm_start();
@@ -759,7 +765,7 @@ fail:
MIGRATION_STATUS_FAILED);
qemu_fclose(mis->from_src_file);
- multifd_load_cleanup();
+ multifd_recv_cleanup();
compress_threads_load_cleanup();
exit(EXIT_FAILURE);
@@ -885,7 +891,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
default_channel = !mis->from_src_file;
}
- if (multifd_load_setup(errp) != 0) {
+ if (multifd_recv_setup(errp) != 0) {
return;
}
@@ -1331,7 +1337,7 @@ static void migrate_fd_cleanup(MigrationState *s)
}
bql_lock();
- multifd_save_cleanup();
+ multifd_send_shutdown();
qemu_mutex_lock(&s->qemu_file_lock);
tmp = s->to_dst_file;
s->to_dst_file = NULL;
@@ -1906,6 +1912,12 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
return false;
}
+ if (kvm_hwpoisoned_mem()) {
+ error_setg(errp, "Can't migrate this vm with hardware poisoned memory, "
+ "please reboot the vm and try again");
+ return false;
+ }
+
if (migration_is_blocked(errp)) {
return false;
}
@@ -3315,6 +3327,10 @@ static void *migration_thread(void *opaque)
object_ref(OBJECT(s));
update_iteration_initial_status(s);
+ if (!multifd_send_setup()) {
+ goto out;
+ }
+
bql_lock();
qemu_savevm_state_header(s->to_dst_file);
bql_unlock();
@@ -3386,6 +3402,7 @@ static void *migration_thread(void *opaque)
urgent = migration_rate_limit();
}
+out:
trace_migration_thread_after_loop();
migration_iteration_finish(s);
object_unref(OBJECT(s));
@@ -3623,15 +3640,6 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
return;
}
- if (multifd_save_setup(&local_err) != 0) {
- migrate_set_error(s, local_err);
- error_report_err(local_err);
- migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
- MIGRATION_STATUS_FAILED);
- migrate_fd_cleanup(s);
- return;
- }
-
if (migrate_background_snapshot()) {
qemu_thread_create(&s->thread, "bg_snapshot",
bg_migration_thread, s, QEMU_THREAD_JOINABLE);
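
Note how the setup moved: migrate_fd_connect() no longer calls the multifd
setup function; instead migration_thread() calls multifd_send_setup() itself
and, on failure, jumps past the main loop to the common finish path. A small
runnable sketch of that control-flow shape under pthreads; channels_setup()
and finish() are hypothetical stand-ins for multifd_send_setup() and
migration_iteration_finish():

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    static bool channels_setup(void) { return true; }
    static void finish(void) { puts("finish"); }

    static void *migration_thread_sketch(void *arg)
    {
        (void)arg;

        /* Setup now happens inside the thread; a failure funnels into
         * the same finish path as a normal exit. */
        if (!channels_setup()) {
            goto out;
        }

        /* ... the main migration loop would run here ... */

    out:
        finish();
        return NULL;
    }

    int main(void)
    {
        pthread_t th;

        pthread_create(&th, NULL, migration_thread_sketch, NULL);
        pthread_join(&th, NULL);
        return 0;
    }

Centralizing the failure path means the FAILED state that multifd_send_setup()
sets is handled by the same teardown code as any mid-migration error.
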
diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 37ce486..012e3bd 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -116,17 +116,20 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
{
+ MultiFDPages_t *pages = p->pages;
struct zlib_data *z = p->data;
z_stream *zs = &z->zs;
uint32_t out_size = 0;
int ret;
uint32_t i;
- for (i = 0; i < p->normal_num; i++) {
+ multifd_send_prepare_header(p);
+
+ for (i = 0; i < pages->num; i++) {
uint32_t available = z->zbuff_len - out_size;
int flush = Z_NO_FLUSH;
- if (i == p->normal_num - 1) {
+ if (i == pages->num - 1) {
flush = Z_SYNC_FLUSH;
}
@@ -135,7 +138,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
* with compression. zlib does not guarantee that this is safe,
* therefore copy the page before calling deflate().
*/
- memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
+ memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
zs->avail_in = p->page_size;
zs->next_in = z->buf;
@@ -171,6 +174,8 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
p->next_packet_size = out_size;
p->flags |= MULTIFD_FLAG_ZLIB;
+ multifd_send_fill_packet(p);
+
return 0;
}
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index b471daa..dc8fe43 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -113,21 +113,24 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
{
+ MultiFDPages_t *pages = p->pages;
struct zstd_data *z = p->data;
int ret;
uint32_t i;
+ multifd_send_prepare_header(p);
+
z->out.dst = z->zbuff;
z->out.size = z->zbuff_len;
z->out.pos = 0;
- for (i = 0; i < p->normal_num; i++) {
+ for (i = 0; i < pages->num; i++) {
ZSTD_EndDirective flush = ZSTD_e_continue;
- if (i == p->normal_num - 1) {
+ if (i == pages->num - 1) {
flush = ZSTD_e_flush;
}
- z->in.src = p->pages->block->host + p->normal[i];
+ z->in.src = p->pages->block->host + pages->offset[i];
z->in.size = p->page_size;
z->in.pos = 0;
@@ -160,6 +163,8 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
p->next_packet_size = z->out.pos;
p->flags |= MULTIFD_FLAG_ZSTD;
+ multifd_send_fill_packet(p);
+
return 0;
}
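
Both compression hunks follow the same new contract for send_prepare():
reserve iov[0] for the packet header via multifd_send_prepare_header(),
append the compressed payload, set next_packet_size and the compression
flag, and finally call multifd_send_fill_packet(). A minimal pass-through
"compressor" obeying that shape, with simplified stand-in types
(PrepareParams, prepare_header and fill_packet are illustrative, not the
QEMU structures):

    #include <stdint.h>
    #include <sys/uio.h>

    typedef struct {
        struct iovec iov[8];        /* iov[0] is reserved for the header */
        uint32_t iovs_num;
        uint8_t *header;
        uint32_t header_len;
        uint32_t next_packet_size;
    } PrepareParams;

    /* Mirrors multifd_send_prepare_header(): header goes into iov[0]. */
    void prepare_header(PrepareParams *p)
    {
        p->iov[0].iov_base = p->header;
        p->iov[0].iov_len = p->header_len;
        p->iovs_num++;
    }

    /* Stand-in for multifd_send_fill_packet(): would serialize the
     * header fields here. */
    void fill_packet(PrepareParams *p)
    {
        (void)p;
    }

    /* Header first, then the payload, then fill the packet. */
    int send_prepare_passthrough(PrepareParams *p, uint8_t *page, uint32_t len)
    {
        prepare_header(p);
        p->iov[p->iovs_num].iov_base = page;
        p->iov[p->iovs_num].iov_len = len;
        p->iovs_num++;
        p->next_packet_size = len;
        fill_packet(p);
        return 0;
    }

Pulling these two calls into the per-method hooks is what lets the nocomp
zero-copy path in multifd.c below skip the header in the iov entirely and
send it with a separate write.
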
diff --git a/migration/multifd.c b/migration/multifd.c
index 25cbc6d..adfe8c9 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -45,20 +45,54 @@ typedef struct {
uint64_t unused2[4]; /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
+struct {
+ MultiFDSendParams *params;
+ /* array of pages to be sent */
+ MultiFDPages_t *pages;
+ /*
+ * Global number of generated multifd packets.
+ *
+ * Note that we use 'uintptr_t' because it naturally supports atomic
+ * operations on both 32-bit and 64-bit hosts. It means that on 32-bit
+ * systems multifd will overflow packet_num sooner, but that should be
+ * fine.
+ *
+ * Another option would be QEMU's Stat64, which is 64 bits on all
+ * hosts, but so far it does not support atomic fetch_add(). Keep it
+ * simple for now.
+ */
+ uintptr_t packet_num;
+ /*
+ * Synchronization point past which no more channels will be
+ * created.
+ */
+ QemuSemaphore channels_created;
+ /* send channels ready */
+ QemuSemaphore channels_ready;
+ /*
+ * Have we already terminated the threads? There is a race where we
+ * may get an error while we are exiting.
+ * We will use atomic operations. Only valid values are 0 and 1.
+ */
+ int exiting;
+ /* multifd ops */
+ MultiFDMethods *ops;
+} *multifd_send_state;
+
/* Multifd without compression */
/**
* nocomp_send_setup: setup send side
*
- * For no compression this function does nothing.
- *
- * Returns 0 for success or -1 for error
- *
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
+ if (migrate_zero_copy_send()) {
+ p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
+ }
+
return 0;
}
@@ -88,16 +122,38 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
+ bool use_zero_copy_send = migrate_zero_copy_send();
MultiFDPages_t *pages = p->pages;
+ int ret;
- for (int i = 0; i < p->normal_num; i++) {
- p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
+ if (!use_zero_copy_send) {
+ /*
+ * Only !zerocopy needs the header in IOV; zerocopy will
+ * send it separately.
+ */
+ multifd_send_prepare_header(p);
+ }
+
+ for (int i = 0; i < pages->num; i++) {
+ p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
p->iov[p->iovs_num].iov_len = p->page_size;
p->iovs_num++;
}
- p->next_packet_size = p->normal_num * p->page_size;
+ p->next_packet_size = pages->num * p->page_size;
p->flags |= MULTIFD_FLAG_NOCOMP;
+
+ multifd_send_fill_packet(p);
+
+ if (use_zero_copy_send) {
+ /* Send header first, without zerocopy */
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, errp);
+ if (ret != 0) {
+ return -1;
+ }
+ }
+
return 0;
}
@@ -172,6 +228,17 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
multifd_ops[method] = ops;
}
+/* Reset a MultiFDPages_t* object for the next use */
+static void multifd_pages_reset(MultiFDPages_t *pages)
+{
+ /*
+ * We don't need to touch the offset[] array, because it will be
+ * overwritten later when reused.
+ */
+ pages->num = 0;
+ pages->block = NULL;
+}
+
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg = {};
@@ -248,35 +315,44 @@ static MultiFDPages_t *multifd_pages_init(uint32_t n)
static void multifd_pages_clear(MultiFDPages_t *pages)
{
- pages->num = 0;
+ multifd_pages_reset(pages);
pages->allocated = 0;
- pages->block = NULL;
g_free(pages->offset);
pages->offset = NULL;
g_free(pages);
}
-static void multifd_send_fill_packet(MultiFDSendParams *p)
+void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
+ MultiFDPages_t *pages = p->pages;
+ uint64_t packet_num;
int i;
packet->flags = cpu_to_be32(p->flags);
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
- packet->normal_pages = cpu_to_be32(p->normal_num);
+ packet->normal_pages = cpu_to_be32(pages->num);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
- packet->packet_num = cpu_to_be64(p->packet_num);
- if (p->pages->block) {
- strncpy(packet->ramblock, p->pages->block->idstr, 256);
+ packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
+ packet->packet_num = cpu_to_be64(packet_num);
+
+ if (pages->block) {
+ strncpy(packet->ramblock, pages->block->idstr, 256);
}
- for (i = 0; i < p->normal_num; i++) {
+ for (i = 0; i < pages->num; i++) {
/* there are architectures where ram_addr_t is 32 bit */
- uint64_t temp = p->normal[i];
+ uint64_t temp = pages->offset[i];
packet->offset[i] = cpu_to_be64(temp);
}
+
+ p->packets_sent++;
+ p->total_normal_pages += pages->num;
+
+ trace_multifd_send(p->id, packet_num, pages->num, p->flags,
+ p->next_packet_size);
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
@@ -324,6 +400,11 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
+ p->packets_recved++;
+ p->total_normal_pages += p->normal_num;
+
+ trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
+ p->next_packet_size);
if (p->normal_num == 0) {
return 0;
@@ -354,23 +435,22 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
return 0;
}
-struct {
- MultiFDSendParams *params;
- /* array of pages to sent */
- MultiFDPages_t *pages;
- /* global number of generated multifd packets */
- uint64_t packet_num;
- /* send channels ready */
- QemuSemaphore channels_ready;
- /*
- * Have we already run terminate threads. There is a race when it
- * happens that we got one error while we are exiting.
- * We will use atomic operations. Only valid values are 0 and 1.
- */
- int exiting;
- /* multifd ops */
- MultiFDMethods *ops;
-} *multifd_send_state;
+static bool multifd_send_should_exit(void)
+{
+ return qatomic_read(&multifd_send_state->exiting);
+}
+
+/*
+ * The migration thread can wait on either of the two semaphores. This
+ * function can be used to kick the main thread out of waiting on either of
+ * them. It should normally only be called when something has gone wrong
+ * in the current multifd send thread.
+ */
+static void multifd_send_kick_main(MultiFDSendParams *p)
+{
+ qemu_sem_post(&p->sem_sync);
+ qemu_sem_post(&multifd_send_state->channels_ready);
+}
/*
* How we use multifd_send_state->pages and channel->pages?
@@ -388,20 +468,23 @@ struct {
* thread is using the channel mutex when changing it, and the channel
* have to had finish with its own, otherwise pending_job can't be
* false.
+ *
+ * Returns true on success, false otherwise.
*/
-
-static int multifd_send_pages(void)
+static bool multifd_send_pages(void)
{
int i;
static int next_channel;
MultiFDSendParams *p = NULL; /* make happy gcc */
MultiFDPages_t *pages = multifd_send_state->pages;
- if (qatomic_read(&multifd_send_state->exiting)) {
- return -1;
+ if (multifd_send_should_exit()) {
+ return false;
}
+ /* We wait here until at least one channel is ready */
qemu_sem_wait(&multifd_send_state->channels_ready);
+
/*
* next_channel can remain from a previous migration that was
* using more channels, so ensure it doesn't overflow if the
@@ -409,69 +492,100 @@ static int multifd_send_pages(void)
*/
next_channel %= migrate_multifd_channels();
for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
- p = &multifd_send_state->params[i];
-
- qemu_mutex_lock(&p->mutex);
- if (p->quit) {
- error_report("%s: channel %d has already quit!", __func__, i);
- qemu_mutex_unlock(&p->mutex);
- return -1;
+ if (multifd_send_should_exit()) {
+ return false;
}
- if (!p->pending_job) {
- p->pending_job++;
+ p = &multifd_send_state->params[i];
+ /*
+ * A lockless read of p->pending_job is safe, because only the multifd
+ * sender thread can clear it.
+ */
+ if (qatomic_read(&p->pending_job) == false) {
next_channel = (i + 1) % migrate_multifd_channels();
break;
}
- qemu_mutex_unlock(&p->mutex);
}
- assert(!p->pages->num);
- assert(!p->pages->block);
- p->packet_num = multifd_send_state->packet_num++;
+ /*
+ * Make sure we read p->pending_job before the rest of the fields. Pairs
+ * with the qatomic_store_release() in multifd_send_thread().
+ */
+ smp_mb_acquire();
+ assert(!p->pages->num);
multifd_send_state->pages = p->pages;
p->pages = pages;
- qemu_mutex_unlock(&p->mutex);
+ /*
+ * Make sure p->pages is set up before marking pending_job=true. Pairs
+ * with the qatomic_load_acquire() in multifd_send_thread().
+ */
+ qatomic_store_release(&p->pending_job, true);
qemu_sem_post(&p->sem);
- return 1;
+ return true;
}
-int multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
- MultiFDPages_t *pages = multifd_send_state->pages;
- bool changed = false;
+ return pages->num == 0;
+}
- if (!pages->block) {
- pages->block = block;
- }
+static inline bool multifd_queue_full(MultiFDPages_t *pages)
+{
+ return pages->num == pages->allocated;
+}
- if (pages->block == block) {
- pages->offset[pages->num] = offset;
- pages->num++;
+static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
+{
+ pages->offset[pages->num++] = offset;
+}
- if (pages->num < pages->allocated) {
- return 1;
- }
- } else {
- changed = true;
- }
+/* Returns true if enqueue successful, false otherwise */
+bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+{
+ MultiFDPages_t *pages;
- if (multifd_send_pages() < 0) {
- return -1;
+retry:
+ pages = multifd_send_state->pages;
+
+ /* If the queue is empty, we can enqueue right away */
+ if (multifd_queue_empty(pages)) {
+ pages->block = block;
+ multifd_enqueue(pages, offset);
+ return true;
}
- if (changed) {
- return multifd_queue_page(block, offset);
+ /*
+ * Not empty, but we need a flush first. That can be because either:
+ *
+ * (1) The page is not on the same ramblock as the previous ones, or
+ * (2) The queue is full.
+ *
+ * After the flush, always retry.
+ */
+ if (pages->block != block || multifd_queue_full(pages)) {
+ if (!multifd_send_pages()) {
+ return false;
+ }
+ goto retry;
}
- return 1;
+ /* Not empty, and we still have space, do it! */
+ multifd_enqueue(pages, offset);
+ return true;
}
-static void multifd_send_terminate_threads(Error *err)
+/* Multifd send side hit an error; remember it and prepare to quit */
+static void multifd_send_set_error(Error *err)
{
- int i;
-
- trace_multifd_send_terminate_threads(err != NULL);
+ /*
+ * We don't want to terminate the threads twice. Depending on where
+ * we get the error, or if there are two independent errors in two
+ * threads at the same time, we can end up calling this function
+ * twice.
+ */
+ if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
+ return;
+ }
if (err) {
MigrationState *s = migrate_get_current();
@@ -484,27 +598,46 @@ static void multifd_send_terminate_threads(Error *err)
MIGRATION_STATUS_FAILED);
}
}
+}
+
+static void multifd_send_terminate_threads(void)
+{
+ int i;
+
+ trace_multifd_send_terminate_threads();
/*
- * We don't want to exit each threads twice. Depending on where
- * we get the error, or if there are two independent errors in two
- * threads at the same time, we can end calling this function
- * twice.
+ * Tell everyone we're quitting. No xchg() needed here; we simply
+ * always set it.
*/
- if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
- return;
- }
+ qatomic_set(&multifd_send_state->exiting, 1);
+ /*
+ * First, kick all threads out, no matter whether they are just idle or
+ * blocked in an I/O system call.
+ */
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- qemu_mutex_lock(&p->mutex);
- p->quit = true;
qemu_sem_post(&p->sem);
if (p->c) {
qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
- qemu_mutex_unlock(&p->mutex);
+ }
+
+ /*
+ * Finally, reclaim all the threads by joining them.
+ */
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ if (p->tls_thread_created) {
+ qemu_thread_join(&p->tls_thread);
+ }
+
+ if (p->thread_created) {
+ qemu_thread_join(&p->thread);
+ }
}
}
@@ -513,57 +646,62 @@ static int multifd_send_channel_destroy(QIOChannel *send)
return socket_send_channel_destroy(send);
}
-void multifd_save_cleanup(void)
+static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
+{
+ if (p->registered_yank) {
+ migration_ioc_unregister_yank(p->c);
+ }
+ multifd_send_channel_destroy(p->c);
+ p->c = NULL;
+ qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->sem_sync);
+ g_free(p->name);
+ p->name = NULL;
+ multifd_pages_clear(p->pages);
+ p->pages = NULL;
+ p->packet_len = 0;
+ g_free(p->packet);
+ p->packet = NULL;
+ g_free(p->iov);
+ p->iov = NULL;
+ multifd_send_state->ops->send_cleanup(p, errp);
+
+ return *errp == NULL;
+}
+
+static void multifd_send_cleanup_state(void)
+{
+ qemu_sem_destroy(&multifd_send_state->channels_created);
+ qemu_sem_destroy(&multifd_send_state->channels_ready);
+ g_free(multifd_send_state->params);
+ multifd_send_state->params = NULL;
+ multifd_pages_clear(multifd_send_state->pages);
+ multifd_send_state->pages = NULL;
+ g_free(multifd_send_state);
+ multifd_send_state = NULL;
+}
+
+void multifd_send_shutdown(void)
{
int i;
if (!migrate_multifd()) {
return;
}
- multifd_send_terminate_threads(NULL);
- for (i = 0; i < migrate_multifd_channels(); i++) {
- MultiFDSendParams *p = &multifd_send_state->params[i];
- if (p->running) {
- qemu_thread_join(&p->thread);
- }
- }
+ multifd_send_terminate_threads();
+
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
Error *local_err = NULL;
- if (p->registered_yank) {
- migration_ioc_unregister_yank(p->c);
- }
- multifd_send_channel_destroy(p->c);
- p->c = NULL;
- qemu_mutex_destroy(&p->mutex);
- qemu_sem_destroy(&p->sem);
- qemu_sem_destroy(&p->sem_sync);
- g_free(p->name);
- p->name = NULL;
- multifd_pages_clear(p->pages);
- p->pages = NULL;
- p->packet_len = 0;
- g_free(p->packet);
- p->packet = NULL;
- g_free(p->iov);
- p->iov = NULL;
- g_free(p->normal);
- p->normal = NULL;
- multifd_send_state->ops->send_cleanup(p, &local_err);
- if (local_err) {
+ if (!multifd_send_cleanup_channel(p, &local_err)) {
migrate_set_error(migrate_get_current(), local_err);
error_free(local_err);
}
}
- qemu_sem_destroy(&multifd_send_state->channels_ready);
- g_free(multifd_send_state->params);
- multifd_send_state->params = NULL;
- multifd_pages_clear(multifd_send_state->pages);
- multifd_send_state->pages = NULL;
- g_free(multifd_send_state);
- multifd_send_state = NULL;
+
+ multifd_send_cleanup_state();
}
static int multifd_zero_copy_flush(QIOChannel *c)
@@ -592,47 +730,38 @@ int multifd_send_sync_main(void)
return 0;
}
if (multifd_send_state->pages->num) {
- if (multifd_send_pages() < 0) {
+ if (!multifd_send_pages()) {
error_report("%s: multifd_send_pages fail", __func__);
return -1;
}
}
- /*
- * When using zero-copy, it's necessary to flush the pages before any of
- * the pages can be sent again, so we'll make sure the new version of the
- * pages will always arrive _later_ than the old pages.
- *
- * Currently we achieve this by flushing the zero-page requested writes
- * per ram iteration, but in the future we could potentially optimize it
- * to be less frequent, e.g. only after we finished one whole scanning of
- * all the dirty bitmaps.
- */
-
flush_zero_copy = migrate_zero_copy_send();
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- trace_multifd_send_sync_main_signal(p->id);
-
- qemu_mutex_lock(&p->mutex);
-
- if (p->quit) {
- error_report("%s: channel %d has already quit", __func__, i);
- qemu_mutex_unlock(&p->mutex);
+ if (multifd_send_should_exit()) {
return -1;
}
- p->packet_num = multifd_send_state->packet_num++;
- p->flags |= MULTIFD_FLAG_SYNC;
- p->pending_job++;
- qemu_mutex_unlock(&p->mutex);
+ trace_multifd_send_sync_main_signal(p->id);
+
+ /*
+ * We should be the only user so far, so it is not possible for it to
+ * be set by others concurrently.
+ */
+ assert(qatomic_read(&p->pending_sync) == false);
+ qatomic_set(&p->pending_sync, true);
qemu_sem_post(&p->sem);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
+ if (multifd_send_should_exit()) {
+ return -1;
+ }
+
qemu_sem_wait(&multifd_send_state->channels_ready);
trace_multifd_send_sync_main_wait(p->id);
qemu_sem_wait(&p->sem_sync);
@@ -652,7 +781,6 @@ static void *multifd_send_thread(void *opaque)
MigrationThread *thread = NULL;
Error *local_err = NULL;
int ret = 0;
- bool use_zero_copy_send = migrate_zero_copy_send();
thread = migration_threads_add(p->name, qemu_get_thread_id());
@@ -663,64 +791,28 @@ static void *multifd_send_thread(void *opaque)
ret = -1;
goto out;
}
- /* initial packet */
- p->num_packets = 1;
while (true) {
qemu_sem_post(&multifd_send_state->channels_ready);
qemu_sem_wait(&p->sem);
- if (qatomic_read(&multifd_send_state->exiting)) {
+ if (multifd_send_should_exit()) {
break;
}
- qemu_mutex_lock(&p->mutex);
-
- if (p->pending_job) {
- uint64_t packet_num = p->packet_num;
- uint32_t flags;
- p->normal_num = 0;
-
- if (use_zero_copy_send) {
- p->iovs_num = 0;
- } else {
- p->iovs_num = 1;
- }
- for (int i = 0; i < p->pages->num; i++) {
- p->normal[p->normal_num] = p->pages->offset[i];
- p->normal_num++;
- }
+ /*
+ * Read the pending_job flag before p->pages. Pairs with the
+ * qatomic_store_release() in multifd_send_pages().
+ */
+ if (qatomic_load_acquire(&p->pending_job)) {
+ MultiFDPages_t *pages = p->pages;
- if (p->normal_num) {
- ret = multifd_send_state->ops->send_prepare(p, &local_err);
- if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
- break;
- }
- }
- multifd_send_fill_packet(p);
- flags = p->flags;
- p->flags = 0;
- p->num_packets++;
- p->total_normal_pages += p->normal_num;
- p->pages->num = 0;
- p->pages->block = NULL;
- qemu_mutex_unlock(&p->mutex);
+ p->iovs_num = 0;
+ assert(pages->num);
- trace_multifd_send(p->id, packet_num, p->normal_num, flags,
- p->next_packet_size);
-
- if (use_zero_copy_send) {
- /* Send header first, without zerocopy */
- ret = qio_channel_write_all(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret != 0) {
- break;
- }
- } else {
- /* Send header using the same writev call */
- p->iov[0].iov_len = p->packet_len;
- p->iov[0].iov_base = p->packet;
+ ret = multifd_send_state->ops->send_prepare(p, &local_err);
+ if (ret != 0) {
+ break;
}
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
@@ -731,17 +823,35 @@ static void *multifd_send_thread(void *opaque)
stat64_add(&mig_stats.multifd_bytes,
p->next_packet_size + p->packet_len);
+
+ multifd_pages_reset(p->pages);
p->next_packet_size = 0;
- qemu_mutex_lock(&p->mutex);
- p->pending_job--;
- qemu_mutex_unlock(&p->mutex);
- if (flags & MULTIFD_FLAG_SYNC) {
- qemu_sem_post(&p->sem_sync);
- }
+ /*
+ * Make sure p->pages is published before saying "we're
+ * free". Pairs with the smp_mb_acquire() in
+ * multifd_send_pages().
+ */
+ qatomic_store_release(&p->pending_job, false);
} else {
- qemu_mutex_unlock(&p->mutex);
- /* sometimes there are spurious wakeups */
+ /*
+ * If it's not a normal job, it must be a sync request. Note that
+ * pending_sync is a standalone flag (unlike pending_job), so
+ * it doesn't require explicit memory barriers.
+ */
+ assert(qatomic_read(&p->pending_sync));
+ p->flags = MULTIFD_FLAG_SYNC;
+ multifd_send_fill_packet(p);
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret != 0) {
+ break;
+ }
+ /* p->next_packet_size will always be zero for a SYNC packet */
+ stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+ p->flags = 0;
+ qatomic_set(&p->pending_sync, false);
+ qemu_sem_post(&p->sem_sync);
}
}
@@ -749,53 +859,19 @@ out:
if (ret) {
assert(local_err);
trace_multifd_send_error(p->id);
- multifd_send_terminate_threads(local_err);
- qemu_sem_post(&p->sem_sync);
- qemu_sem_post(&multifd_send_state->channels_ready);
+ multifd_send_set_error(local_err);
+ multifd_send_kick_main(p);
error_free(local_err);
}
- qemu_mutex_lock(&p->mutex);
- p->running = false;
- qemu_mutex_unlock(&p->mutex);
-
rcu_unregister_thread();
migration_threads_remove(thread);
- trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
+ trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
return NULL;
}
-static bool multifd_channel_connect(MultiFDSendParams *p,
- QIOChannel *ioc,
- Error **errp);
-
-static void multifd_tls_outgoing_handshake(QIOTask *task,
- gpointer opaque)
-{
- MultiFDSendParams *p = opaque;
- QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
- Error *err = NULL;
-
- if (!qio_task_propagate_error(task, &err)) {
- trace_multifd_tls_outgoing_handshake_complete(ioc);
- if (multifd_channel_connect(p, ioc, &err)) {
- return;
- }
- }
-
- trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
-
- migrate_set_error(migrate_get_current(), err);
- /*
- * Error happen, mark multifd_send_thread status as 'quit' although it
- * is not created, and then tell who pay attention to me.
- */
- p->quit = true;
- qemu_sem_post(&multifd_send_state->channels_ready);
- qemu_sem_post(&p->sem_sync);
- error_free(err);
-}
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
static void *multifd_tls_handshake_thread(void *opaque)
{
@@ -803,7 +879,7 @@ static void *multifd_tls_handshake_thread(void *opaque)
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
qio_channel_tls_handshake(tioc,
- multifd_tls_outgoing_handshake,
+ multifd_new_send_channel_async,
p,
NULL,
NULL);
@@ -823,11 +899,17 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p,
return false;
}
+ /*
+ * Ownership of the socket channel now transfers to the newly
+ * created TLS channel, which has already taken a reference.
+ */
object_unref(OBJECT(ioc));
trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
p->c = QIO_CHANNEL(tioc);
- qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
+
+ p->tls_thread_created = true;
+ qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
multifd_tls_handshake_thread, p,
QEMU_THREAD_JOINABLE);
return true;
@@ -837,61 +919,72 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc,
Error **errp)
{
- trace_multifd_set_outgoing_channel(
- ioc, object_get_typename(OBJECT(ioc)),
- migrate_get_current()->hostname);
-
- if (migrate_channel_requires_tls_upgrade(ioc)) {
- /*
- * tls_channel_connect will call back to this
- * function after the TLS handshake,
- * so we mustn't call multifd_send_thread until then
- */
- return multifd_tls_channel_connect(p, ioc, errp);
- }
+ qio_channel_set_delay(ioc, false);
migration_ioc_register_yank(ioc);
p->registered_yank = true;
p->c = ioc;
+
+ p->thread_created = true;
qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
QEMU_THREAD_JOINABLE);
return true;
}
-static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
- QIOChannel *ioc, Error *err)
-{
- migrate_set_error(migrate_get_current(), err);
- /* Error happen, we need to tell who pay attention to me */
- qemu_sem_post(&multifd_send_state->channels_ready);
- qemu_sem_post(&p->sem_sync);
- /*
- * Although multifd_send_thread is not created, but main migration
- * thread need to judge whether it is running, so we need to mark
- * its status.
- */
- p->quit = true;
- object_unref(OBJECT(ioc));
- error_free(err);
-}
-
+/*
+ * When TLS is enabled, this function is called once to establish the
+ * TLS connection and a second time after the TLS handshake to create
+ * the multifd channel. Without TLS, it goes straight into channel
+ * creation.
+ */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
MultiFDSendParams *p = opaque;
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *local_err = NULL;
+ bool ret;
trace_multifd_new_send_channel_async(p->id);
- if (!qio_task_propagate_error(task, &local_err)) {
- qio_channel_set_delay(ioc, false);
- p->running = true;
- if (multifd_channel_connect(p, ioc, &local_err)) {
+
+ if (qio_task_propagate_error(task, &local_err)) {
+ ret = false;
+ goto out;
+ }
+
+ trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
+ migrate_get_current()->hostname);
+
+ if (migrate_channel_requires_tls_upgrade(ioc)) {
+ ret = multifd_tls_channel_connect(p, ioc, &local_err);
+ if (ret) {
return;
}
+ } else {
+ ret = multifd_channel_connect(p, ioc, &local_err);
+ }
+
+out:
+ /*
+ * Here we're not interested in whether creation succeeded, only that
+ * it has finished at all.
+ */
+ qemu_sem_post(&multifd_send_state->channels_created);
+
+ if (ret) {
+ return;
}
trace_multifd_new_send_channel_async_error(p->id, local_err);
- multifd_new_send_channel_cleanup(p, ioc, local_err);
+ multifd_send_set_error(local_err);
+ if (!p->c) {
+ /*
+ * If no channel has been created, drop the initial
+ * reference. Otherwise cleanup happens in
+ * multifd_send_channel_destroy().
+ */
+ object_unref(OBJECT(ioc));
+ }
+ error_free(local_err);
}
static void multifd_new_send_channel_create(gpointer opaque)
@@ -899,20 +992,23 @@ static void multifd_new_send_channel_create(gpointer opaque)
socket_send_channel_create(multifd_new_send_channel_async, opaque);
}
-int multifd_save_setup(Error **errp)
+bool multifd_send_setup(void)
{
- int thread_count;
+ MigrationState *s = migrate_get_current();
+ Error *local_err = NULL;
+ int thread_count, ret = 0;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
uint8_t i;
if (!migrate_multifd()) {
- return 0;
+ return true;
}
thread_count = migrate_multifd_channels();
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
multifd_send_state->pages = multifd_pages_init(page_count);
+ qemu_sem_init(&multifd_send_state->channels_created, 0);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -920,11 +1016,8 @@ int multifd_save_setup(Error **errp)
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
- p->quit = false;
- p->pending_job = 0;
p->id = i;
p->pages = multifd_pages_init(page_count);
p->packet_len = sizeof(MultiFDPacket_t)
@@ -935,29 +1028,39 @@ int multifd_save_setup(Error **errp)
p->name = g_strdup_printf("multifdsend_%d", i);
/* We need one extra place for the packet header */
p->iov = g_new0(struct iovec, page_count + 1);
- p->normal = g_new0(ram_addr_t, page_count);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
-
- if (migrate_zero_copy_send()) {
- p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
- } else {
- p->write_flags = 0;
- }
-
+ p->write_flags = 0;
multifd_new_send_channel_create(p);
}
+ /*
+ * Wait until channel creation has started for all channels. The
+ * creation can still fail, but no more channels will be created
+ * past this point.
+ */
+ for (i = 0; i < thread_count; i++) {
+ qemu_sem_wait(&multifd_send_state->channels_created);
+ }
+
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- int ret;
- ret = multifd_send_state->ops->send_setup(p, errp);
+ ret = multifd_send_state->ops->send_setup(p, &local_err);
if (ret) {
- return ret;
+ break;
}
}
- return 0;
+
+ if (ret) {
+ migrate_set_error(s, local_err);
+ error_report_err(local_err);
+ migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
+ MIGRATION_STATUS_FAILED);
+ return false;
+ }
+
+ return true;
}
struct {
@@ -1006,14 +1109,42 @@ static void multifd_recv_terminate_threads(Error *err)
}
}
-void multifd_load_shutdown(void)
+void multifd_recv_shutdown(void)
{
if (migrate_multifd()) {
multifd_recv_terminate_threads(NULL);
}
}
-void multifd_load_cleanup(void)
+static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
+{
+ migration_ioc_unregister_yank(p->c);
+ object_unref(OBJECT(p->c));
+ p->c = NULL;
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem_sync);
+ g_free(p->name);
+ p->name = NULL;
+ p->packet_len = 0;
+ g_free(p->packet);
+ p->packet = NULL;
+ g_free(p->iov);
+ p->iov = NULL;
+ g_free(p->normal);
+ p->normal = NULL;
+ multifd_recv_state->ops->recv_cleanup(p);
+}
+
+static void multifd_recv_cleanup_state(void)
+{
+ qemu_sem_destroy(&multifd_recv_state->sem_sync);
+ g_free(multifd_recv_state->params);
+ multifd_recv_state->params = NULL;
+ g_free(multifd_recv_state);
+ multifd_recv_state = NULL;
+}
+
+void multifd_recv_cleanup(void)
{
int i;
@@ -1024,40 +1155,20 @@ void multifd_load_cleanup(void)
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
- if (p->running) {
- /*
- * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
- * however try to wakeup it without harm in cleanup phase.
- */
- qemu_sem_post(&p->sem_sync);
- }
+ /*
+ * multifd_recv_thread may hang at the MULTIFD_FLAG_SYNC handling code;
+ * try to wake it up, which is harmless in the cleanup phase.
+ */
+ qemu_sem_post(&p->sem_sync);
- qemu_thread_join(&p->thread);
+ if (p->thread_created) {
+ qemu_thread_join(&p->thread);
+ }
}
for (i = 0; i < migrate_multifd_channels(); i++) {
- MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
- migration_ioc_unregister_yank(p->c);
- object_unref(OBJECT(p->c));
- p->c = NULL;
- qemu_mutex_destroy(&p->mutex);
- qemu_sem_destroy(&p->sem_sync);
- g_free(p->name);
- p->name = NULL;
- p->packet_len = 0;
- g_free(p->packet);
- p->packet = NULL;
- g_free(p->iov);
- p->iov = NULL;
- g_free(p->normal);
- p->normal = NULL;
- multifd_recv_state->ops->recv_cleanup(p);
+ multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
}
- qemu_sem_destroy(&multifd_recv_state->sem_sync);
- g_free(multifd_recv_state->params);
- multifd_recv_state->params = NULL;
- g_free(multifd_recv_state);
- multifd_recv_state = NULL;
+ multifd_recv_cleanup_state();
}
void multifd_recv_sync_main(void)
@@ -1119,10 +1230,6 @@ static void *multifd_recv_thread(void *opaque)
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
- trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
- p->next_packet_size);
- p->num_packets++;
- p->total_normal_pages += p->normal_num;
qemu_mutex_unlock(&p->mutex);
if (p->normal_num) {
@@ -1142,17 +1249,14 @@ static void *multifd_recv_thread(void *opaque)
multifd_recv_terminate_threads(local_err);
error_free(local_err);
}
- qemu_mutex_lock(&p->mutex);
- p->running = false;
- qemu_mutex_unlock(&p->mutex);
rcu_unregister_thread();
- trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);
+ trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);
return NULL;
}
-int multifd_load_setup(Error **errp)
+int multifd_recv_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
@@ -1249,10 +1353,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
}
p->c = ioc;
object_ref(OBJECT(ioc));
- /* initial packet */
- p->num_packets = 1;
- p->running = true;
+ p->thread_created = true;
qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
QEMU_THREAD_JOINABLE);
qatomic_inc(&multifd_recv_state->count);
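
The heart of the multifd.c rework is replacing the per-channel mutex with a
release/acquire handshake on pending_job: the producer fills p->pages and
then store-releases pending_job=true; the sender thread load-acquires
pending_job before touching p->pages and store-releases pending_job=false
only once it is done with the buffer. A self-contained sketch of the same
handshake using C11 atomics in place of QEMU's qatomic_* wrappers (all
names are illustrative):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct channel {
        atomic_bool pending_job;
        int payload;                /* stands in for p->pages */
    };

    static struct channel ch;

    /* Producer: publish the payload, then the flag, with release order. */
    static void post_job(int value)
    {
        while (atomic_load_explicit(&ch.pending_job, memory_order_acquire)) {
            /* channel busy; the real code waits on a semaphore instead */
        }
        ch.payload = value;
        atomic_store_explicit(&ch.pending_job, true, memory_order_release);
    }

    /* Consumer: acquire the flag before reading the payload. */
    static void *sender(void *arg)
    {
        (void)arg;
        while (!atomic_load_explicit(&ch.pending_job, memory_order_acquire)) {
            /* idle; the real code waits on p->sem instead */
        }
        printf("sending payload %d\n", ch.payload);
        atomic_store_explicit(&ch.pending_job, false, memory_order_release);
        return NULL;
    }

    int main(void)
    {
        pthread_t th;

        pthread_create(&th, NULL, sender, NULL);
        post_job(42);
        pthread_join(&th, NULL);
        return 0;
    }

The smp_mb_acquire() in multifd_send_pages() plays the same role as the
acquire load here: it orders the pending_job read before the subsequent
accesses to p->pages.
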
diff --git a/migration/multifd.h b/migration/multifd.h
index 35d11f1..8a1cad0 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,16 +13,16 @@
#ifndef QEMU_MIGRATION_MULTIFD_H
#define QEMU_MIGRATION_MULTIFD_H
-int multifd_save_setup(Error **errp);
-void multifd_save_cleanup(void);
-int multifd_load_setup(Error **errp);
-void multifd_load_cleanup(void);
-void multifd_load_shutdown(void);
+bool multifd_send_setup(void);
+void multifd_send_shutdown(void);
+int multifd_recv_setup(Error **errp);
+void multifd_recv_cleanup(void);
+void multifd_recv_shutdown(void);
bool multifd_recv_all_channels_created(void);
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(void);
-int multifd_queue_page(RAMBlock *block, ram_addr_t offset);
+bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
/* Multifd Compression flags */
#define MULTIFD_FLAG_SYNC (1 << 0)
@@ -73,6 +73,9 @@ typedef struct {
char *name;
/* channel thread id */
QemuThread thread;
+ bool thread_created;
+ QemuThread tls_thread;
+ bool tls_thread_created;
/* communication channel */
QIOChannel *c;
/* is the yank function registered */
@@ -91,18 +94,19 @@ typedef struct {
/* syncs main thread and channels */
QemuSemaphore sem_sync;
- /* this mutex protects the following parameters */
- QemuMutex mutex;
- /* is this channel thread running */
- bool running;
- /* should this thread finish */
- bool quit;
/* multifd flags for each packet */
uint32_t flags;
- /* global number of generated multifd packets */
- uint64_t packet_num;
- /* thread has work to do */
- int pending_job;
+ /*
+ * The sender thread has work to do if either of the booleans below is set.
+ *
+ * @pending_job: a job is pending
+ * @pending_sync: a sync request is pending
+ *
+ * Both of these fields are only set by the requesters, and cleared
+ * by the multifd sender threads.
+ */
+ bool pending_job;
+ bool pending_sync;
/* array of pages to sent.
* The owner of 'pages' depends of 'pending_job' value:
* pending_job == 0 -> migration_thread can use it.
@@ -117,17 +121,13 @@ typedef struct {
/* size of the next packet that contains pages */
uint32_t next_packet_size;
/* packets sent through this channel */
- uint64_t num_packets;
+ uint64_t packets_sent;
/* non zero pages sent through this channel */
uint64_t total_normal_pages;
/* buffers to send */
struct iovec *iov;
/* number of iovs used */
uint32_t iovs_num;
- /* Pages that are not zero */
- ram_addr_t *normal;
- /* num of non zero pages */
- uint32_t normal_num;
/* used for compression methods */
void *data;
} MultiFDSendParams;
@@ -142,6 +142,7 @@ typedef struct {
char *name;
/* channel thread id */
QemuThread thread;
+ bool thread_created;
/* communication channel */
QIOChannel *c;
/* packet allocated len */
@@ -156,8 +157,6 @@ typedef struct {
/* this mutex protects the following parameters */
QemuMutex mutex;
- /* is this channel thread running */
- bool running;
/* should this thread finish */
bool quit;
/* multifd flags for each packet */
@@ -171,8 +170,8 @@ typedef struct {
MultiFDPacket_t *packet;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
- /* packets sent through this channel */
- uint64_t num_packets;
+ /* packets received through this channel */
+ uint64_t packets_recved;
/* ramblock */
RAMBlock *block;
/* ramblock host address */
@@ -205,6 +204,14 @@ typedef struct {
} MultiFDMethods;
void multifd_register_ops(int method, MultiFDMethods *ops);
+void multifd_send_fill_packet(MultiFDSendParams *p);
+
+static inline void multifd_send_prepare_header(MultiFDSendParams *p)
+{
+ p->iov[0].iov_len = p->packet_len;
+ p->iov[0].iov_base = p->packet;
+ p->iovs_num++;
+}
-#endif
+#endif
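
One detail worth calling out from the header: multifd_send_fill_packet()
serializes every header field with cpu_to_be32()/cpu_to_be64(), so source
and destination agree on byte order regardless of host endianness. A toy,
self-contained equivalent of that serialization step; the field layout is
simplified and is not the actual MultiFDPacket_t wire format:

    #include <stdint.h>

    /* Write v into out[] in big-endian (network) byte order. */
    static void put_be32(uint8_t *out, uint32_t v)
    {
        out[0] = (uint8_t)(v >> 24);
        out[1] = (uint8_t)(v >> 16);
        out[2] = (uint8_t)(v >> 8);
        out[3] = (uint8_t)v;
    }

    static void put_be64(uint8_t *out, uint64_t v)
    {
        put_be32(out, (uint32_t)(v >> 32));
        put_be32(out + 4, (uint32_t)v);
    }

    /* Toy fill_packet(): a fixed header followed by one 64-bit offset
     * per queued page. */
    static void fill_packet(uint8_t *buf, uint32_t flags,
                            uint64_t packet_num,
                            const uint64_t *offsets, uint32_t n)
    {
        put_be32(buf + 0, flags);
        put_be32(buf + 4, n);           /* normal_pages */
        put_be64(buf + 8, packet_num);
        for (uint32_t i = 0; i < n; i++) {
            put_be64(buf + 16 + 8 * i, offsets[i]);
        }
    }
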
diff --git a/migration/ram.c b/migration/ram.c
index d5b7cd5..4649a81 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1252,7 +1252,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss)
static int ram_save_multifd_page(RAMBlock *block, ram_addr_t offset)
{
- if (multifd_queue_page(block, offset) < 0) {
+ if (!multifd_queue_page(block, offset)) {
return -1;
}
stat64_add(&mig_stats.normal_pages, 1);
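
The ram.c caller relies on the three-way behaviour that multifd_queue_page()
now implements: enqueue immediately when the queue is empty, flush and retry
when the ramblock changed or the queue is full, and append otherwise. A
condensed, self-contained sketch of that state machine; Queue and
flush_queue() are stand-ins for MultiFDPages_t and multifd_send_pages():

    #include <stdbool.h>
    #include <stdint.h>

    #define QUEUE_CAP 128

    typedef struct {
        const void *block;           /* ramblock the queued offsets belong to */
        uint64_t offset[QUEUE_CAP];
        unsigned num;
    } Queue;

    static bool flush_queue(Queue *q)
    {
        /* hand the batch to a sender channel; may fail on shutdown */
        q->num = 0;
        q->block = NULL;
        return true;
    }

    static bool queue_page(Queue *q, const void *block, uint64_t offset)
    {
    retry:
        if (q->num == 0) {                      /* empty: start a new batch */
            q->block = block;
            q->offset[q->num++] = offset;
            return true;
        }
        if (q->block != block || q->num == QUEUE_CAP) {
            if (!flush_queue(q)) {              /* block changed or full */
                return false;
            }
            goto retry;                         /* after a flush, retry */
        }
        q->offset[q->num++] = offset;           /* same block, space left */
        return true;
    }
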
diff --git a/migration/trace-events b/migration/trace-events
index de4a743..298ad2b 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -141,7 +141,7 @@ multifd_send_error(uint8_t id) "channel %u"
multifd_send_sync_main(long packet_num) "packet num %ld"
multifd_send_sync_main_signal(uint8_t id) "channel %u"
multifd_send_sync_main_wait(uint8_t id) "channel %u"
-multifd_send_terminate_threads(bool error) "error %d"
+multifd_send_terminate_threads(void) ""
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64
multifd_send_thread_start(uint8_t id) "%u"
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 7675519..8a5bb17 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -819,7 +819,7 @@ static int test_migrate_start(QTestState **from, QTestState **to,
} else if (strcmp(arch, "aarch64") == 0) {
memory_size = "150M";
machine_alias = "virt";
- machine_opts = "gic-version=max";
+ machine_opts = "gic-version=3";
arch_opts = g_strdup_printf("-cpu max -kernel %s", bootpath);
start_address = ARM_TEST_MEM_START;
end_address = ARM_TEST_MEM_END;