aboutsummaryrefslogtreecommitdiff
path: root/migration/ram-compress.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/ram-compress.c')
-rw-r--r--migration/ram-compress.c203
1 files changed, 203 insertions, 0 deletions
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index d9bc67d..c25562f 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -48,6 +48,24 @@ static QemuThread *compress_threads;
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
+struct DecompressParam {
+ bool done;
+ bool quit;
+ QemuMutex mutex;
+ QemuCond cond;
+ void *des;
+ uint8_t *compbuf;
+ int len;
+ z_stream stream;
+};
+typedef struct DecompressParam DecompressParam;
+
+static QEMUFile *decomp_file;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
+
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
RAMBlock *block, ram_addr_t offset,
uint8_t *source_buf);
@@ -272,3 +290,188 @@ retry:
return pages;
}
+
+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+ int err;
+
+ err = inflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = source_len;
+ stream->next_in = (uint8_t *)source;
+ stream->avail_out = dest_len;
+ stream->next_out = dest;
+
+ err = inflate(stream, Z_NO_FLUSH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->total_out;
+}
+
+static void *do_data_decompress(void *opaque)
+{
+ DecompressParam *param = opaque;
+ unsigned long pagesize;
+ uint8_t *des;
+ int len, ret;
+
+ qemu_mutex_lock(&param->mutex);
+ while (!param->quit) {
+ if (param->des) {
+ des = param->des;
+ len = param->len;
+ param->des = 0;
+ qemu_mutex_unlock(&param->mutex);
+
+ pagesize = TARGET_PAGE_SIZE;
+
+ ret = qemu_uncompress_data(&param->stream, des, pagesize,
+ param->compbuf, len);
+ if (ret < 0 && migrate_get_current()->decompress_error_check) {
+ error_report("decompress data failed");
+ qemu_file_set_error(decomp_file, ret);
+ }
+
+ qemu_mutex_lock(&decomp_done_lock);
+ param->done = true;
+ qemu_cond_signal(&decomp_done_cond);
+ qemu_mutex_unlock(&decomp_done_lock);
+
+ qemu_mutex_lock(&param->mutex);
+ } else {
+ qemu_cond_wait(&param->cond, &param->mutex);
+ }
+ }
+ qemu_mutex_unlock(&param->mutex);
+
+ return NULL;
+}
+
+int wait_for_decompress_done(void)
+{
+ int idx, thread_count;
+
+ if (!migrate_compress()) {
+ return 0;
+ }
+
+ thread_count = migrate_decompress_threads();
+ qemu_mutex_lock(&decomp_done_lock);
+ for (idx = 0; idx < thread_count; idx++) {
+ while (!decomp_param[idx].done) {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+ qemu_mutex_unlock(&decomp_done_lock);
+ return qemu_file_get_error(decomp_file);
+}
+
+void compress_threads_load_cleanup(void)
+{
+ int i, thread_count;
+
+ if (!migrate_compress()) {
+ return;
+ }
+ thread_count = migrate_decompress_threads();
+ for (i = 0; i < thread_count; i++) {
+ /*
+ * we use it as a indicator which shows if the thread is
+ * properly init'd or not
+ */
+ if (!decomp_param[i].compbuf) {
+ break;
+ }
+
+ qemu_mutex_lock(&decomp_param[i].mutex);
+ decomp_param[i].quit = true;
+ qemu_cond_signal(&decomp_param[i].cond);
+ qemu_mutex_unlock(&decomp_param[i].mutex);
+ }
+ for (i = 0; i < thread_count; i++) {
+ if (!decomp_param[i].compbuf) {
+ break;
+ }
+
+ qemu_thread_join(decompress_threads + i);
+ qemu_mutex_destroy(&decomp_param[i].mutex);
+ qemu_cond_destroy(&decomp_param[i].cond);
+ inflateEnd(&decomp_param[i].stream);
+ g_free(decomp_param[i].compbuf);
+ decomp_param[i].compbuf = NULL;
+ }
+ g_free(decompress_threads);
+ g_free(decomp_param);
+ decompress_threads = NULL;
+ decomp_param = NULL;
+ decomp_file = NULL;
+}
+
+int compress_threads_load_setup(QEMUFile *f)
+{
+ int i, thread_count;
+
+ if (!migrate_compress()) {
+ return 0;
+ }
+
+ thread_count = migrate_decompress_threads();
+ decompress_threads = g_new0(QemuThread, thread_count);
+ decomp_param = g_new0(DecompressParam, thread_count);
+ qemu_mutex_init(&decomp_done_lock);
+ qemu_cond_init(&decomp_done_cond);
+ decomp_file = f;
+ for (i = 0; i < thread_count; i++) {
+ if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+ goto exit;
+ }
+
+ decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ qemu_mutex_init(&decomp_param[i].mutex);
+ qemu_cond_init(&decomp_param[i].cond);
+ decomp_param[i].done = true;
+ decomp_param[i].quit = false;
+ qemu_thread_create(decompress_threads + i, "decompress",
+ do_data_decompress, decomp_param + i,
+ QEMU_THREAD_JOINABLE);
+ }
+ return 0;
+exit:
+ compress_threads_load_cleanup();
+ return -1;
+}
+
+void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
+{
+ int idx, thread_count;
+
+ thread_count = migrate_decompress_threads();
+ QEMU_LOCK_GUARD(&decomp_done_lock);
+ while (true) {
+ for (idx = 0; idx < thread_count; idx++) {
+ if (decomp_param[idx].done) {
+ decomp_param[idx].done = false;
+ qemu_mutex_lock(&decomp_param[idx].mutex);
+ qemu_get_buffer(f, decomp_param[idx].compbuf, len);
+ decomp_param[idx].des = host;
+ decomp_param[idx].len = len;
+ qemu_cond_signal(&decomp_param[idx].cond);
+ qemu_mutex_unlock(&decomp_param[idx].mutex);
+ break;
+ }
+ }
+ if (idx < thread_count) {
+ break;
+ } else {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+}