diff options
Diffstat (limited to 'migration/ram-compress.c')
-rw-r--r-- | migration/ram-compress.c | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/migration/ram-compress.c b/migration/ram-compress.c index d9bc67d..c25562f 100644 --- a/migration/ram-compress.c +++ b/migration/ram-compress.c @@ -48,6 +48,24 @@ static QemuThread *compress_threads; static QemuMutex comp_done_lock; static QemuCond comp_done_cond; +struct DecompressParam { + bool done; + bool quit; + QemuMutex mutex; + QemuCond cond; + void *des; + uint8_t *compbuf; + int len; + z_stream stream; +}; +typedef struct DecompressParam DecompressParam; + +static QEMUFile *decomp_file; +static DecompressParam *decomp_param; +static QemuThread *decompress_threads; +static QemuMutex decomp_done_lock; +static QemuCond decomp_done_cond; + static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, ram_addr_t offset, uint8_t *source_buf); @@ -272,3 +290,188 @@ retry: return pages; } + +/* return the size after decompression, or negative value on error */ +static int +qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, + const uint8_t *source, size_t source_len) +{ + int err; + + err = inflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = source_len; + stream->next_in = (uint8_t *)source; + stream->avail_out = dest_len; + stream->next_out = dest; + + err = inflate(stream, Z_NO_FLUSH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->total_out; +} + +static void *do_data_decompress(void *opaque) +{ + DecompressParam *param = opaque; + unsigned long pagesize; + uint8_t *des; + int len, ret; + + qemu_mutex_lock(¶m->mutex); + while (!param->quit) { + if (param->des) { + des = param->des; + len = param->len; + param->des = 0; + qemu_mutex_unlock(¶m->mutex); + + pagesize = TARGET_PAGE_SIZE; + + ret = qemu_uncompress_data(¶m->stream, des, pagesize, + param->compbuf, len); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); + } + + qemu_mutex_lock(&decomp_done_lock); + param->done = true; + qemu_cond_signal(&decomp_done_cond); + qemu_mutex_unlock(&decomp_done_lock); + + qemu_mutex_lock(¶m->mutex); + } else { + qemu_cond_wait(¶m->cond, ¶m->mutex); + } + } + qemu_mutex_unlock(¶m->mutex); + + return NULL; +} + +int wait_for_decompress_done(void) +{ + int idx, thread_count; + + if (!migrate_compress()) { + return 0; + } + + thread_count = migrate_decompress_threads(); + qemu_mutex_lock(&decomp_done_lock); + for (idx = 0; idx < thread_count; idx++) { + while (!decomp_param[idx].done) { + qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); + } + } + qemu_mutex_unlock(&decomp_done_lock); + return qemu_file_get_error(decomp_file); +} + +void compress_threads_load_cleanup(void) +{ + int i, thread_count; + + if (!migrate_compress()) { + return; + } + thread_count = migrate_decompress_threads(); + for (i = 0; i < thread_count; i++) { + /* + * we use it as a indicator which shows if the thread is + * properly init'd or not + */ + if (!decomp_param[i].compbuf) { + break; + } + + qemu_mutex_lock(&decomp_param[i].mutex); + decomp_param[i].quit = true; + qemu_cond_signal(&decomp_param[i].cond); + qemu_mutex_unlock(&decomp_param[i].mutex); + } + for (i = 0; i < thread_count; i++) { + if (!decomp_param[i].compbuf) { + break; + } + + qemu_thread_join(decompress_threads + i); + qemu_mutex_destroy(&decomp_param[i].mutex); + qemu_cond_destroy(&decomp_param[i].cond); + inflateEnd(&decomp_param[i].stream); + g_free(decomp_param[i].compbuf); + decomp_param[i].compbuf = NULL; + } + g_free(decompress_threads); + g_free(decomp_param); + decompress_threads = NULL; + decomp_param = NULL; + decomp_file = NULL; +} + +int compress_threads_load_setup(QEMUFile *f) +{ + int i, thread_count; + + if (!migrate_compress()) { + return 0; + } + + thread_count = migrate_decompress_threads(); + decompress_threads = g_new0(QemuThread, thread_count); + decomp_param = g_new0(DecompressParam, thread_count); + qemu_mutex_init(&decomp_done_lock); + qemu_cond_init(&decomp_done_cond); + decomp_file = f; + for (i = 0; i < thread_count; i++) { + if (inflateInit(&decomp_param[i].stream) != Z_OK) { + goto exit; + } + + decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + qemu_mutex_init(&decomp_param[i].mutex); + qemu_cond_init(&decomp_param[i].cond); + decomp_param[i].done = true; + decomp_param[i].quit = false; + qemu_thread_create(decompress_threads + i, "decompress", + do_data_decompress, decomp_param + i, + QEMU_THREAD_JOINABLE); + } + return 0; +exit: + compress_threads_load_cleanup(); + return -1; +} + +void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len) +{ + int idx, thread_count; + + thread_count = migrate_decompress_threads(); + QEMU_LOCK_GUARD(&decomp_done_lock); + while (true) { + for (idx = 0; idx < thread_count; idx++) { + if (decomp_param[idx].done) { + decomp_param[idx].done = false; + qemu_mutex_lock(&decomp_param[idx].mutex); + qemu_get_buffer(f, decomp_param[idx].compbuf, len); + decomp_param[idx].des = host; + decomp_param[idx].len = len; + qemu_cond_signal(&decomp_param[idx].cond); + qemu_mutex_unlock(&decomp_param[idx].mutex); + break; + } + } + if (idx < thread_count) { + break; + } else { + qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); + } + } +} |