aboutsummaryrefslogtreecommitdiff
path: root/migration/multifd.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/multifd.c')
-rw-r--r--migration/multifd.c824
1 files changed, 344 insertions, 480 deletions
diff --git a/migration/multifd.c b/migration/multifd.c
index 0b4cbad..b255778 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -12,15 +12,18 @@
#include "qemu/osdep.h"
#include "qemu/cutils.h"
+#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
-#include "sysemu/sysemu.h"
-#include "exec/ramblock.h"
+#include "system/system.h"
+#include "system/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
+#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
+#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
@@ -33,11 +36,6 @@
#include "io/channel-socket.h"
#include "yank_functions.h"
-/* Multiple fd's */
-
-#define MULTIFD_MAGIC 0x11223344U
-#define MULTIFD_VERSION 1
-
typedef struct {
uint32_t magic;
uint32_t version;
@@ -49,8 +47,10 @@ typedef struct {
struct {
MultiFDSendParams *params;
- /* array of pages to sent */
- MultiFDPages_t *pages;
+
+ /* multifd_send() body is not thread safe, needs serialization */
+ QemuMutex multifd_send_mutex;
+
/*
* Global number of generated multifd packets.
*
@@ -78,7 +78,7 @@ struct {
*/
int exiting;
/* multifd ops */
- MultiFDMethods *ops;
+ const MultiFDMethods *ops;
} *multifd_send_state;
struct {
@@ -95,236 +95,70 @@ struct {
uint64_t packet_num;
int exiting;
/* multifd ops */
- MultiFDMethods *ops;
+ const MultiFDMethods *ops;
} *multifd_recv_state;
-static bool multifd_use_packets(void)
-{
- return !migrate_mapped_ram();
-}
-
-void multifd_send_channel_created(void)
-{
- qemu_sem_post(&multifd_send_state->channels_created);
-}
-
-static void multifd_set_file_bitmap(MultiFDSendParams *p)
+MultiFDSendData *multifd_send_data_alloc(void)
{
- MultiFDPages_t *pages = p->pages;
+ MultiFDSendData *new = g_new0(MultiFDSendData, 1);
- assert(pages->block);
+ multifd_ram_payload_alloc(&new->u.ram);
+ /* Device state allocates its payload on-demand */
- for (int i = 0; i < p->pages->normal_num; i++) {
- ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
- }
-
- for (int i = p->pages->normal_num; i < p->pages->num; i++) {
- ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
- }
+ return new;
}
-/* Multifd without compression */
-
-/**
- * nocomp_send_setup: setup send side
- *
- * @p: Params for the channel that we are using
- * @errp: pointer to an error
- */
-static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
+void multifd_send_data_clear(MultiFDSendData *data)
{
- if (migrate_zero_copy_send()) {
- p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
+ if (multifd_payload_empty(data)) {
+ return;
}
- if (multifd_use_packets()) {
- /* We need one extra place for the packet header */
- p->iov = g_new0(struct iovec, p->page_count + 1);
- } else {
- p->iov = g_new0(struct iovec, p->page_count);
+ switch (data->type) {
+ case MULTIFD_PAYLOAD_DEVICE_STATE:
+ multifd_send_data_clear_device_state(&data->u.device_state);
+ break;
+ default:
+ /* Nothing to do */
+ break;
}
- return 0;
-}
-
-/**
- * nocomp_send_cleanup: cleanup send side
- *
- * For no compression this function does nothing.
- *
- * @p: Params for the channel that we are using
- * @errp: pointer to an error
- */
-static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
-{
- g_free(p->iov);
- p->iov = NULL;
- return;
+ data->type = MULTIFD_PAYLOAD_NONE;
}
-static void multifd_send_prepare_iovs(MultiFDSendParams *p)
+void multifd_send_data_free(MultiFDSendData *data)
{
- MultiFDPages_t *pages = p->pages;
-
- for (int i = 0; i < pages->normal_num; i++) {
- p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
- p->iov[p->iovs_num].iov_len = p->page_size;
- p->iovs_num++;
- }
-
- p->next_packet_size = pages->normal_num * p->page_size;
-}
-
-/**
- * nocomp_send_prepare: prepare date to be able to send
- *
- * For no compression we just have to calculate the size of the
- * packet.
- *
- * Returns 0 for success or -1 for error
- *
- * @p: Params for the channel that we are using
- * @errp: pointer to an error
- */
-static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
-{
- bool use_zero_copy_send = migrate_zero_copy_send();
- int ret;
-
- multifd_send_zero_page_detect(p);
-
- if (!multifd_use_packets()) {
- multifd_send_prepare_iovs(p);
- multifd_set_file_bitmap(p);
-
- return 0;
- }
-
- if (!use_zero_copy_send) {
- /*
- * Only !zerocopy needs the header in IOV; zerocopy will
- * send it separately.
- */
- multifd_send_prepare_header(p);
+ if (!data) {
+ return;
}
- multifd_send_prepare_iovs(p);
- p->flags |= MULTIFD_FLAG_NOCOMP;
+ /* This also free's device state payload */
+ multifd_send_data_clear(data);
- multifd_send_fill_packet(p);
-
- if (use_zero_copy_send) {
- /* Send header first, without zerocopy */
- ret = qio_channel_write_all(p->c, (void *)p->packet,
- p->packet_len, errp);
- if (ret != 0) {
- return -1;
- }
- }
+ multifd_ram_payload_free(&data->u.ram);
- return 0;
-}
-
-/**
- * nocomp_recv_setup: setup receive side
- *
- * For no compression this function does nothing.
- *
- * Returns 0 for success or -1 for error
- *
- * @p: Params for the channel that we are using
- * @errp: pointer to an error
- */
-static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
-{
- p->iov = g_new0(struct iovec, p->page_count);
- return 0;
+ g_free(data);
}
-/**
- * nocomp_recv_cleanup: setup receive side
- *
- * For no compression this function does nothing.
- *
- * @p: Params for the channel that we are using
- */
-static void nocomp_recv_cleanup(MultiFDRecvParams *p)
+static bool multifd_use_packets(void)
{
- g_free(p->iov);
- p->iov = NULL;
+ return !migrate_mapped_ram();
}
-/**
- * nocomp_recv: read the data from the channel
- *
- * For no compression we just need to read things into the correct place.
- *
- * Returns 0 for success or -1 for error
- *
- * @p: Params for the channel that we are using
- * @errp: pointer to an error
- */
-static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
+void multifd_send_channel_created(void)
{
- uint32_t flags;
-
- if (!multifd_use_packets()) {
- return multifd_file_recv_data(p, errp);
- }
-
- flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
-
- if (flags != MULTIFD_FLAG_NOCOMP) {
- error_setg(errp, "multifd %u: flags received %x flags expected %x",
- p->id, flags, MULTIFD_FLAG_NOCOMP);
- return -1;
- }
-
- multifd_recv_zero_page_process(p);
-
- if (!p->normal_num) {
- return 0;
- }
-
- for (int i = 0; i < p->normal_num; i++) {
- p->iov[i].iov_base = p->host + p->normal[i];
- p->iov[i].iov_len = p->page_size;
- ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
- }
- return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
+ qemu_sem_post(&multifd_send_state->channels_created);
}
-static MultiFDMethods multifd_nocomp_ops = {
- .send_setup = nocomp_send_setup,
- .send_cleanup = nocomp_send_cleanup,
- .send_prepare = nocomp_send_prepare,
- .recv_setup = nocomp_recv_setup,
- .recv_cleanup = nocomp_recv_cleanup,
- .recv = nocomp_recv
-};
+static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
-static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
- [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
-};
-
-void multifd_register_ops(int method, MultiFDMethods *ops)
+void multifd_register_ops(int method, const MultiFDMethods *ops)
{
- assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
+ assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
+ assert(!multifd_ops[method]);
multifd_ops[method] = ops;
}
-/* Reset a MultiFDPages_t* object for the next use */
-static void multifd_pages_reset(MultiFDPages_t *pages)
-{
- /*
- * We don't need to touch offset[] array, because it will be
- * overwritten later when reused.
- */
- pages->num = 0;
- pages->normal_num = 0;
- pages->block = NULL;
-}
-
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg = {};
@@ -389,160 +223,95 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
return msg.id;
}
-static MultiFDPages_t *multifd_pages_init(uint32_t n)
-{
- MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
-
- pages->allocated = n;
- pages->offset = g_new0(ram_addr_t, n);
-
- return pages;
-}
-
-static void multifd_pages_clear(MultiFDPages_t *pages)
-{
- multifd_pages_reset(pages);
- pages->allocated = 0;
- g_free(pages->offset);
- pages->offset = NULL;
- g_free(pages);
-}
-
+/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
- MultiFDPages_t *pages = p->pages;
uint64_t packet_num;
- uint32_t zero_num = pages->num - pages->normal_num;
- int i;
+ bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
+
+ memset(packet, 0, p->packet_len);
- packet->flags = cpu_to_be32(p->flags);
- packet->pages_alloc = cpu_to_be32(p->pages->allocated);
- packet->normal_pages = cpu_to_be32(pages->normal_num);
- packet->zero_pages = cpu_to_be32(zero_num);
+ packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+ packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
+
+ packet->hdr.flags = cpu_to_be32(p->flags);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
packet->packet_num = cpu_to_be64(packet_num);
- if (pages->block) {
- strncpy(packet->ramblock, pages->block->idstr, 256);
- }
-
- for (i = 0; i < pages->num; i++) {
- /* there are architectures where ram_addr_t is 32 bit */
- uint64_t temp = pages->offset[i];
+ p->packets_sent++;
- packet->offset[i] = cpu_to_be64(temp);
+ if (!sync_packet) {
+ multifd_ram_fill_packet(p);
}
- p->packets_sent++;
- p->total_normal_pages += pages->normal_num;
- p->total_zero_pages += zero_num;
-
- trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
- p->flags, p->next_packet_size);
+ trace_multifd_send_fill(p->id, packet_num,
+ p->flags, p->next_packet_size);
}
-static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
+ const MultiFDPacketHdr_t *hdr,
+ Error **errp)
{
- MultiFDPacket_t *packet = p->packet;
- int i;
+ uint32_t magic = be32_to_cpu(hdr->magic);
+ uint32_t version = be32_to_cpu(hdr->version);
- packet->magic = be32_to_cpu(packet->magic);
- if (packet->magic != MULTIFD_MAGIC) {
- error_setg(errp, "multifd: received packet "
- "magic %x and expected magic %x",
- packet->magic, MULTIFD_MAGIC);
+ if (magic != MULTIFD_MAGIC) {
+ error_setg(errp, "multifd: received packet magic %x, expected %x",
+ magic, MULTIFD_MAGIC);
return -1;
}
- packet->version = be32_to_cpu(packet->version);
- if (packet->version != MULTIFD_VERSION) {
- error_setg(errp, "multifd: received packet "
- "version %u and expected version %u",
- packet->version, MULTIFD_VERSION);
+ if (version != MULTIFD_VERSION) {
+ error_setg(errp, "multifd: received packet version %u, expected %u",
+ version, MULTIFD_VERSION);
return -1;
}
- p->flags = be32_to_cpu(packet->flags);
-
- packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
- /*
- * If we received a packet that is 100 times bigger than expected
- * just stop migration. It is a magic number.
- */
- if (packet->pages_alloc > p->page_count) {
- error_setg(errp, "multifd: received packet "
- "with size %u and expected a size of %u",
- packet->pages_alloc, p->page_count) ;
- return -1;
- }
+ p->flags = be32_to_cpu(hdr->flags);
- p->normal_num = be32_to_cpu(packet->normal_pages);
- if (p->normal_num > packet->pages_alloc) {
- error_setg(errp, "multifd: received packet "
- "with %u normal pages and expected maximum pages are %u",
- p->normal_num, packet->pages_alloc) ;
- return -1;
- }
+ return 0;
+}
- p->zero_num = be32_to_cpu(packet->zero_pages);
- if (p->zero_num > packet->pages_alloc - p->normal_num) {
- error_setg(errp, "multifd: received packet "
- "with %u zero pages and expected maximum zero pages are %u",
- p->zero_num, packet->pages_alloc - p->normal_num) ;
- return -1;
- }
+static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
+ Error **errp)
+{
+ MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
+ packet->instance_id = be32_to_cpu(packet->instance_id);
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
- p->packet_num = be64_to_cpu(packet->packet_num);
- p->packets_recved++;
- p->total_normal_pages += p->normal_num;
- p->total_zero_pages += p->zero_num;
- trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
- p->flags, p->next_packet_size);
+ return 0;
+}
- if (p->normal_num == 0 && p->zero_num == 0) {
- return 0;
- }
+static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
+{
+ const MultiFDPacket_t *packet = p->packet;
+ int ret = 0;
- /* make sure that ramblock is 0 terminated */
- packet->ramblock[255] = 0;
- p->block = qemu_ram_block_by_name(packet->ramblock);
- if (!p->block) {
- error_setg(errp, "multifd: unknown ram block %s",
- packet->ramblock);
- return -1;
- }
+ p->next_packet_size = be32_to_cpu(packet->next_packet_size);
+ p->packet_num = be64_to_cpu(packet->packet_num);
- p->host = p->block->host;
- for (i = 0; i < p->normal_num; i++) {
- uint64_t offset = be64_to_cpu(packet->offset[i]);
+ /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
+ ret = multifd_ram_unfill_packet(p, errp);
- if (offset > (p->block->used_length - p->page_size)) {
- error_setg(errp, "multifd: offset too long %" PRIu64
- " (max " RAM_ADDR_FMT ")",
- offset, p->block->used_length);
- return -1;
- }
- p->normal[i] = offset;
- }
+ trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
+ p->next_packet_size);
- for (i = 0; i < p->zero_num; i++) {
- uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
+ return ret;
+}
- if (offset > (p->block->used_length - p->page_size)) {
- error_setg(errp, "multifd: offset too long %" PRIu64
- " (max " RAM_ADDR_FMT ")",
- offset, p->block->used_length);
- return -1;
- }
- p->zero[i] = offset;
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+ p->packets_recved++;
+
+ if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
+ return multifd_recv_unfill_packet_device_state(p, errp);
}
- return 0;
+ return multifd_recv_unfill_packet_ram(p, errp);
}
static bool multifd_send_should_exit(void)
@@ -568,35 +337,32 @@ static void multifd_send_kick_main(MultiFDSendParams *p)
}
/*
- * How we use multifd_send_state->pages and channel->pages?
+ * multifd_send() works by exchanging the MultiFDSendData object
+ * provided by the caller with an unused MultiFDSendData object from
+ * the next channel that is found to be idle.
*
- * We create a pages for each channel, and a main one. Each time that
- * we need to send a batch of pages we interchange the ones between
- * multifd_send_state and the channel that is sending it. There are
- * two reasons for that:
- * - to not have to do so many mallocs during migration
- * - to make easier to know what to free at the end of migration
+ * The channel owns the data until it finishes transmitting and the
+ * caller owns the empty object until it fills it with data and calls
+ * this function again. No locking necessary.
*
- * This way we always know who is the owner of each "pages" struct,
- * and we don't need any locking. It belongs to the migration thread
- * or to the channel thread. Switching is safe because the migration
- * thread is using the channel mutex when changing it, and the channel
- * have to had finish with its own, otherwise pending_job can't be
- * false.
+ * Switching is safe because both the migration thread and the channel
+ * thread have barriers in place to serialize access.
*
* Returns true if succeed, false otherwise.
*/
-static bool multifd_send_pages(void)
+bool multifd_send(MultiFDSendData **send_data)
{
int i;
static int next_channel;
MultiFDSendParams *p = NULL; /* make happy gcc */
- MultiFDPages_t *pages = multifd_send_state->pages;
+ MultiFDSendData *tmp;
if (multifd_send_should_exit()) {
return false;
}
+ QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);
+
/* We wait here, until at least one channel is ready */
qemu_sem_wait(&multifd_send_state->channels_ready);
@@ -626,66 +392,24 @@ static bool multifd_send_pages(void)
* qatomic_store_release() in multifd_send_thread().
*/
smp_mb_acquire();
- assert(!p->pages->num);
- multifd_send_state->pages = p->pages;
- p->pages = pages;
- /*
- * Making sure p->pages is setup before marking pending_job=true. Pairs
- * with the qatomic_load_acquire() in multifd_send_thread().
- */
- qatomic_store_release(&p->pending_job, true);
- qemu_sem_post(&p->sem);
-
- return true;
-}
-
-static inline bool multifd_queue_empty(MultiFDPages_t *pages)
-{
- return pages->num == 0;
-}
-static inline bool multifd_queue_full(MultiFDPages_t *pages)
-{
- return pages->num == pages->allocated;
-}
-
-static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
-{
- pages->offset[pages->num++] = offset;
-}
+ assert(multifd_payload_empty(p->data));
-/* Returns true if enqueue successful, false otherwise */
-bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
-{
- MultiFDPages_t *pages;
-
-retry:
- pages = multifd_send_state->pages;
-
- /* If the queue is empty, we can already enqueue now */
- if (multifd_queue_empty(pages)) {
- pages->block = block;
- multifd_enqueue(pages, offset);
- return true;
- }
+ /*
+ * Swap the pointers. The channel gets the client data for
+ * transferring and the client gets back an unused data slot.
+ */
+ tmp = *send_data;
+ *send_data = p->data;
+ p->data = tmp;
/*
- * Not empty, meanwhile we need a flush. It can because of either:
- *
- * (1) The page is not on the same ramblock of previous ones, or,
- * (2) The queue is full.
- *
- * After flush, always retry.
+ * Making sure p->data is setup before marking pending_job=true. Pairs
+ * with the qatomic_load_acquire() in multifd_send_thread().
*/
- if (pages->block != block || multifd_queue_full(pages)) {
- if (!multifd_send_pages()) {
- return false;
- }
- goto retry;
- }
+ qatomic_store_release(&p->pending_job, true);
+ qemu_sem_post(&p->sem);
- /* Not empty, and we still have space, do it! */
- multifd_enqueue(pages, offset);
return true;
}
@@ -775,7 +499,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
* channels have no I/O handler callback registered when reaching
* here, because migration thread will wait for all multifd channel
* establishments to complete during setup. Since
- * migrate_fd_cleanup() will be scheduled in main thread too, all
+ * migration_cleanup() will be scheduled in main thread too, all
* previous callbacks should guarantee to be completed when
* reaching here. See multifd_send_state.channels_created and its
* usage. In the future, we could replace this with an assert
@@ -790,12 +514,13 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
- multifd_pages_clear(p->pages);
- p->pages = NULL;
+ g_clear_pointer(&p->data, multifd_send_data_free);
p->packet_len = 0;
+ g_clear_pointer(&p->packet_device_state, g_free);
g_free(p->packet);
p->packet = NULL;
multifd_send_state->ops->send_cleanup(p, errp);
+ assert(!p->iov);
return *errp == NULL;
}
@@ -804,12 +529,12 @@ static void multifd_send_cleanup_state(void)
{
file_cleanup_outgoing_migration();
socket_cleanup_outgoing_migration();
+ multifd_device_state_send_cleanup();
qemu_sem_destroy(&multifd_send_state->channels_created);
qemu_sem_destroy(&multifd_send_state->channels_ready);
+ qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
g_free(multifd_send_state->params);
multifd_send_state->params = NULL;
- multifd_pages_clear(multifd_send_state->pages);
- multifd_send_state->pages = NULL;
g_free(multifd_send_state);
multifd_send_state = NULL;
}
@@ -822,6 +547,36 @@ void multifd_send_shutdown(void)
return;
}
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ /* thread_created implies the TLS handshake has succeeded */
+ if (p->tls_thread_created && p->thread_created) {
+ Error *local_err = NULL;
+ /*
+ * The destination expects the TLS session to always be
+ * properly terminated. This helps to detect a premature
+ * termination in the middle of the stream. Note that
+ * older QEMUs always break the connection on the source
+ * and the destination always sees
+ * GNUTLS_E_PREMATURE_TERMINATION.
+ */
+ migration_tls_channel_end(p->c, &local_err);
+
+ /*
+ * The above can return an error in case the migration has
+ * already failed. If the migration succeeded, errors are
+ * not expected but there's no need to kill the source.
+ */
+ if (local_err && !migration_has_failed(migrate_get_current())) {
+ warn_report(
+ "multifd_send_%d: Failed to terminate TLS connection: %s",
+ p->id, error_get_pretty(local_err));
+ break;
+ }
+ }
+ }
+
multifd_send_terminate_threads();
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -854,20 +609,12 @@ static int multifd_zero_copy_flush(QIOChannel *c)
return ret;
}
-int multifd_send_sync_main(void)
+int multifd_send_sync_main(MultiFDSyncReq req)
{
int i;
bool flush_zero_copy;
- if (!migrate_multifd()) {
- return 0;
- }
- if (multifd_send_state->pages->num) {
- if (!multifd_send_pages()) {
- error_report("%s: multifd_send_pages fail", __func__);
- return -1;
- }
- }
+ assert(req != MULTIFD_SYNC_NONE);
flush_zero_copy = migrate_zero_copy_send();
@@ -884,8 +631,8 @@ int multifd_send_sync_main(void)
* We should be the only user so far, so not possible to be set by
* others concurrently.
*/
- assert(qatomic_read(&p->pending_sync) == false);
- qatomic_set(&p->pending_sync, true);
+ assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
+ qatomic_set(&p->pending_sync, req);
qemu_sem_post(&p->sem);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -937,26 +684,46 @@ static void *multifd_send_thread(void *opaque)
}
/*
- * Read pending_job flag before p->pages. Pairs with the
- * qatomic_store_release() in multifd_send_pages().
+ * Read pending_job flag before p->data. Pairs with the
+ * qatomic_store_release() in multifd_send().
*/
if (qatomic_load_acquire(&p->pending_job)) {
- MultiFDPages_t *pages = p->pages;
+ bool is_device_state = multifd_payload_device_state(p->data);
+ size_t total_size;
+ int write_flags_masked = 0;
+ p->flags = 0;
p->iovs_num = 0;
- assert(pages->num);
+ assert(!multifd_payload_empty(p->data));
- ret = multifd_send_state->ops->send_prepare(p, &local_err);
- if (ret != 0) {
- break;
+ if (is_device_state) {
+ multifd_device_state_send_prepare(p);
+
+ /* Device state packets cannot be sent via zerocopy */
+ write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
+ } else {
+ ret = multifd_send_state->ops->send_prepare(p, &local_err);
+ if (ret != 0) {
+ break;
+ }
}
+ /*
+ * The packet header in the zerocopy RAM case is accounted for
+ * in multifd_nocomp_send_prepare() - where it is actually
+ * being sent.
+ */
+ total_size = iov_size(p->iov, p->iovs_num);
+
if (migrate_mapped_ram()) {
+ assert(!is_device_state);
+
ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
- p->pages->block, &local_err);
+ &p->data->u.ram, &local_err);
} else {
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
- NULL, 0, p->write_flags,
+ NULL, 0,
+ p->write_flags & ~write_flags_masked,
&local_err);
}
@@ -964,29 +731,29 @@ static void *multifd_send_thread(void *opaque)
break;
}
- stat64_add(&mig_stats.multifd_bytes,
- p->next_packet_size + p->packet_len);
- stat64_add(&mig_stats.normal_pages, pages->normal_num);
- stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
+ stat64_add(&mig_stats.multifd_bytes, total_size);
- multifd_pages_reset(p->pages);
p->next_packet_size = 0;
+ multifd_send_data_clear(p->data);
/*
- * Making sure p->pages is published before saying "we're
+ * Making sure p->data is published before saying "we're
* free". Pairs with the smp_mb_acquire() in
- * multifd_send_pages().
+ * multifd_send().
*/
qatomic_store_release(&p->pending_job, false);
} else {
+ MultiFDSyncReq req = qatomic_read(&p->pending_sync);
+
/*
* If not a normal job, must be a sync request. Note that
* pending_sync is a standalone flag (unlike pending_job), so
* it doesn't require explicit memory barriers.
*/
- assert(qatomic_read(&p->pending_sync));
+ assert(req != MULTIFD_SYNC_NONE);
- if (use_packets) {
+ /* Only push the SYNC message if it involves a remote sync */
+ if (req == MULTIFD_SYNC_ALL) {
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
@@ -996,10 +763,9 @@ static void *multifd_send_thread(void *opaque)
}
/* p->next_packet_size will always be zero for a SYNC packet */
stat64_add(&mig_stats.multifd_bytes, p->packet_len);
- p->flags = 0;
}
- qatomic_set(&p->pending_sync, false);
+ qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
qemu_sem_post(&p->sem_sync);
}
}
@@ -1015,8 +781,7 @@ out:
rcu_unregister_thread();
migration_threads_remove(thread);
- trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
- p->total_zero_pages);
+ trace_multifd_send_thread_end(p->id, p->packets_sent);
return NULL;
}
@@ -1069,7 +834,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p,
args->p = p;
p->tls_thread_created = true;
- qemu_thread_create(&p->tls_thread, "mig/src/tls",
+ qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
multifd_tls_handshake_thread, args,
QEMU_THREAD_JOINABLE);
return true;
@@ -1156,9 +921,8 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
bool multifd_send_setup(void)
{
MigrationState *s = migrate_get_current();
- Error *local_err = NULL;
int thread_count, ret = 0;
- uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ uint32_t page_count = multifd_ram_page_count();
bool use_packets = multifd_use_packets();
uint8_t i;
@@ -1169,7 +933,7 @@ bool multifd_send_setup(void)
thread_count = migrate_multifd_channels();
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
- multifd_send_state->pages = multifd_pages_init(page_count);
+ qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
qemu_sem_init(&multifd_send_state->channels_created, 0);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
@@ -1177,26 +941,27 @@ bool multifd_send_setup(void)
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
+ Error *local_err = NULL;
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
- p->pages = multifd_pages_init(page_count);
+ p->data = multifd_send_data_alloc();
if (use_packets) {
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
- p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
- p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+ p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
+ p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+ p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
}
- p->name = g_strdup_printf("mig/src/send_%d", i);
- p->page_size = qemu_target_page_size();
- p->page_count = page_count;
+ p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
p->write_flags = 0;
if (!multifd_new_send_channel_create(p, &local_err)) {
- return false;
+ migrate_set_error(s, local_err);
+ ret = -1;
}
}
@@ -1209,24 +974,30 @@ bool multifd_send_setup(void)
qemu_sem_wait(&multifd_send_state->channels_created);
}
+ if (ret) {
+ goto err;
+ }
+
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
+ Error *local_err = NULL;
ret = multifd_send_state->ops->send_setup(p, &local_err);
if (ret) {
- break;
+ migrate_set_error(s, local_err);
+ goto err;
}
+ assert(p->iov);
}
- if (ret) {
- migrate_set_error(s, local_err);
- error_report_err(local_err);
- migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
- MIGRATION_STATUS_FAILED);
- return false;
- }
+ multifd_device_state_send_setup();
return true;
+
+err:
+ migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
+ MIGRATION_STATUS_FAILED);
+ return false;
}
bool multifd_recv(void)
@@ -1353,11 +1124,14 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem_sync);
qemu_sem_destroy(&p->sem);
+ g_free(p->data);
+ p->data = NULL;
g_free(p->name);
p->name = NULL;
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
+ g_clear_pointer(&p->packet_dev_state, g_free);
g_free(p->normal);
p->normal = NULL;
g_free(p->zero);
@@ -1459,8 +1233,37 @@ void multifd_recv_sync_main(void)
trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
+static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
+{
+ g_autofree char *dev_state_buf = NULL;
+ int ret;
+
+ dev_state_buf = g_malloc(p->next_packet_size);
+
+ ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
+ != 0) {
+ error_setg(errp, "unterminated multifd device state idstr");
+ return -1;
+ }
+
+ if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
+ p->packet_dev_state->instance_id,
+ dev_state_buf, p->next_packet_size,
+ errp)) {
+ ret = -1;
+ }
+
+ return ret;
+}
+
static void *multifd_recv_thread(void *opaque)
{
+ MigrationState *s = migrate_get_current();
MultiFDRecvParams *p = opaque;
Error *local_err = NULL;
bool use_packets = multifd_use_packets();
@@ -1469,19 +1272,65 @@ static void *multifd_recv_thread(void *opaque)
trace_multifd_recv_thread_start(p->id);
rcu_register_thread();
+ if (!s->multifd_clean_tls_termination) {
+ p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
+ }
+
while (true) {
+ MultiFDPacketHdr_t hdr;
uint32_t flags = 0;
+ bool is_device_state = false;
bool has_data = false;
+ uint8_t *pkt_buf;
+ size_t pkt_len;
+
p->normal_num = 0;
if (use_packets) {
+ struct iovec iov = {
+ .iov_base = (void *)&hdr,
+ .iov_len = sizeof(hdr)
+ };
+
if (multifd_recv_should_exit()) {
break;
}
- ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
+ ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
+ p->read_flags, &local_err);
+ if (!ret) {
+ /* EOF */
+ assert(!local_err);
+ break;
+ }
+
+ if (ret == -1) {
+ break;
+ }
+
+ ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
+ if (ret) {
+ break;
+ }
+
+ is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
+ if (is_device_state) {
+ pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
+ pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
+ } else {
+ pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
+ pkt_len = p->packet_len - sizeof(hdr);
+ }
+
+ ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
+ &local_err);
+ if (!ret) {
+ /* EOF */
+ error_setg(&local_err, "multifd: unexpected EOF after packet header");
+ break;
+ }
+
+ if (ret == -1) {
break;
}
@@ -1495,7 +1344,18 @@ static void *multifd_recv_thread(void *opaque)
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
- has_data = p->normal_num || p->zero_num;
+
+ if (is_device_state) {
+ has_data = p->next_packet_size > 0;
+ } else {
+ /*
+ * Even if it's a SYNC packet, this needs to be set
+ * because older QEMUs (<9.0) still send data along with
+ * the SYNC packet.
+ */
+ has_data = p->normal_num || p->zero_num;
+ }
+
qemu_mutex_unlock(&p->mutex);
} else {
/*
@@ -1524,19 +1384,40 @@ static void *multifd_recv_thread(void *opaque)
}
if (has_data) {
- ret = multifd_recv_state->ops->recv(p, &local_err);
+ /*
+ * multifd thread should not be active and receive data
+ * when migration is in the Postcopy phase. Two threads
+ * writing the same memory area could easily corrupt
+ * the guest state.
+ */
+ assert(!migration_in_postcopy());
+ if (is_device_state) {
+ assert(use_packets);
+ ret = multifd_device_state_recv(p, &local_err);
+ } else {
+ ret = multifd_recv_state->ops->recv(p, &local_err);
+ }
if (ret != 0) {
break;
}
+ } else if (is_device_state) {
+ error_setg(&local_err,
+ "multifd: received empty device state packet");
+ break;
}
if (use_packets) {
if (flags & MULTIFD_FLAG_SYNC) {
+ if (is_device_state) {
+ error_setg(&local_err,
+ "multifd: received SYNC device state packet");
+ break;
+ }
+
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
}
} else {
- p->total_normal_pages += p->data->size / qemu_target_page_size();
p->data->size = 0;
/*
* Order data->size update before clearing
@@ -1553,9 +1434,7 @@ static void *multifd_recv_thread(void *opaque)
}
rcu_unregister_thread();
- trace_multifd_recv_thread_end(p->id, p->packets_recved,
- p->total_normal_pages,
- p->total_zero_pages);
+ trace_multifd_recv_thread_end(p->id, p->packets_recved);
return NULL;
}
@@ -1563,7 +1442,7 @@ static void *multifd_recv_thread(void *opaque)
int multifd_recv_setup(Error **errp)
{
int thread_count;
- uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ uint32_t page_count = multifd_ram_page_count();
bool use_packets = multifd_use_packets();
uint8_t i;
@@ -1603,12 +1482,11 @@ int multifd_recv_setup(Error **errp)
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
+ p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
}
- p->name = g_strdup_printf("mig/dst/recv_%d", i);
+ p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
p->normal = g_new0(ram_addr_t, page_count);
p->zero = g_new0(ram_addr_t, page_count);
- p->page_count = page_count;
- p->page_size = qemu_target_page_size();
}
for (i = 0; i < thread_count; i++) {
@@ -1681,17 +1559,3 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
QEMU_THREAD_JOINABLE);
qatomic_inc(&multifd_recv_state->count);
}
-
-bool multifd_send_prepare_common(MultiFDSendParams *p)
-{
- multifd_send_zero_page_detect(p);
-
- if (!p->pages->normal_num) {
- p->next_packet_size = 0;
- return false;
- }
-
- multifd_send_prepare_header(p);
-
- return true;
-}