aboutsummaryrefslogtreecommitdiff
path: root/migration/multifd.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/multifd.c')
-rw-r--r--migration/multifd.c345
1 files changed, 285 insertions, 60 deletions
diff --git a/migration/multifd.c b/migration/multifd.c
index 9b200f4..b255778 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -12,15 +12,18 @@
#include "qemu/osdep.h"
#include "qemu/cutils.h"
+#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
-#include "sysemu/sysemu.h"
-#include "exec/ramblock.h"
+#include "system/system.h"
+#include "system/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
+#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
+#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
@@ -33,11 +36,6 @@
#include "io/channel-socket.h"
#include "yank_functions.h"
-/* Multiple fd's */
-
-#define MULTIFD_MAGIC 0x11223344U
-#define MULTIFD_VERSION 1
-
typedef struct {
uint32_t magic;
uint32_t version;
@@ -49,6 +47,10 @@ typedef struct {
struct {
MultiFDSendParams *params;
+
+ /* multifd_send() body is not thread safe, needs serialization */
+ QemuMutex multifd_send_mutex;
+
/*
* Global number of generated multifd packets.
*
@@ -98,24 +100,44 @@ struct {
MultiFDSendData *multifd_send_data_alloc(void)
{
- size_t max_payload_size, size_minus_payload;
+ MultiFDSendData *new = g_new0(MultiFDSendData, 1);
- /*
- * MultiFDPages_t has a flexible array at the end, account for it
- * when allocating MultiFDSendData. Use max() in case other types
- * added to the union in the future are larger than
- * (MultiFDPages_t + flex array).
- */
- max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));
+ multifd_ram_payload_alloc(&new->u.ram);
+ /* Device state allocates its payload on-demand */
- /*
- * Account for any holes the compiler might insert. We can't pack
- * the structure because that misaligns the members and triggers
- * Waddress-of-packed-member.
- */
- size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);
+ return new;
+}
- return g_malloc0(size_minus_payload + max_payload_size);
+void multifd_send_data_clear(MultiFDSendData *data)
+{
+ if (multifd_payload_empty(data)) {
+ return;
+ }
+
+ switch (data->type) {
+ case MULTIFD_PAYLOAD_DEVICE_STATE:
+ multifd_send_data_clear_device_state(&data->u.device_state);
+ break;
+ default:
+ /* Nothing to do */
+ break;
+ }
+
+ data->type = MULTIFD_PAYLOAD_NONE;
+}
+
+void multifd_send_data_free(MultiFDSendData *data)
+{
+ if (!data) {
+ return;
+ }
+
+ /* This also frees the device state payload */
+ multifd_send_data_clear(data);
+
+ multifd_ram_payload_free(&data->u.ram);
+
+ g_free(data);
}
static bool multifd_use_packets(void)
@@ -201,6 +223,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
return msg.id;
}
+/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
@@ -209,10 +232,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
memset(packet, 0, p->packet_len);
- packet->magic = cpu_to_be32(MULTIFD_MAGIC);
- packet->version = cpu_to_be32(MULTIFD_VERSION);
+ packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+ packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
- packet->flags = cpu_to_be32(p->flags);
+ packet->hdr.flags = cpu_to_be32(p->flags);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
@@ -228,12 +251,12 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
p->flags, p->next_packet_size);
}
-static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
+ const MultiFDPacketHdr_t *hdr,
+ Error **errp)
{
- const MultiFDPacket_t *packet = p->packet;
- uint32_t magic = be32_to_cpu(packet->magic);
- uint32_t version = be32_to_cpu(packet->version);
- int ret = 0;
+ uint32_t magic = be32_to_cpu(hdr->magic);
+ uint32_t version = be32_to_cpu(hdr->version);
if (magic != MULTIFD_MAGIC) {
error_setg(errp, "multifd: received packet magic %x, expected %x",
@@ -247,14 +270,32 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
return -1;
}
- p->flags = be32_to_cpu(packet->flags);
+ p->flags = be32_to_cpu(hdr->flags);
+
+ return 0;
+}
+
+static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
+ Error **errp)
+{
+ MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
+
+ packet->instance_id = be32_to_cpu(packet->instance_id);
+ p->next_packet_size = be32_to_cpu(packet->next_packet_size);
+
+ return 0;
+}
+
+static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
+{
+ const MultiFDPacket_t *packet = p->packet;
+ int ret = 0;
+
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
- p->packets_recved++;
- if (!(p->flags & MULTIFD_FLAG_SYNC)) {
- ret = multifd_ram_unfill_packet(p, errp);
- }
+ /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
+ ret = multifd_ram_unfill_packet(p, errp);
trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
p->next_packet_size);
@@ -262,6 +303,17 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
return ret;
}
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+ p->packets_recved++;
+
+ if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
+ return multifd_recv_unfill_packet_device_state(p, errp);
+ }
+
+ return multifd_recv_unfill_packet_ram(p, errp);
+}
+
static bool multifd_send_should_exit(void)
{
return qatomic_read(&multifd_send_state->exiting);
@@ -309,6 +361,8 @@ bool multifd_send(MultiFDSendData **send_data)
return false;
}
+ QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);
+
/* We wait here, until at least one channel is ready */
qemu_sem_wait(&multifd_send_state->channels_ready);
@@ -445,7 +499,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
* channels have no I/O handler callback registered when reaching
* here, because migration thread will wait for all multifd channel
* establishments to complete during setup. Since
- * migrate_fd_cleanup() will be scheduled in main thread too, all
+ * migration_cleanup() will be scheduled in main thread too, all
* previous callbacks should guarantee to be completed when
* reaching here. See multifd_send_state.channels_created and its
* usage. In the future, we could replace this with an assert
@@ -460,9 +514,9 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
- g_free(p->data);
- p->data = NULL;
+ g_clear_pointer(&p->data, multifd_send_data_free);
p->packet_len = 0;
+ g_clear_pointer(&p->packet_device_state, g_free);
g_free(p->packet);
p->packet = NULL;
multifd_send_state->ops->send_cleanup(p, errp);
@@ -475,8 +529,10 @@ static void multifd_send_cleanup_state(void)
{
file_cleanup_outgoing_migration();
socket_cleanup_outgoing_migration();
+ multifd_device_state_send_cleanup();
qemu_sem_destroy(&multifd_send_state->channels_created);
qemu_sem_destroy(&multifd_send_state->channels_ready);
+ qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
g_free(multifd_send_state->params);
multifd_send_state->params = NULL;
g_free(multifd_send_state);
@@ -491,6 +547,36 @@ void multifd_send_shutdown(void)
return;
}
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ /* thread_created implies the TLS handshake has succeeded */
+ if (p->tls_thread_created && p->thread_created) {
+ Error *local_err = NULL;
+ /*
+ * The destination expects the TLS session to always be
+ * properly terminated. This helps to detect a premature
+ * termination in the middle of the stream. Note that
+ * older QEMUs always break the connection on the source
+ * and the destination always sees
+ * GNUTLS_E_PREMATURE_TERMINATION.
+ */
+ migration_tls_channel_end(p->c, &local_err);
+
+ /*
+ * The above can return an error in case the migration has
+ * already failed. If the migration succeeded, errors are
+ * not expected but there's no need to kill the source.
+ */
+ if (local_err && !migration_has_failed(migrate_get_current())) {
+ warn_report(
+ "multifd_send_%d: Failed to terminate TLS connection: %s",
+ p->id, error_get_pretty(local_err));
+ break;
+ }
+ }
+ }
+
multifd_send_terminate_threads();
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -523,11 +609,13 @@ static int multifd_zero_copy_flush(QIOChannel *c)
return ret;
}
-int multifd_send_sync_main(void)
+int multifd_send_sync_main(MultiFDSyncReq req)
{
int i;
bool flush_zero_copy;
+ assert(req != MULTIFD_SYNC_NONE);
+
flush_zero_copy = migrate_zero_copy_send();
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -543,8 +631,8 @@ int multifd_send_sync_main(void)
* We should be the only user so far, so not possible to be set by
* others concurrently.
*/
- assert(qatomic_read(&p->pending_sync) == false);
- qatomic_set(&p->pending_sync, true);
+ assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
+ qatomic_set(&p->pending_sync, req);
qemu_sem_post(&p->sem);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -600,20 +688,42 @@ static void *multifd_send_thread(void *opaque)
* qatomic_store_release() in multifd_send().
*/
if (qatomic_load_acquire(&p->pending_job)) {
+ bool is_device_state = multifd_payload_device_state(p->data);
+ size_t total_size;
+ int write_flags_masked = 0;
+
+ p->flags = 0;
p->iovs_num = 0;
assert(!multifd_payload_empty(p->data));
- ret = multifd_send_state->ops->send_prepare(p, &local_err);
- if (ret != 0) {
- break;
+ if (is_device_state) {
+ multifd_device_state_send_prepare(p);
+
+ /* Device state packets cannot be sent via zerocopy */
+ write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
+ } else {
+ ret = multifd_send_state->ops->send_prepare(p, &local_err);
+ if (ret != 0) {
+ break;
+ }
}
+ /*
+ * The packet header in the zerocopy RAM case is accounted for
+ * in multifd_nocomp_send_prepare() - where it is actually
+ * being sent.
+ */
+ total_size = iov_size(p->iov, p->iovs_num);
+
if (migrate_mapped_ram()) {
+ assert(!is_device_state);
+
ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
&p->data->u.ram, &local_err);
} else {
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
- NULL, 0, p->write_flags,
+ NULL, 0,
+ p->write_flags & ~write_flags_masked,
&local_err);
}
@@ -621,11 +731,10 @@ static void *multifd_send_thread(void *opaque)
break;
}
- stat64_add(&mig_stats.multifd_bytes,
- p->next_packet_size + p->packet_len);
+ stat64_add(&mig_stats.multifd_bytes, total_size);
p->next_packet_size = 0;
- multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);
+ multifd_send_data_clear(p->data);
/*
* Making sure p->data is published before saying "we're
@@ -634,14 +743,17 @@ static void *multifd_send_thread(void *opaque)
*/
qatomic_store_release(&p->pending_job, false);
} else {
+ MultiFDSyncReq req = qatomic_read(&p->pending_sync);
+
/*
* If not a normal job, must be a sync request. Note that
* pending_sync is a standalone flag (unlike pending_job), so
* it doesn't require explicit memory barriers.
*/
- assert(qatomic_read(&p->pending_sync));
+ assert(req != MULTIFD_SYNC_NONE);
- if (use_packets) {
+ /* Only push the SYNC message if it involves a remote sync */
+ if (req == MULTIFD_SYNC_ALL) {
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
@@ -651,10 +763,9 @@ static void *multifd_send_thread(void *opaque)
}
/* p->next_packet_size will always be zero for a SYNC packet */
stat64_add(&mig_stats.multifd_bytes, p->packet_len);
- p->flags = 0;
}
- qatomic_set(&p->pending_sync, false);
+ qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
qemu_sem_post(&p->sem_sync);
}
}
@@ -723,7 +834,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p,
args->p = p;
p->tls_thread_created = true;
- qemu_thread_create(&p->tls_thread, "mig/src/tls",
+ qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
multifd_tls_handshake_thread, args,
QEMU_THREAD_JOINABLE);
return true;
@@ -822,6 +933,7 @@ bool multifd_send_setup(void)
thread_count = migrate_multifd_channels();
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
+ qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
qemu_sem_init(&multifd_send_state->channels_created, 0);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
@@ -840,8 +952,11 @@ bool multifd_send_setup(void)
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
+ p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
+ p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+ p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
}
- p->name = g_strdup_printf("mig/src/send_%d", i);
+ p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
p->write_flags = 0;
if (!multifd_new_send_channel_create(p, &local_err)) {
@@ -875,6 +990,8 @@ bool multifd_send_setup(void)
assert(p->iov);
}
+ multifd_device_state_send_setup();
+
return true;
err:
@@ -1014,6 +1131,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
+ g_clear_pointer(&p->packet_dev_state, g_free);
g_free(p->normal);
p->normal = NULL;
g_free(p->zero);
@@ -1115,8 +1233,37 @@ void multifd_recv_sync_main(void)
trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
+static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
+{
+ g_autofree char *dev_state_buf = NULL;
+ int ret;
+
+ dev_state_buf = g_malloc(p->next_packet_size);
+
+ ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
+ != 0) {
+ error_setg(errp, "unterminated multifd device state idstr");
+ return -1;
+ }
+
+ if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
+ p->packet_dev_state->instance_id,
+ dev_state_buf, p->next_packet_size,
+ errp)) {
+ ret = -1;
+ }
+
+ return ret;
+}
+
static void *multifd_recv_thread(void *opaque)
{
+ MigrationState *s = migrate_get_current();
MultiFDRecvParams *p = opaque;
Error *local_err = NULL;
bool use_packets = multifd_use_packets();
@@ -1125,19 +1272,65 @@ static void *multifd_recv_thread(void *opaque)
trace_multifd_recv_thread_start(p->id);
rcu_register_thread();
+ if (!s->multifd_clean_tls_termination) {
+ p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
+ }
+
while (true) {
+ MultiFDPacketHdr_t hdr;
uint32_t flags = 0;
+ bool is_device_state = false;
bool has_data = false;
+ uint8_t *pkt_buf;
+ size_t pkt_len;
+
p->normal_num = 0;
if (use_packets) {
+ struct iovec iov = {
+ .iov_base = (void *)&hdr,
+ .iov_len = sizeof(hdr)
+ };
+
if (multifd_recv_should_exit()) {
break;
}
- ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
+ ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
+ p->read_flags, &local_err);
+ if (!ret) {
+ /* EOF */
+ assert(!local_err);
+ break;
+ }
+
+ if (ret == -1) {
+ break;
+ }
+
+ ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
+ if (ret) {
+ break;
+ }
+
+ is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
+ if (is_device_state) {
+ pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
+ pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
+ } else {
+ pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
+ pkt_len = p->packet_len - sizeof(hdr);
+ }
+
+ ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
+ &local_err);
+ if (!ret) {
+ /* EOF */
+ error_setg(&local_err, "multifd: unexpected EOF after packet header");
+ break;
+ }
+
+ if (ret == -1) {
break;
}
@@ -1151,9 +1344,18 @@ static void *multifd_recv_thread(void *opaque)
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
- if (!(flags & MULTIFD_FLAG_SYNC)) {
+
+ if (is_device_state) {
+ has_data = p->next_packet_size > 0;
+ } else {
+ /*
+ * Even if it's a SYNC packet, this needs to be set
+ * because older QEMUs (<9.0) still send data along with
+ * the SYNC packet.
+ */
has_data = p->normal_num || p->zero_num;
}
+
qemu_mutex_unlock(&p->mutex);
} else {
/*
@@ -1182,14 +1384,36 @@ static void *multifd_recv_thread(void *opaque)
}
if (has_data) {
- ret = multifd_recv_state->ops->recv(p, &local_err);
+ /*
+ * multifd thread should not be active and receive data
+ * when migration is in the Postcopy phase. Two threads
+ * writing the same memory area could easily corrupt
+ * the guest state.
+ */
+ assert(!migration_in_postcopy());
+ if (is_device_state) {
+ assert(use_packets);
+ ret = multifd_device_state_recv(p, &local_err);
+ } else {
+ ret = multifd_recv_state->ops->recv(p, &local_err);
+ }
if (ret != 0) {
break;
}
+ } else if (is_device_state) {
+ error_setg(&local_err,
+ "multifd: received empty device state packet");
+ break;
}
if (use_packets) {
if (flags & MULTIFD_FLAG_SYNC) {
+ if (is_device_state) {
+ error_setg(&local_err,
+ "multifd: received SYNC device state packet");
+ break;
+ }
+
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
}
@@ -1258,8 +1482,9 @@ int multifd_recv_setup(Error **errp)
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
+ p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
}
- p->name = g_strdup_printf("mig/dst/recv_%d", i);
+ p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
p->normal = g_new0(ram_addr_t, page_count);
p->zero = g_new0(ram_addr_t, page_count);
}