diff options
Diffstat (limited to 'net/colo-compare.c')
-rw-r--r-- | net/colo-compare.c | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/net/colo-compare.c b/net/colo-compare.c index dd745a4..80e6532 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -27,11 +27,16 @@ #include "qemu/sockets.h" #include "colo.h" #include "sysemu/iothread.h" +#include "net/colo-compare.h" +#include "migration/colo.h" #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) +static QTAILQ_HEAD(, CompareState) net_compares = + QTAILQ_HEAD_INITIALIZER(net_compares); + #define COMPARE_READ_LEN_MAX NET_BUFSIZE #define MAX_QUEUE_SIZE 1024 @@ -41,6 +46,10 @@ /* TODO: Should be configurable */ #define REGULAR_PACKET_CHECK_MS 3000 +static QemuMutex event_mtx; +static QemuCond event_complete_cond; +static int event_unhandled_count; + /* * + CompareState ++ * | | @@ -87,6 +96,11 @@ typedef struct CompareState { IOThread *iothread; GMainContext *worker_context; QEMUTimer *packet_check_timer; + + QEMUBH *event_bh; + enum colo_event event; + + QTAILQ_ENTRY(CompareState) next; } CompareState; typedef struct CompareClass { @@ -736,6 +750,25 @@ static void check_old_packet_regular(void *opaque) REGULAR_PACKET_CHECK_MS); } +/* Public API, Used for COLO frame to notify compare event */ +void colo_notify_compares_event(void *opaque, int event, Error **errp) +{ + CompareState *s; + + qemu_mutex_lock(&event_mtx); + QTAILQ_FOREACH(s, &net_compares, next) { + s->event = event; + qemu_bh_schedule(s->event_bh); + event_unhandled_count++; + } + /* Wait all compare threads to finish handling this event */ + while (event_unhandled_count > 0) { + qemu_cond_wait(&event_complete_cond, &event_mtx); + } + + qemu_mutex_unlock(&event_mtx); +} + static void colo_compare_timer_init(CompareState *s) { AioContext *ctx = iothread_get_aio_context(s->iothread); @@ -756,6 +789,30 @@ static void colo_compare_timer_del(CompareState *s) } } +static void colo_flush_packets(void *opaque, void *user_data); + +static void colo_compare_handle_event(void *opaque) +{ + CompareState *s = opaque; + + switch (s->event) { + case COLO_EVENT_CHECKPOINT: + g_queue_foreach(&s->conn_list, colo_flush_packets, s); + break; + case COLO_EVENT_FAILOVER: + break; + default: + break; + } + + assert(event_unhandled_count > 0); + + qemu_mutex_lock(&event_mtx); + event_unhandled_count--; + qemu_cond_broadcast(&event_complete_cond); + qemu_mutex_unlock(&event_mtx); +} + static void colo_compare_iothread(CompareState *s) { object_ref(OBJECT(s->iothread)); @@ -769,6 +826,7 @@ static void colo_compare_iothread(CompareState *s) s, s->worker_context, true); colo_compare_timer_init(s); + s->event_bh = qemu_bh_new(colo_compare_handle_event, s); } static char *compare_get_pri_indev(Object *obj, Error **errp) @@ -926,8 +984,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); + QTAILQ_INSERT_TAIL(&net_compares, s, next); + g_queue_init(&s->conn_list); + qemu_mutex_init(&event_mtx); + qemu_cond_init(&event_complete_cond); + s->connection_track_table = g_hash_table_new_full(connection_key_hash, connection_key_equal, g_free, @@ -990,6 +1053,7 @@ static void colo_compare_init(Object *obj) static void colo_compare_finalize(Object *obj) { CompareState *s = COLO_COMPARE(obj); + CompareState *tmp = NULL; qemu_chr_fe_deinit(&s->chr_pri_in, false); qemu_chr_fe_deinit(&s->chr_sec_in, false); @@ -997,6 +1061,16 @@ static void colo_compare_finalize(Object *obj) if (s->iothread) { colo_compare_timer_del(s); } + + qemu_bh_delete(s->event_bh); + + QTAILQ_FOREACH(tmp, &net_compares, next) { + if (tmp == s) { + QTAILQ_REMOVE(&net_compares, s, next); + break; + } + } + /* Release all unhandled packets after compare thead exited */ g_queue_foreach(&s->conn_list, colo_flush_packets, s); @@ -1009,6 +1083,10 @@ static void colo_compare_finalize(Object *obj) if (s->iothread) { object_unref(OBJECT(s->iothread)); } + + qemu_mutex_destroy(&event_mtx); + qemu_cond_destroy(&event_complete_cond); + g_free(s->pri_indev); g_free(s->sec_indev); g_free(s->outdev); |