1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
"""
Miscellaneous Utilities
This module provides asyncio utilities and compatibility wrappers for
Python 3.6 to provide some features that otherwise become available in
Python 3.7+.
"""
import asyncio
import sys
from typing import (
Any,
Coroutine,
Optional,
TypeVar,
cast,
)
T = TypeVar('T')
# --------------------------
# Section: Utility Functions
# --------------------------
async def flush(writer: asyncio.StreamWriter) -> None:
"""
Utility function to ensure a StreamWriter is *fully* drained.
`asyncio.StreamWriter.drain` only promises we will return to below
the "high-water mark". This function ensures we flush the entire
buffer -- by setting the high water mark to 0 and then calling
drain. The flow control limits are restored after the call is
completed.
"""
transport = cast(asyncio.WriteTransport, writer.transport)
# https://github.com/python/typeshed/issues/5779
low, high = transport.get_write_buffer_limits() # type: ignore
transport.set_write_buffer_limits(0, 0)
try:
await writer.drain()
finally:
transport.set_write_buffer_limits(high, low)
def upper_half(func: T) -> T:
"""
Do-nothing decorator that annotates a method as an "upper-half" method.
These methods must not call bottom-half functions directly, but can
schedule them to run.
"""
return func
def bottom_half(func: T) -> T:
"""
Do-nothing decorator that annotates a method as a "bottom-half" method.
These methods must take great care to handle their own exceptions whenever
possible. If they go unhandled, they will cause termination of the loop.
These methods do not, in general, have the ability to directly
report information to a caller’s context and will usually be
collected as a Task result instead.
They must not call upper-half functions directly.
"""
return func
# -------------------------------
# Section: Compatibility Wrappers
# -------------------------------
def create_task(coro: Coroutine[Any, Any, T],
loop: Optional[asyncio.AbstractEventLoop] = None
) -> 'asyncio.Future[T]':
"""
Python 3.6-compatible `asyncio.create_task` wrapper.
:param coro: The coroutine to execute in a task.
:param loop: Optionally, the loop to create the task in.
:return: An `asyncio.Future` object.
"""
if sys.version_info >= (3, 7):
if loop is not None:
return loop.create_task(coro)
return asyncio.create_task(coro) # pylint: disable=no-member
# Python 3.6:
return asyncio.ensure_future(coro, loop=loop)
def is_closing(writer: asyncio.StreamWriter) -> bool:
"""
Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
:param writer: The `asyncio.StreamWriter` object.
:return: `True` if the writer is closing, or closed.
"""
if sys.version_info >= (3, 7):
return writer.is_closing()
# Python 3.6:
transport = writer.transport
assert isinstance(transport, asyncio.WriteTransport)
return transport.is_closing()
async def wait_closed(writer: asyncio.StreamWriter) -> None:
"""
Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
:param writer: The `asyncio.StreamWriter` to wait on.
"""
if sys.version_info >= (3, 7):
await writer.wait_closed()
return
# Python 3.6
transport = writer.transport
assert isinstance(transport, asyncio.WriteTransport)
while not transport.is_closing():
await asyncio.sleep(0)
# This is an ugly workaround, but it's the best I can come up with.
sock = transport.get_extra_info('socket')
if sock is None:
# Our transport doesn't have a socket? ...
# Nothing we can reasonably do.
return
while sock.fileno() != -1:
await asyncio.sleep(0)
|