Skip to content

Commit 41b412f

Browse files
committed
Add loop.shutdown_asyncgens() method.
1 parent 6f07279 commit 41b412f

File tree

4 files changed

+107
-1
lines changed

4 files changed

+107
-1
lines changed

tests/test_base.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,47 @@ def coro():
498498
self.assertFalse(isinstance(task, MyTask))
499499
self.loop.run_until_complete(task)
500500

501+
def test_shutdown_asyncgens_01(self):
502+
finalized = list()
503+
504+
if not hasattr(self.loop, 'shutdown_asyncgens'):
505+
raise unittest.SkipTest()
506+
507+
waiter_src = '''async def waiter(timeout, finalized, loop):
508+
try:
509+
await asyncio.sleep(timeout, loop=loop)
510+
yield 1
511+
finally:
512+
await asyncio.sleep(0, loop=loop)
513+
finalized.append(1)
514+
'''
515+
516+
try:
517+
g = {}
518+
exec(waiter_src, globals(), g)
519+
except SyntaxError:
520+
# Python < 3.6
521+
raise unittest.SkipTest()
522+
else:
523+
waiter = g['waiter']
524+
525+
async def wait():
526+
async for _ in waiter(1, finalized, self.loop):
527+
pass
528+
529+
t1 = self.loop.create_task(wait())
530+
t2 = self.loop.create_task(wait())
531+
532+
self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
533+
534+
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
535+
self.assertEqual(finalized, [1, 1])
536+
537+
# Silence warnings
538+
t1.cancel()
539+
t2.cancel()
540+
self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
541+
501542

502543
class TestBaseUV(_TestBase, UVTestCase):
503544

uvloop/includes/stdlib.pxi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ cdef tb_format_list = traceback.format_list
125125
cdef warnings_warn = warnings.warn
126126

127127
cdef weakref_WeakValueDictionary = weakref.WeakValueDictionary
128+
cdef weakref_WeakSet = weakref.WeakSet
128129

129130

130131
# Cython doesn't clean-up imported objects properly in Py3 mode,

uvloop/loop.pxd

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ cdef class Loop:
7171

7272
cdef object __weakref__
7373

74+
object _asyncgens
75+
bint _asyncgens_shutdown_called
76+
7477
char _recv_buffer[UV_STREAM_RECV_BUF_SIZE]
7578
bint _recv_buffer_in_use
7679

uvloop/loop.pyx

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,17 @@ cdef class Loop:
131131

132132
self._coroutine_wrapper_set = False
133133

134+
if hasattr(sys, 'get_asyncgen_hooks'):
135+
# Python >= 3.6
136+
# A weak set of all asynchronous generators that are
137+
# being iterated by the loop.
138+
self._asyncgens = weakref_WeakSet()
139+
else:
140+
self._asyncgens = None
141+
142+
# Set to True when `loop.shutdown_asyncgens` is called.
143+
self._asyncgens_shutdown_called = False
144+
134145
def __init__(self):
135146
self.set_debug((not sys_ignore_environment
136147
and bool(os_environ.get('PYTHONASYNCIODEBUG'))))
@@ -1081,10 +1092,16 @@ cdef class Loop:
10811092
# This is how asyncio loop behaves.
10821093
mode = uv.UV_RUN_NOWAIT
10831094
self._set_coroutine_wrapper(self._debug)
1095+
if self._asyncgens is not None:
1096+
old_agen_hooks = sys.get_asyncgen_hooks()
1097+
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
1098+
finalizer=self._asyncgen_finalizer_hook)
10841099
try:
10851100
self._run(mode)
10861101
finally:
1087-
self._set_coroutine_wrapper(0)
1102+
self._set_coroutine_wrapper(False)
1103+
if self._asyncgens is not None:
1104+
sys.set_asyncgen_hooks(*old_agen_hooks)
10881105

10891106
def close(self):
10901107
"""Close the event loop.
@@ -2471,6 +2488,50 @@ cdef class Loop:
24712488
await waiter
24722489
return udp, protocol
24732490

2491+
def _asyncgen_finalizer_hook(self, agen):
2492+
self._asyncgens.discard(agen)
2493+
if not self.is_closed():
2494+
self.create_task(agen.aclose())
2495+
# Wake up the loop if the finalizer was called from
2496+
# a different thread.
2497+
self._write_to_self()
2498+
2499+
def _asyncgen_firstiter_hook(self, agen):
2500+
if self._asyncgens_shutdown_called:
2501+
warnings_warn(
2502+
"asynchronous generator {!r} was scheduled after "
2503+
"loop.shutdown_asyncgens() call".format(agen),
2504+
ResourceWarning, source=self)
2505+
2506+
self._asyncgens.add(agen)
2507+
2508+
async def shutdown_asyncgens(self):
2509+
"""Shutdown all active asynchronous generators."""
2510+
self._asyncgens_shutdown_called = True
2511+
2512+
if self._asyncgens is None or not len(self._asyncgens):
2513+
# If Python version is <3.6 or we don't have any asynchronous
2514+
# generators alive.
2515+
return
2516+
2517+
closing_agens = list(self._asyncgens)
2518+
self._asyncgens.clear()
2519+
2520+
shutdown_coro = aio_gather(
2521+
*[ag.aclose() for ag in closing_agens],
2522+
return_exceptions=True,
2523+
loop=self)
2524+
2525+
results = await shutdown_coro
2526+
for result, agen in zip(results, closing_agens):
2527+
if isinstance(result, Exception):
2528+
self.call_exception_handler({
2529+
'message': 'an error occurred during closing of '
2530+
'asynchronous generator {!r}'.format(agen),
2531+
'exception': result,
2532+
'asyncgen': agen
2533+
})
2534+
24742535

24752536
cdef void __loop_alloc_buffer(uv.uv_handle_t* uvhandle,
24762537
size_t suggested_size,

0 commit comments

Comments
 (0)