aboutsummaryrefslogtreecommitdiff
path: root/migration/multifd-device-state.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/multifd-device-state.c')
-rw-r--r--migration/multifd-device-state.c92
1 files changed, 92 insertions, 0 deletions
diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c
index 3097ffa..94222d0 100644
--- a/migration/multifd-device-state.c
+++ b/migration/multifd-device-state.c
@@ -10,7 +10,10 @@
*/
#include "qemu/osdep.h"
+#include "qapi/error.h"
#include "qemu/lockable.h"
+#include "block/thread-pool.h"
+#include "migration.h"
#include "migration/misc.h"
#include "multifd.h"
#include "options.h"
@@ -19,6 +22,9 @@ static struct {
QemuMutex queue_job_mutex;
MultiFDSendData *send_data;
+
+ ThreadPool *threads;
+ bool threads_abort;
} *multifd_send_device_state;
void multifd_device_state_send_setup(void)
@@ -29,10 +35,14 @@ void multifd_device_state_send_setup(void)
qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
multifd_send_device_state->send_data = multifd_send_data_alloc();
+
+ multifd_send_device_state->threads = thread_pool_new();
+ multifd_send_device_state->threads_abort = false;
}
void multifd_device_state_send_cleanup(void)
{
+ g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free);
g_clear_pointer(&multifd_send_device_state->send_data,
multifd_send_data_free);
@@ -118,3 +128,85 @@ bool multifd_device_state_supported(void)
return migrate_multifd() && !migrate_mapped_ram() &&
migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
}
+
+static void multifd_device_state_save_thread_data_free(void *opaque)
+{
+ SaveLiveCompletePrecopyThreadData *data = opaque;
+
+ g_clear_pointer(&data->idstr, g_free);
+ g_free(data);
+}
+
+static int multifd_device_state_save_thread(void *opaque)
+{
+ SaveLiveCompletePrecopyThreadData *data = opaque;
+ g_autoptr(Error) local_err = NULL;
+
+ if (!data->hdlr(data, &local_err)) {
+ MigrationState *s = migrate_get_current();
+
+ /*
+ * Can't call abort_device_state_save_threads() here since new
+ * save threads could still be in process of being launched
+ * (if, for example, the very first save thread launched exited
+ * with an error very quickly).
+ */
+
+ assert(local_err);
+
+ /*
+ * In case of multiple save threads failing which thread error
+ * return we end setting is purely arbitrary.
+ */
+ migrate_set_error(s, local_err);
+ }
+
+ return 0;
+}
+
+bool multifd_device_state_save_thread_should_exit(void)
+{
+ return qatomic_read(&multifd_send_device_state->threads_abort);
+}
+
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+ char *idstr, uint32_t instance_id,
+ void *opaque)
+{
+ SaveLiveCompletePrecopyThreadData *data;
+
+ assert(multifd_device_state_supported());
+ assert(multifd_send_device_state);
+
+ assert(!qatomic_read(&multifd_send_device_state->threads_abort));
+
+ data = g_new(SaveLiveCompletePrecopyThreadData, 1);
+ data->hdlr = hdlr;
+ data->idstr = g_strdup(idstr);
+ data->instance_id = instance_id;
+ data->handler_opaque = opaque;
+
+ thread_pool_submit_immediate(multifd_send_device_state->threads,
+ multifd_device_state_save_thread,
+ data,
+ multifd_device_state_save_thread_data_free);
+}
+
+void multifd_abort_device_state_save_threads(void)
+{
+ assert(multifd_device_state_supported());
+
+ qatomic_set(&multifd_send_device_state->threads_abort, true);
+}
+
+bool multifd_join_device_state_save_threads(void)
+{
+ MigrationState *s = migrate_get_current();
+
+ assert(multifd_device_state_supported());
+
+ thread_pool_wait(multifd_send_device_state->threads);
+
+ return !migrate_has_error(s);
+}