diff options
Diffstat (limited to 'migration/multifd.c')
-rw-r--r-- | migration/multifd.c | 824 |
1 files changed, 344 insertions, 480 deletions
diff --git a/migration/multifd.c b/migration/multifd.c index 0b4cbad..b255778 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -12,15 +12,18 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" +#include "qemu/iov.h" #include "qemu/rcu.h" #include "exec/target_page.h" -#include "sysemu/sysemu.h" -#include "exec/ramblock.h" +#include "system/system.h" +#include "system/ramblock.h" #include "qemu/error-report.h" #include "qapi/error.h" #include "file.h" +#include "migration/misc.h" #include "migration.h" #include "migration-stats.h" +#include "savevm.h" #include "socket.h" #include "tls.h" #include "qemu-file.h" @@ -33,11 +36,6 @@ #include "io/channel-socket.h" #include "yank_functions.h" -/* Multiple fd's */ - -#define MULTIFD_MAGIC 0x11223344U -#define MULTIFD_VERSION 1 - typedef struct { uint32_t magic; uint32_t version; @@ -49,8 +47,10 @@ typedef struct { struct { MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; + + /* multifd_send() body is not thread safe, needs serialization */ + QemuMutex multifd_send_mutex; + /* * Global number of generated multifd packets. * @@ -78,7 +78,7 @@ struct { */ int exiting; /* multifd ops */ - MultiFDMethods *ops; + const MultiFDMethods *ops; } *multifd_send_state; struct { @@ -95,236 +95,70 @@ struct { uint64_t packet_num; int exiting; /* multifd ops */ - MultiFDMethods *ops; + const MultiFDMethods *ops; } *multifd_recv_state; -static bool multifd_use_packets(void) -{ - return !migrate_mapped_ram(); -} - -void multifd_send_channel_created(void) -{ - qemu_sem_post(&multifd_send_state->channels_created); -} - -static void multifd_set_file_bitmap(MultiFDSendParams *p) +MultiFDSendData *multifd_send_data_alloc(void) { - MultiFDPages_t *pages = p->pages; + MultiFDSendData *new = g_new0(MultiFDSendData, 1); - assert(pages->block); + multifd_ram_payload_alloc(&new->u.ram); + /* Device state allocates its payload on-demand */ - for (int i = 0; i < p->pages->normal_num; i++) { - ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true); - } - - for (int i = p->pages->normal_num; i < p->pages->num; i++) { - ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false); - } + return new; } -/* Multifd without compression */ - -/** - * nocomp_send_setup: setup send side - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) +void multifd_send_data_clear(MultiFDSendData *data) { - if (migrate_zero_copy_send()) { - p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; + if (multifd_payload_empty(data)) { + return; } - if (multifd_use_packets()) { - /* We need one extra place for the packet header */ - p->iov = g_new0(struct iovec, p->page_count + 1); - } else { - p->iov = g_new0(struct iovec, p->page_count); + switch (data->type) { + case MULTIFD_PAYLOAD_DEVICE_STATE: + multifd_send_data_clear_device_state(&data->u.device_state); + break; + default: + /* Nothing to do */ + break; } - return 0; -} - -/** - * nocomp_send_cleanup: cleanup send side - * - * For no compression this function does nothing. - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) -{ - g_free(p->iov); - p->iov = NULL; - return; + data->type = MULTIFD_PAYLOAD_NONE; } -static void multifd_send_prepare_iovs(MultiFDSendParams *p) +void multifd_send_data_free(MultiFDSendData *data) { - MultiFDPages_t *pages = p->pages; - - for (int i = 0; i < pages->normal_num; i++) { - p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; - p->iov[p->iovs_num].iov_len = p->page_size; - p->iovs_num++; - } - - p->next_packet_size = pages->normal_num * p->page_size; -} - -/** - * nocomp_send_prepare: prepare date to be able to send - * - * For no compression we just have to calculate the size of the - * packet. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) -{ - bool use_zero_copy_send = migrate_zero_copy_send(); - int ret; - - multifd_send_zero_page_detect(p); - - if (!multifd_use_packets()) { - multifd_send_prepare_iovs(p); - multifd_set_file_bitmap(p); - - return 0; - } - - if (!use_zero_copy_send) { - /* - * Only !zerocopy needs the header in IOV; zerocopy will - * send it separately. - */ - multifd_send_prepare_header(p); + if (!data) { + return; } - multifd_send_prepare_iovs(p); - p->flags |= MULTIFD_FLAG_NOCOMP; + /* This also free's device state payload */ + multifd_send_data_clear(data); - multifd_send_fill_packet(p); - - if (use_zero_copy_send) { - /* Send header first, without zerocopy */ - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, errp); - if (ret != 0) { - return -1; - } - } + multifd_ram_payload_free(&data->u.ram); - return 0; -} - -/** - * nocomp_recv_setup: setup receive side - * - * For no compression this function does nothing. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) -{ - p->iov = g_new0(struct iovec, p->page_count); - return 0; + g_free(data); } -/** - * nocomp_recv_cleanup: setup receive side - * - * For no compression this function does nothing. - * - * @p: Params for the channel that we are using - */ -static void nocomp_recv_cleanup(MultiFDRecvParams *p) +static bool multifd_use_packets(void) { - g_free(p->iov); - p->iov = NULL; + return !migrate_mapped_ram(); } -/** - * nocomp_recv: read the data from the channel - * - * For no compression we just need to read things into the correct place. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_recv(MultiFDRecvParams *p, Error **errp) +void multifd_send_channel_created(void) { - uint32_t flags; - - if (!multifd_use_packets()) { - return multifd_file_recv_data(p, errp); - } - - flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; - - if (flags != MULTIFD_FLAG_NOCOMP) { - error_setg(errp, "multifd %u: flags received %x flags expected %x", - p->id, flags, MULTIFD_FLAG_NOCOMP); - return -1; - } - - multifd_recv_zero_page_process(p); - - if (!p->normal_num) { - return 0; - } - - for (int i = 0; i < p->normal_num; i++) { - p->iov[i].iov_base = p->host + p->normal[i]; - p->iov[i].iov_len = p->page_size; - ramblock_recv_bitmap_set_offset(p->block, p->normal[i]); - } - return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); + qemu_sem_post(&multifd_send_state->channels_created); } -static MultiFDMethods multifd_nocomp_ops = { - .send_setup = nocomp_send_setup, - .send_cleanup = nocomp_send_cleanup, - .send_prepare = nocomp_send_prepare, - .recv_setup = nocomp_recv_setup, - .recv_cleanup = nocomp_recv_cleanup, - .recv = nocomp_recv -}; +static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {}; -static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { - [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, -}; - -void multifd_register_ops(int method, MultiFDMethods *ops) +void multifd_register_ops(int method, const MultiFDMethods *ops) { - assert(0 < method && method < MULTIFD_COMPRESSION__MAX); + assert(0 <= method && method < MULTIFD_COMPRESSION__MAX); + assert(!multifd_ops[method]); multifd_ops[method] = ops; } -/* Reset a MultiFDPages_t* object for the next use */ -static void multifd_pages_reset(MultiFDPages_t *pages) -{ - /* - * We don't need to touch offset[] array, because it will be - * overwritten later when reused. - */ - pages->num = 0; - pages->normal_num = 0; - pages->block = NULL; -} - static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) { MultiFDInit_t msg = {}; @@ -389,160 +223,95 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) return msg.id; } -static MultiFDPages_t *multifd_pages_init(uint32_t n) -{ - MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); - - pages->allocated = n; - pages->offset = g_new0(ram_addr_t, n); - - return pages; -} - -static void multifd_pages_clear(MultiFDPages_t *pages) -{ - multifd_pages_reset(pages); - pages->allocated = 0; - g_free(pages->offset); - pages->offset = NULL; - g_free(pages); -} - +/* Fills a RAM multifd packet */ void multifd_send_fill_packet(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; - MultiFDPages_t *pages = p->pages; uint64_t packet_num; - uint32_t zero_num = pages->num - pages->normal_num; - int i; + bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; + + memset(packet, 0, p->packet_len); - packet->flags = cpu_to_be32(p->flags); - packet->pages_alloc = cpu_to_be32(p->pages->allocated); - packet->normal_pages = cpu_to_be32(pages->normal_num); - packet->zero_pages = cpu_to_be32(zero_num); + packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); + packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); + + packet->hdr.flags = cpu_to_be32(p->flags); packet->next_packet_size = cpu_to_be32(p->next_packet_size); packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); packet->packet_num = cpu_to_be64(packet_num); - if (pages->block) { - strncpy(packet->ramblock, pages->block->idstr, 256); - } - - for (i = 0; i < pages->num; i++) { - /* there are architectures where ram_addr_t is 32 bit */ - uint64_t temp = pages->offset[i]; + p->packets_sent++; - packet->offset[i] = cpu_to_be64(temp); + if (!sync_packet) { + multifd_ram_fill_packet(p); } - p->packets_sent++; - p->total_normal_pages += pages->normal_num; - p->total_zero_pages += zero_num; - - trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num, - p->flags, p->next_packet_size); + trace_multifd_send_fill(p->id, packet_num, + p->flags, p->next_packet_size); } -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, + const MultiFDPacketHdr_t *hdr, + Error **errp) { - MultiFDPacket_t *packet = p->packet; - int i; + uint32_t magic = be32_to_cpu(hdr->magic); + uint32_t version = be32_to_cpu(hdr->version); - packet->magic = be32_to_cpu(packet->magic); - if (packet->magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet " - "magic %x and expected magic %x", - packet->magic, MULTIFD_MAGIC); + if (magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet magic %x, expected %x", + magic, MULTIFD_MAGIC); return -1; } - packet->version = be32_to_cpu(packet->version); - if (packet->version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet " - "version %u and expected version %u", - packet->version, MULTIFD_VERSION); + if (version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet version %u, expected %u", + version, MULTIFD_VERSION); return -1; } - p->flags = be32_to_cpu(packet->flags); - - packet->pages_alloc = be32_to_cpu(packet->pages_alloc); - /* - * If we received a packet that is 100 times bigger than expected - * just stop migration. It is a magic number. - */ - if (packet->pages_alloc > p->page_count) { - error_setg(errp, "multifd: received packet " - "with size %u and expected a size of %u", - packet->pages_alloc, p->page_count) ; - return -1; - } + p->flags = be32_to_cpu(hdr->flags); - p->normal_num = be32_to_cpu(packet->normal_pages); - if (p->normal_num > packet->pages_alloc) { - error_setg(errp, "multifd: received packet " - "with %u normal pages and expected maximum pages are %u", - p->normal_num, packet->pages_alloc) ; - return -1; - } + return 0; +} - p->zero_num = be32_to_cpu(packet->zero_pages); - if (p->zero_num > packet->pages_alloc - p->normal_num) { - error_setg(errp, "multifd: received packet " - "with %u zero pages and expected maximum zero pages are %u", - p->zero_num, packet->pages_alloc - p->normal_num) ; - return -1; - } +static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, + Error **errp) +{ + MultiFDPacketDeviceState_t *packet = p->packet_dev_state; + packet->instance_id = be32_to_cpu(packet->instance_id); p->next_packet_size = be32_to_cpu(packet->next_packet_size); - p->packet_num = be64_to_cpu(packet->packet_num); - p->packets_recved++; - p->total_normal_pages += p->normal_num; - p->total_zero_pages += p->zero_num; - trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num, - p->flags, p->next_packet_size); + return 0; +} - if (p->normal_num == 0 && p->zero_num == 0) { - return 0; - } +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp) +{ + const MultiFDPacket_t *packet = p->packet; + int ret = 0; - /* make sure that ramblock is 0 terminated */ - packet->ramblock[255] = 0; - p->block = qemu_ram_block_by_name(packet->ramblock); - if (!p->block) { - error_setg(errp, "multifd: unknown ram block %s", - packet->ramblock); - return -1; - } + p->next_packet_size = be32_to_cpu(packet->next_packet_size); + p->packet_num = be64_to_cpu(packet->packet_num); - p->host = p->block->host; - for (i = 0; i < p->normal_num; i++) { - uint64_t offset = be64_to_cpu(packet->offset[i]); + /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ + ret = multifd_ram_unfill_packet(p, errp); - if (offset > (p->block->used_length - p->page_size)) { - error_setg(errp, "multifd: offset too long %" PRIu64 - " (max " RAM_ADDR_FMT ")", - offset, p->block->used_length); - return -1; - } - p->normal[i] = offset; - } + trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, + p->next_packet_size); - for (i = 0; i < p->zero_num; i++) { - uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]); + return ret; +} - if (offset > (p->block->used_length - p->page_size)) { - error_setg(errp, "multifd: offset too long %" PRIu64 - " (max " RAM_ADDR_FMT ")", - offset, p->block->used_length); - return -1; - } - p->zero[i] = offset; +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + p->packets_recved++; + + if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { + return multifd_recv_unfill_packet_device_state(p, errp); } - return 0; + return multifd_recv_unfill_packet_ram(p, errp); } static bool multifd_send_should_exit(void) @@ -568,35 +337,32 @@ static void multifd_send_kick_main(MultiFDSendParams *p) } /* - * How we use multifd_send_state->pages and channel->pages? + * multifd_send() works by exchanging the MultiFDSendData object + * provided by the caller with an unused MultiFDSendData object from + * the next channel that is found to be idle. * - * We create a pages for each channel, and a main one. Each time that - * we need to send a batch of pages we interchange the ones between - * multifd_send_state and the channel that is sending it. There are - * two reasons for that: - * - to not have to do so many mallocs during migration - * - to make easier to know what to free at the end of migration + * The channel owns the data until it finishes transmitting and the + * caller owns the empty object until it fills it with data and calls + * this function again. No locking necessary. * - * This way we always know who is the owner of each "pages" struct, - * and we don't need any locking. It belongs to the migration thread - * or to the channel thread. Switching is safe because the migration - * thread is using the channel mutex when changing it, and the channel - * have to had finish with its own, otherwise pending_job can't be - * false. + * Switching is safe because both the migration thread and the channel + * thread have barriers in place to serialize access. * * Returns true if succeed, false otherwise. */ -static bool multifd_send_pages(void) +bool multifd_send(MultiFDSendData **send_data) { int i; static int next_channel; MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; + MultiFDSendData *tmp; if (multifd_send_should_exit()) { return false; } + QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex); + /* We wait here, until at least one channel is ready */ qemu_sem_wait(&multifd_send_state->channels_ready); @@ -626,66 +392,24 @@ static bool multifd_send_pages(void) * qatomic_store_release() in multifd_send_thread(). */ smp_mb_acquire(); - assert(!p->pages->num); - multifd_send_state->pages = p->pages; - p->pages = pages; - /* - * Making sure p->pages is setup before marking pending_job=true. Pairs - * with the qatomic_load_acquire() in multifd_send_thread(). - */ - qatomic_store_release(&p->pending_job, true); - qemu_sem_post(&p->sem); - - return true; -} - -static inline bool multifd_queue_empty(MultiFDPages_t *pages) -{ - return pages->num == 0; -} -static inline bool multifd_queue_full(MultiFDPages_t *pages) -{ - return pages->num == pages->allocated; -} - -static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) -{ - pages->offset[pages->num++] = offset; -} + assert(multifd_payload_empty(p->data)); -/* Returns true if enqueue successful, false otherwise */ -bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) -{ - MultiFDPages_t *pages; - -retry: - pages = multifd_send_state->pages; - - /* If the queue is empty, we can already enqueue now */ - if (multifd_queue_empty(pages)) { - pages->block = block; - multifd_enqueue(pages, offset); - return true; - } + /* + * Swap the pointers. The channel gets the client data for + * transferring and the client gets back an unused data slot. + */ + tmp = *send_data; + *send_data = p->data; + p->data = tmp; /* - * Not empty, meanwhile we need a flush. It can because of either: - * - * (1) The page is not on the same ramblock of previous ones, or, - * (2) The queue is full. - * - * After flush, always retry. + * Making sure p->data is setup before marking pending_job=true. Pairs + * with the qatomic_load_acquire() in multifd_send_thread(). */ - if (pages->block != block || multifd_queue_full(pages)) { - if (!multifd_send_pages()) { - return false; - } - goto retry; - } + qatomic_store_release(&p->pending_job, true); + qemu_sem_post(&p->sem); - /* Not empty, and we still have space, do it! */ - multifd_enqueue(pages, offset); return true; } @@ -775,7 +499,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) * channels have no I/O handler callback registered when reaching * here, because migration thread will wait for all multifd channel * establishments to complete during setup. Since - * migrate_fd_cleanup() will be scheduled in main thread too, all + * migration_cleanup() will be scheduled in main thread too, all * previous callbacks should guarantee to be completed when * reaching here. See multifd_send_state.channels_created and its * usage. In the future, we could replace this with an assert @@ -790,12 +514,13 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; + g_clear_pointer(&p->data, multifd_send_data_free); p->packet_len = 0; + g_clear_pointer(&p->packet_device_state, g_free); g_free(p->packet); p->packet = NULL; multifd_send_state->ops->send_cleanup(p, errp); + assert(!p->iov); return *errp == NULL; } @@ -804,12 +529,12 @@ static void multifd_send_cleanup_state(void) { file_cleanup_outgoing_migration(); socket_cleanup_outgoing_migration(); + multifd_device_state_send_cleanup(); qemu_sem_destroy(&multifd_send_state->channels_created); qemu_sem_destroy(&multifd_send_state->channels_ready); + qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); g_free(multifd_send_state->params); multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; g_free(multifd_send_state); multifd_send_state = NULL; } @@ -822,6 +547,36 @@ void multifd_send_shutdown(void) return; } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + /* thread_created implies the TLS handshake has succeeded */ + if (p->tls_thread_created && p->thread_created) { + Error *local_err = NULL; + /* + * The destination expects the TLS session to always be + * properly terminated. This helps to detect a premature + * termination in the middle of the stream. Note that + * older QEMUs always break the connection on the source + * and the destination always sees + * GNUTLS_E_PREMATURE_TERMINATION. + */ + migration_tls_channel_end(p->c, &local_err); + + /* + * The above can return an error in case the migration has + * already failed. If the migration succeeded, errors are + * not expected but there's no need to kill the source. + */ + if (local_err && !migration_has_failed(migrate_get_current())) { + warn_report( + "multifd_send_%d: Failed to terminate TLS connection: %s", + p->id, error_get_pretty(local_err)); + break; + } + } + } + multifd_send_terminate_threads(); for (i = 0; i < migrate_multifd_channels(); i++) { @@ -854,20 +609,12 @@ static int multifd_zero_copy_flush(QIOChannel *c) return ret; } -int multifd_send_sync_main(void) +int multifd_send_sync_main(MultiFDSyncReq req) { int i; bool flush_zero_copy; - if (!migrate_multifd()) { - return 0; - } - if (multifd_send_state->pages->num) { - if (!multifd_send_pages()) { - error_report("%s: multifd_send_pages fail", __func__); - return -1; - } - } + assert(req != MULTIFD_SYNC_NONE); flush_zero_copy = migrate_zero_copy_send(); @@ -884,8 +631,8 @@ int multifd_send_sync_main(void) * We should be the only user so far, so not possible to be set by * others concurrently. */ - assert(qatomic_read(&p->pending_sync) == false); - qatomic_set(&p->pending_sync, true); + assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE); + qatomic_set(&p->pending_sync, req); qemu_sem_post(&p->sem); } for (i = 0; i < migrate_multifd_channels(); i++) { @@ -937,26 +684,46 @@ static void *multifd_send_thread(void *opaque) } /* - * Read pending_job flag before p->pages. Pairs with the - * qatomic_store_release() in multifd_send_pages(). + * Read pending_job flag before p->data. Pairs with the + * qatomic_store_release() in multifd_send(). */ if (qatomic_load_acquire(&p->pending_job)) { - MultiFDPages_t *pages = p->pages; + bool is_device_state = multifd_payload_device_state(p->data); + size_t total_size; + int write_flags_masked = 0; + p->flags = 0; p->iovs_num = 0; - assert(pages->num); + assert(!multifd_payload_empty(p->data)); - ret = multifd_send_state->ops->send_prepare(p, &local_err); - if (ret != 0) { - break; + if (is_device_state) { + multifd_device_state_send_prepare(p); + + /* Device state packets cannot be sent via zerocopy */ + write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; + } else { + ret = multifd_send_state->ops->send_prepare(p, &local_err); + if (ret != 0) { + break; + } } + /* + * The packet header in the zerocopy RAM case is accounted for + * in multifd_nocomp_send_prepare() - where it is actually + * being sent. + */ + total_size = iov_size(p->iov, p->iovs_num); + if (migrate_mapped_ram()) { + assert(!is_device_state); + ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, - p->pages->block, &local_err); + &p->data->u.ram, &local_err); } else { ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, - NULL, 0, p->write_flags, + NULL, 0, + p->write_flags & ~write_flags_masked, &local_err); } @@ -964,29 +731,29 @@ static void *multifd_send_thread(void *opaque) break; } - stat64_add(&mig_stats.multifd_bytes, - p->next_packet_size + p->packet_len); - stat64_add(&mig_stats.normal_pages, pages->normal_num); - stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); + stat64_add(&mig_stats.multifd_bytes, total_size); - multifd_pages_reset(p->pages); p->next_packet_size = 0; + multifd_send_data_clear(p->data); /* - * Making sure p->pages is published before saying "we're + * Making sure p->data is published before saying "we're * free". Pairs with the smp_mb_acquire() in - * multifd_send_pages(). + * multifd_send(). */ qatomic_store_release(&p->pending_job, false); } else { + MultiFDSyncReq req = qatomic_read(&p->pending_sync); + /* * If not a normal job, must be a sync request. Note that * pending_sync is a standalone flag (unlike pending_job), so * it doesn't require explicit memory barriers. */ - assert(qatomic_read(&p->pending_sync)); + assert(req != MULTIFD_SYNC_NONE); - if (use_packets) { + /* Only push the SYNC message if it involves a remote sync */ + if (req == MULTIFD_SYNC_ALL) { p->flags = MULTIFD_FLAG_SYNC; multifd_send_fill_packet(p); ret = qio_channel_write_all(p->c, (void *)p->packet, @@ -996,10 +763,9 @@ static void *multifd_send_thread(void *opaque) } /* p->next_packet_size will always be zero for a SYNC packet */ stat64_add(&mig_stats.multifd_bytes, p->packet_len); - p->flags = 0; } - qatomic_set(&p->pending_sync, false); + qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE); qemu_sem_post(&p->sem_sync); } } @@ -1015,8 +781,7 @@ out: rcu_unregister_thread(); migration_threads_remove(thread); - trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages, - p->total_zero_pages); + trace_multifd_send_thread_end(p->id, p->packets_sent); return NULL; } @@ -1069,7 +834,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p, args->p = p; p->tls_thread_created = true; - qemu_thread_create(&p->tls_thread, "mig/src/tls", + qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS, multifd_tls_handshake_thread, args, QEMU_THREAD_JOINABLE); return true; @@ -1156,9 +921,8 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) bool multifd_send_setup(void) { MigrationState *s = migrate_get_current(); - Error *local_err = NULL; int thread_count, ret = 0; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint32_t page_count = multifd_ram_page_count(); bool use_packets = multifd_use_packets(); uint8_t i; @@ -1169,7 +933,7 @@ bool multifd_send_setup(void) thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); - multifd_send_state->pages = multifd_pages_init(page_count); + qemu_mutex_init(&multifd_send_state->multifd_send_mutex); qemu_sem_init(&multifd_send_state->channels_created, 0); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); @@ -1177,26 +941,27 @@ bool multifd_send_setup(void) for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; + Error *local_err = NULL; qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); p->id = i; - p->pages = multifd_pages_init(page_count); + p->data = multifd_send_data_alloc(); if (use_packets) { p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); + p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); + p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); } - p->name = g_strdup_printf("mig/src/send_%d", i); - p->page_size = qemu_target_page_size(); - p->page_count = page_count; + p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); p->write_flags = 0; if (!multifd_new_send_channel_create(p, &local_err)) { - return false; + migrate_set_error(s, local_err); + ret = -1; } } @@ -1209,24 +974,30 @@ bool multifd_send_setup(void) qemu_sem_wait(&multifd_send_state->channels_created); } + if (ret) { + goto err; + } + for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; + Error *local_err = NULL; ret = multifd_send_state->ops->send_setup(p, &local_err); if (ret) { - break; + migrate_set_error(s, local_err); + goto err; } + assert(p->iov); } - if (ret) { - migrate_set_error(s, local_err); - error_report_err(local_err); - migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, - MIGRATION_STATUS_FAILED); - return false; - } + multifd_device_state_send_setup(); return true; + +err: + migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, + MIGRATION_STATUS_FAILED); + return false; } bool multifd_recv(void) @@ -1353,11 +1124,14 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem_sync); qemu_sem_destroy(&p->sem); + g_free(p->data); + p->data = NULL; g_free(p->name); p->name = NULL; p->packet_len = 0; g_free(p->packet); p->packet = NULL; + g_clear_pointer(&p->packet_dev_state, g_free); g_free(p->normal); p->normal = NULL; g_free(p->zero); @@ -1459,8 +1233,37 @@ void multifd_recv_sync_main(void) trace_multifd_recv_sync_main(multifd_recv_state->packet_num); } +static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp) +{ + g_autofree char *dev_state_buf = NULL; + int ret; + + dev_state_buf = g_malloc(p->next_packet_size); + + ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); + if (ret != 0) { + return ret; + } + + if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] + != 0) { + error_setg(errp, "unterminated multifd device state idstr"); + return -1; + } + + if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, + p->packet_dev_state->instance_id, + dev_state_buf, p->next_packet_size, + errp)) { + ret = -1; + } + + return ret; +} + static void *multifd_recv_thread(void *opaque) { + MigrationState *s = migrate_get_current(); MultiFDRecvParams *p = opaque; Error *local_err = NULL; bool use_packets = multifd_use_packets(); @@ -1469,19 +1272,65 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); rcu_register_thread(); + if (!s->multifd_clean_tls_termination) { + p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF; + } + while (true) { + MultiFDPacketHdr_t hdr; uint32_t flags = 0; + bool is_device_state = false; bool has_data = false; + uint8_t *pkt_buf; + size_t pkt_len; + p->normal_num = 0; if (use_packets) { + struct iovec iov = { + .iov_base = (void *)&hdr, + .iov_len = sizeof(hdr) + }; + if (multifd_recv_should_exit()) { break; } - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ + ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL, + p->read_flags, &local_err); + if (!ret) { + /* EOF */ + assert(!local_err); + break; + } + + if (ret == -1) { + break; + } + + ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err); + if (ret) { + break; + } + + is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; + if (is_device_state) { + pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); + pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); + } else { + pkt_buf = (uint8_t *)p->packet + sizeof(hdr); + pkt_len = p->packet_len - sizeof(hdr); + } + + ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, + &local_err); + if (!ret) { + /* EOF */ + error_setg(&local_err, "multifd: unexpected EOF after packet header"); + break; + } + + if (ret == -1) { break; } @@ -1495,7 +1344,18 @@ static void *multifd_recv_thread(void *opaque) flags = p->flags; /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; - has_data = p->normal_num || p->zero_num; + + if (is_device_state) { + has_data = p->next_packet_size > 0; + } else { + /* + * Even if it's a SYNC packet, this needs to be set + * because older QEMUs (<9.0) still send data along with + * the SYNC packet. + */ + has_data = p->normal_num || p->zero_num; + } + qemu_mutex_unlock(&p->mutex); } else { /* @@ -1524,19 +1384,40 @@ static void *multifd_recv_thread(void *opaque) } if (has_data) { - ret = multifd_recv_state->ops->recv(p, &local_err); + /* + * multifd thread should not be active and receive data + * when migration is in the Postcopy phase. Two threads + * writing the same memory area could easily corrupt + * the guest state. + */ + assert(!migration_in_postcopy()); + if (is_device_state) { + assert(use_packets); + ret = multifd_device_state_recv(p, &local_err); + } else { + ret = multifd_recv_state->ops->recv(p, &local_err); + } if (ret != 0) { break; } + } else if (is_device_state) { + error_setg(&local_err, + "multifd: received empty device state packet"); + break; } if (use_packets) { if (flags & MULTIFD_FLAG_SYNC) { + if (is_device_state) { + error_setg(&local_err, + "multifd: received SYNC device state packet"); + break; + } + qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); } } else { - p->total_normal_pages += p->data->size / qemu_target_page_size(); p->data->size = 0; /* * Order data->size update before clearing @@ -1553,9 +1434,7 @@ static void *multifd_recv_thread(void *opaque) } rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->packets_recved, - p->total_normal_pages, - p->total_zero_pages); + trace_multifd_recv_thread_end(p->id, p->packets_recved); return NULL; } @@ -1563,7 +1442,7 @@ static void *multifd_recv_thread(void *opaque) int multifd_recv_setup(Error **errp) { int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint32_t page_count = multifd_ram_page_count(); bool use_packets = multifd_use_packets(); uint8_t i; @@ -1603,12 +1482,11 @@ int multifd_recv_setup(Error **errp) p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); + p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); } - p->name = g_strdup_printf("mig/dst/recv_%d", i); + p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); p->normal = g_new0(ram_addr_t, page_count); p->zero = g_new0(ram_addr_t, page_count); - p->page_count = page_count; - p->page_size = qemu_target_page_size(); } for (i = 0; i < thread_count; i++) { @@ -1681,17 +1559,3 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) QEMU_THREAD_JOINABLE); qatomic_inc(&multifd_recv_state->count); } - -bool multifd_send_prepare_common(MultiFDSendParams *p) -{ - multifd_send_zero_page_detect(p); - - if (!p->pages->normal_num) { - p->next_packet_size = 0; - return false; - } - - multifd_send_prepare_header(p); - - return true; -} |