From 3e6bed619a1d13858e540e01aae275abdf9146ae Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 12:45:29 +0100 Subject: monitor: cleanup detection of qmp_dispatcher_co shutting down Instead of overloading qmp_dispatcher_co_busy, make the coroutine pointer NULL. This will make things break spectacularly if somebody tries to start a request after monitor_cleanup(). AIO_WAIT_WHILE_UNLOCKED() does not need qatomic_mb_read(), because the macro contains all the necessary memory barriers. Signed-off-by: Paolo Bonzini --- monitor/qmp.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'monitor/qmp.c') diff --git a/monitor/qmp.c b/monitor/qmp.c index 092c527..f0cc6dc 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -226,6 +226,7 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) /* On shutdown, don't take any more requests from the queue */ if (qmp_dispatcher_co_shutdown) { + qatomic_set(&qmp_dispatcher_co, NULL); return; } @@ -250,6 +251,7 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) * yielded and were reentered from monitor_cleanup() */ if (qmp_dispatcher_co_shutdown) { + qatomic_set(&qmp_dispatcher_co, NULL); return; } } -- cgit v1.1 From 0ff25537018c0939919a35886265c38db28b2a8a Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 12:51:33 +0100 Subject: monitor: cleanup fetching of QMP requests Use a continue statement so that "after going to sleep" is treated the same way as "after processing a request". Pull the monitor_lock critical section out of monitor_qmp_requests_pop_any_with_lock() and protect qmp_dispatcher_co_shutdown with the monitor_lock. The two changes are complex to separate because monitor_qmp_dispatcher_co() previously had a complicated logic to check for shutdown both before and after going to sleep. Signed-off-by: Paolo Bonzini --- monitor/qmp.c | 40 +++++++++++++++------------------------- 1 file changed, 15 insertions(+), 25 deletions(-) (limited to 'monitor/qmp.c') diff --git a/monitor/qmp.c b/monitor/qmp.c index f0cc6dc..dfc2156 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -178,8 +178,6 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void) Monitor *mon; MonitorQMP *qmp_mon; - QEMU_LOCK_GUARD(&monitor_lock); - QTAILQ_FOREACH(mon, &mon_list, entry) { if (!monitor_is_qmp(mon)) { continue; @@ -215,6 +213,10 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) MonitorQMP *mon; while (true) { + /* + * busy must be set to true again by whoever + * rescheduled us to avoid double scheduling + */ assert(qatomic_mb_read(&qmp_dispatcher_co_busy) == true); /* @@ -224,36 +226,23 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) */ qatomic_mb_set(&qmp_dispatcher_co_busy, false); - /* On shutdown, don't take any more requests from the queue */ - if (qmp_dispatcher_co_shutdown) { - qatomic_set(&qmp_dispatcher_co, NULL); - return; + WITH_QEMU_LOCK_GUARD(&monitor_lock) { + /* On shutdown, don't take any more requests from the queue */ + if (qmp_dispatcher_co_shutdown) { + return NULL; + } + + req_obj = monitor_qmp_requests_pop_any_with_lock(); } - while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) { + if (!req_obj) { /* * No more requests to process. Wait to be reentered from * handle_qmp_command() when it pushes more requests, or * from monitor_cleanup() when it requests shutdown. */ - if (!qmp_dispatcher_co_shutdown) { - qemu_coroutine_yield(); - - /* - * busy must be set to true again by whoever - * rescheduled us to avoid double scheduling - */ - assert(qatomic_xchg(&qmp_dispatcher_co_busy, false) == true); - } - - /* - * qmp_dispatcher_co_shutdown may have changed if we - * yielded and were reentered from monitor_cleanup() - */ - if (qmp_dispatcher_co_shutdown) { - qatomic_set(&qmp_dispatcher_co, NULL); - return; - } + qemu_coroutine_yield(); + continue; } trace_monitor_qmp_in_band_dequeue(req_obj, @@ -342,6 +331,7 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co); qemu_coroutine_yield(); } + qatomic_set(&qmp_dispatcher_co, NULL); } static void handle_qmp_command(void *opaque, QObject *req, Error *err) -- cgit v1.1 From 9f2d58546e8cc83b1b033c4f89dcb188e7b05c0c Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 13:51:44 +0100 Subject: monitor: introduce qmp_dispatcher_co_wake This makes it possible to turn qmp_dispatcher_co_busy into a static variable. Signed-off-by: Paolo Bonzini --- monitor/qmp.c | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) (limited to 'monitor/qmp.c') diff --git a/monitor/qmp.c b/monitor/qmp.c index dfc2156..613b74e 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -33,6 +33,27 @@ #include "qapi/qmp/qlist.h" #include "trace.h" +/* + * qmp_dispatcher_co_busy is used for synchronisation between the + * monitor thread and the main thread to ensure that the dispatcher + * coroutine never gets scheduled a second time when it's already + * scheduled (scheduling the same coroutine twice is forbidden). + * + * It is true if the coroutine is active and processing requests. + * Additional requests may then be pushed onto mon->qmp_requests, + * and @qmp_dispatcher_co_shutdown may be set without further ado. + * @qmp_dispatcher_co_busy must not be woken up in this case. + * + * If false, you also have to set @qmp_dispatcher_co_busy to true and + * wake up @qmp_dispatcher_co after pushing the new requests. + * + * The coroutine will automatically change this variable back to false + * before it yields. Nobody else may set the variable to false. + * + * Access must be atomic for thread safety. + */ +static bool qmp_dispatcher_co_busy = true; + struct QMPRequest { /* Owner of the request */ MonitorQMP *mon; @@ -334,6 +355,13 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) qatomic_set(&qmp_dispatcher_co, NULL); } +void qmp_dispatcher_co_wake(void) +{ + if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { + aio_co_wake(qmp_dispatcher_co); + } +} + static void handle_qmp_command(void *opaque, QObject *req, Error *err) { MonitorQMP *mon = opaque; @@ -395,9 +423,7 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) } /* Kick the dispatcher routine */ - if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { - aio_co_wake(qmp_dispatcher_co); - } + qmp_dispatcher_co_wake(); } static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) -- cgit v1.1 From 60f4f62efeb174fe7433ce9ebc37836e70ec9b75 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 3 Mar 2023 13:51:58 +0100 Subject: monitor: extract request dequeuing to a new function Signed-off-by: Paolo Bonzini --- monitor/qmp.c | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) (limited to 'monitor/qmp.c') diff --git a/monitor/qmp.c b/monitor/qmp.c index 613b74e..e6b1043 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -226,13 +226,8 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void) return req_obj; } -void coroutine_fn monitor_qmp_dispatcher_co(void *data) +static QMPRequest *monitor_qmp_dispatcher_pop_any(void) { - QMPRequest *req_obj = NULL; - QDict *rsp; - bool oob_enabled; - MonitorQMP *mon; - while (true) { /* * busy must be set to true again by whoever @@ -248,24 +243,36 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) qatomic_mb_set(&qmp_dispatcher_co_busy, false); WITH_QEMU_LOCK_GUARD(&monitor_lock) { + QMPRequest *req_obj; + /* On shutdown, don't take any more requests from the queue */ if (qmp_dispatcher_co_shutdown) { return NULL; } req_obj = monitor_qmp_requests_pop_any_with_lock(); + if (req_obj) { + return req_obj; + } } - if (!req_obj) { - /* - * No more requests to process. Wait to be reentered from - * handle_qmp_command() when it pushes more requests, or - * from monitor_cleanup() when it requests shutdown. - */ - qemu_coroutine_yield(); - continue; - } + /* + * No more requests to process. Wait to be reentered from + * handle_qmp_command() when it pushes more requests, or + * from monitor_cleanup() when it requests shutdown. + */ + qemu_coroutine_yield(); + } +} + +void coroutine_fn monitor_qmp_dispatcher_co(void *data) +{ + QMPRequest *req_obj; + QDict *rsp; + bool oob_enabled; + MonitorQMP *mon; + while ((req_obj = monitor_qmp_dispatcher_pop_any()) != NULL) { trace_monitor_qmp_in_band_dequeue(req_obj, req_obj->mon->qmp_requests->length); -- cgit v1.1 From eea7cd3fc5139d7523f3c7a67d9c864b944dfacd Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 15 Mar 2023 12:34:01 +0100 Subject: monitor: do not use mb_read/mb_set Instead of relying on magic memory barriers, document the pattern that is being used. It is the one based on Dekker's algorithm, and in this case it is embodied as follows: enqueue request; sleeping = true; smp_mb(); smp_mb(); if (sleeping) kick(); if (!have a request) yield(); Signed-off-by: Paolo Bonzini --- monitor/qmp.c | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) (limited to 'monitor/qmp.c') diff --git a/monitor/qmp.c b/monitor/qmp.c index e6b1043..c8e0156 100644 --- a/monitor/qmp.c +++ b/monitor/qmp.c @@ -39,13 +39,16 @@ * coroutine never gets scheduled a second time when it's already * scheduled (scheduling the same coroutine twice is forbidden). * - * It is true if the coroutine is active and processing requests. - * Additional requests may then be pushed onto mon->qmp_requests, - * and @qmp_dispatcher_co_shutdown may be set without further ado. - * @qmp_dispatcher_co_busy must not be woken up in this case. + * It is true if the coroutine will process at least one more request + * before going to sleep. Either it has been kicked already, or it + * is active and processing requests. Additional requests may therefore + * be pushed onto mon->qmp_requests, and @qmp_dispatcher_co_shutdown may + * be set without further ado. @qmp_dispatcher_co must not be woken up + * in this case. * - * If false, you also have to set @qmp_dispatcher_co_busy to true and - * wake up @qmp_dispatcher_co after pushing the new requests. + * If false, you have to wake up @qmp_dispatcher_co after pushing new + * requests. You also have to set @qmp_dispatcher_co_busy to true + * before waking up the coroutine. * * The coroutine will automatically change this variable back to false * before it yields. Nobody else may set the variable to false. @@ -230,15 +233,18 @@ static QMPRequest *monitor_qmp_dispatcher_pop_any(void) { while (true) { /* - * busy must be set to true again by whoever - * rescheduled us to avoid double scheduling + * To avoid double scheduling, busy is true on entry to + * monitor_qmp_dispatcher_co(), and must be set again before + * aio_co_wake()-ing it. */ - assert(qatomic_mb_read(&qmp_dispatcher_co_busy) == true); + assert(qatomic_read(&qmp_dispatcher_co_busy) == true); /* * Mark the dispatcher as not busy already here so that we * don't miss any new requests coming in the middle of our * processing. + * + * Clear qmp_dispatcher_co_busy before reading request. */ qatomic_mb_set(&qmp_dispatcher_co_busy, false); @@ -364,6 +370,9 @@ void coroutine_fn monitor_qmp_dispatcher_co(void *data) void qmp_dispatcher_co_wake(void) { + /* Write request before reading qmp_dispatcher_co_busy. */ + smp_mb__before_rmw(); + if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { aio_co_wake(qmp_dispatcher_co); } -- cgit v1.1