diff options
Diffstat (limited to 'python/qemu/qmp/util.py')
-rw-r--r-- | python/qemu/qmp/util.py | 143 |
1 files changed, 37 insertions, 106 deletions
diff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py index ca6225e..a8229e5 100644 --- a/python/qemu/qmp/util.py +++ b/python/qemu/qmp/util.py @@ -1,25 +1,16 @@ """ Miscellaneous Utilities -This module provides asyncio utilities and compatibility wrappers for -Python 3.6 to provide some features that otherwise become available in -Python 3.7+. - -Various logging and debugging utilities are also provided, such as -`exception_summary()` and `pretty_traceback()`, used primarily for -adding information into the logging stream. +This module provides asyncio and various logging and debugging +utilities, such as `exception_summary()` and `pretty_traceback()`, used +primarily for adding information into the logging stream. """ import asyncio import sys import traceback -from typing import ( - Any, - Coroutine, - Optional, - TypeVar, - cast, -) +from typing import TypeVar, cast +import warnings T = TypeVar('T') @@ -30,9 +21,35 @@ T = TypeVar('T') # -------------------------- +def get_or_create_event_loop() -> asyncio.AbstractEventLoop: + """ + Return this thread's current event loop, or create a new one. + + This function behaves similarly to asyncio.get_event_loop() in + Python<=3.13, where if there is no event loop currently associated + with the current context, it will create and register one. It should + generally not be used in any asyncio-native applications. + """ + try: + with warnings.catch_warnings(): + # Python <= 3.13 will trigger deprecation warnings if no + # event loop is set, but will create and set a new loop. + warnings.simplefilter("ignore") + loop = asyncio.get_event_loop() + except RuntimeError: + # Python 3.14+: No event loop set for this thread, + # create and set one. + loop = asyncio.new_event_loop() + # Set this loop as the current thread's loop, to be returned + # by calls to get_event_loop() in the future. + asyncio.set_event_loop(loop) + + return loop + + async def flush(writer: asyncio.StreamWriter) -> None: """ - Utility function to ensure a StreamWriter is *fully* drained. + Utility function to ensure an `asyncio.StreamWriter` is *fully* drained. `asyncio.StreamWriter.drain` only promises we will return to below the "high-water mark". This function ensures we flush the entire @@ -72,102 +89,13 @@ def bottom_half(func: T) -> T: These methods do not, in general, have the ability to directly report information to a caller’s context and will usually be - collected as a Task result instead. + collected as an `asyncio.Task` result instead. They must not call upper-half functions directly. """ return func -# ------------------------------- -# Section: Compatibility Wrappers -# ------------------------------- - - -def create_task(coro: Coroutine[Any, Any, T], - loop: Optional[asyncio.AbstractEventLoop] = None - ) -> 'asyncio.Future[T]': - """ - Python 3.6-compatible `asyncio.create_task` wrapper. - - :param coro: The coroutine to execute in a task. - :param loop: Optionally, the loop to create the task in. - - :return: An `asyncio.Future` object. - """ - if sys.version_info >= (3, 7): - if loop is not None: - return loop.create_task(coro) - return asyncio.create_task(coro) # pylint: disable=no-member - - # Python 3.6: - return asyncio.ensure_future(coro, loop=loop) - - -def is_closing(writer: asyncio.StreamWriter) -> bool: - """ - Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. - - :param writer: The `asyncio.StreamWriter` object. - :return: `True` if the writer is closing, or closed. - """ - if sys.version_info >= (3, 7): - return writer.is_closing() - - # Python 3.6: - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - return transport.is_closing() - - -async def wait_closed(writer: asyncio.StreamWriter) -> None: - """ - Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. - - :param writer: The `asyncio.StreamWriter` to wait on. - """ - if sys.version_info >= (3, 7): - await writer.wait_closed() - return - - # Python 3.6 - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - - while not transport.is_closing(): - await asyncio.sleep(0) - - # This is an ugly workaround, but it's the best I can come up with. - sock = transport.get_extra_info('socket') - - if sock is None: - # Our transport doesn't have a socket? ... - # Nothing we can reasonably do. - return - - while sock.fileno() != -1: - await asyncio.sleep(0) - - -def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: - """ - Python 3.6-compatible `asyncio.run` wrapper. - - :param coro: A coroutine to execute now. - :return: The return value from the coroutine. - """ - if sys.version_info >= (3, 7): - return asyncio.run(coro, debug=debug) - - # Python 3.6 - loop = asyncio.get_event_loop() - loop.set_debug(debug) - ret = loop.run_until_complete(coro) - loop.close() - - return ret - - # ---------------------------- # Section: Logging & Debugging # ---------------------------- @@ -177,8 +105,11 @@ def exception_summary(exc: BaseException) -> str: """ Return a summary string of an arbitrary exception. - It will be of the form "ExceptionType: Error Message", if the error + It will be of the form "ExceptionType: Error Message" if the error string is non-empty, and just "ExceptionType" otherwise. + + This code is based on CPython's implementation of + `traceback.TracebackException.format_exception_only`. """ name = type(exc).__qualname__ smod = type(exc).__module__ |