Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"force_not_colorized",
"BrokenIter",
"in_systemd_nspawn_sync_suppressed",
"run_no_yield_async_fn", "run_yielding_async_fn", "async_yield",
]


Expand Down Expand Up @@ -2940,3 +2941,31 @@ def in_systemd_nspawn_sync_suppressed() -> bool:
os.close(fd)

return False

def run_no_yield_async_fn(async_fn, /, *args, **kwargs):
coro = async_fn(*args, **kwargs)
try:
coro.send(None)
except StopIteration as e:
return e.value
else:
raise AssertionError("coroutine did not complete")
finally:
coro.close()


@types.coroutine
def async_yield(v):
return (yield v)


def run_yielding_async_fn(async_fn, /, *args, **kwargs):
coro = async_fn(*args, **kwargs)
try:
while True:
try:
coro.send(None)
except StopIteration as e:
return e.value
finally:
coro.close()
15 changes: 2 additions & 13 deletions Lib/test/test_contextlib_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,13 @@
asynccontextmanager, AbstractAsyncContextManager,
AsyncExitStack, nullcontext, aclosing, contextmanager)
from test import support
from test.support import run_no_yield_async_fn as _run_async_fn
import unittest
import traceback

from test.test_contextlib import TestBaseExitStack


def _run_async_fn(async_fn, /, *args, **kwargs):
coro = async_fn(*args, **kwargs)
try:
coro.send(None)
except StopIteration as e:
return e.value
else:
raise AssertionError("coroutine did not complete")
finally:
coro.close()


def _async_test(async_fn):
"""Decorator to turn an async function into a synchronous function"""
@functools.wraps(async_fn)
Expand Down Expand Up @@ -546,7 +535,7 @@ def __exit__(self, *exc_details):
exit_stack = SyncAsyncExitStack
callback_error_internal_frames = [
('__exit__', 'return _run_async_fn(self.__aexit__, *exc_details)'),
('_run_async_fn', 'coro.send(None)'),
('run_no_yield_async_fn', 'coro.send(None)'),
('__aexit__', 'raise exc'),
('__aexit__', 'cb_suppress = cb(*exc_details)'),
]
Expand Down
15 changes: 4 additions & 11 deletions Lib/test/test_inspect/test_inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from test.support import cpython_only, import_helper
from test.support import MISSING_C_DOCSTRINGS, ALWAYS_EQ
from test.support import run_no_yield_async_fn
from test.support.import_helper import DirsOnSysPath, ready_to_import
from test.support.os_helper import TESTFN, temp_cwd
from test.support.script_helper import assert_python_ok, assert_python_failure, kill_python
Expand Down Expand Up @@ -1161,19 +1162,11 @@ def f(self):
sys.modules.pop("inspect_actual")

def test_nested_class_definition_inside_async_function(self):
def run(coro):
try:
coro.send(None)
except StopIteration as e:
return e.value
else:
raise RuntimeError("coroutine did not complete synchronously!")
finally:
coro.close()
run = run_no_yield_async_fn

self.assertSourceEqual(run(mod2.func225()), 226, 227)
self.assertSourceEqual(run(mod2.func225), 226, 227)
self.assertSourceEqual(mod2.cls226, 231, 235)
self.assertSourceEqual(run(mod2.cls226().func232()), 233, 234)
self.assertSourceEqual(run(mod2.cls226().func232), 233, 234)

def test_class_definition_same_name_diff_methods(self):
self.assertSourceEqual(mod2.cls296, 296, 298)
Expand Down
57 changes: 28 additions & 29 deletions Lib/test/test_pdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
from test.support.pty_helper import run_pty, FakeInput
from unittest.mock import patch

# gh-114275: WASI fails to run asyncio tests, similar skip than test_asyncio.
SKIP_ASYNCIO_TESTS = (not support.has_socket_support)
SKIP_CORO_TESTS = False


class PdbTestInput(object):
Expand Down Expand Up @@ -2049,23 +2048,23 @@ def test_pdb_next_command_for_generator():
finished
"""

if not SKIP_ASYNCIO_TESTS:
if not SKIP_CORO_TESTS:
def test_pdb_next_command_for_coroutine():
"""Testing skip unwindng stack on yield for coroutines for "next" command

>>> import asyncio
>>> from test.support import run_yielding_async_fn, async_yield

>>> async def test_coro():
... await asyncio.sleep(0)
... await asyncio.sleep(0)
... await asyncio.sleep(0)
... await async_yield(0)
... await async_yield(0)
... await async_yield(0)

>>> async def test_main():
... import pdb; pdb.Pdb(nosigint=True, readrc=False).set_trace()
... await test_coro()

>>> def test_function():
... asyncio.run(test_main())
... run_yielding_async_fn(test_main)
... print("finished")

>>> with PdbTestInput(['step',
Expand All @@ -2088,13 +2087,13 @@ def test_pdb_next_command_for_coroutine():
-> async def test_coro():
(Pdb) step
> <doctest test.test_pdb.test_pdb_next_command_for_coroutine[1]>(2)test_coro()
-> await asyncio.sleep(0)
-> await async_yield(0)
(Pdb) next
> <doctest test.test_pdb.test_pdb_next_command_for_coroutine[1]>(3)test_coro()
-> await asyncio.sleep(0)
-> await async_yield(0)
(Pdb) next
> <doctest test.test_pdb.test_pdb_next_command_for_coroutine[1]>(4)test_coro()
-> await asyncio.sleep(0)
-> await async_yield(0)
(Pdb) next
Internal StopIteration
> <doctest test.test_pdb.test_pdb_next_command_for_coroutine[2]>(3)test_main()
Expand All @@ -2110,11 +2109,11 @@ def test_pdb_next_command_for_coroutine():
def test_pdb_next_command_for_asyncgen():
"""Testing skip unwindng stack on yield for coroutines for "next" command

>>> import asyncio
>>> from test.support import run_yielding_async_fn, async_yield

>>> async def agen():
... yield 1
... await asyncio.sleep(0)
... await async_yield(0)
... yield 2

>>> async def test_coro():
Expand All @@ -2126,7 +2125,7 @@ def test_pdb_next_command_for_asyncgen():
... await test_coro()

>>> def test_function():
... asyncio.run(test_main())
... run_yielding_async_fn(test_main)
... print("finished")

>>> with PdbTestInput(['step',
Expand Down Expand Up @@ -2163,7 +2162,7 @@ def test_pdb_next_command_for_asyncgen():
-> yield 1
(Pdb) next
> <doctest test.test_pdb.test_pdb_next_command_for_asyncgen[1]>(3)agen()
-> await asyncio.sleep(0)
-> await async_yield(0)
(Pdb) continue
2
finished
Expand Down Expand Up @@ -2228,23 +2227,23 @@ def test_pdb_return_command_for_generator():
finished
"""

if not SKIP_ASYNCIO_TESTS:
if not SKIP_CORO_TESTS:
def test_pdb_return_command_for_coroutine():
"""Testing no unwindng stack on yield for coroutines for "return" command

>>> import asyncio
>>> from test.support import run_yielding_async_fn, async_yield

>>> async def test_coro():
... await asyncio.sleep(0)
... await asyncio.sleep(0)
... await asyncio.sleep(0)
... await async_yield(0)
... await async_yield(0)
... await async_yield(0)

>>> async def test_main():
... import pdb; pdb.Pdb(nosigint=True, readrc=False).set_trace()
... await test_coro()

>>> def test_function():
... asyncio.run(test_main())
... run_yielding_async_fn(test_main)
... print("finished")

>>> with PdbTestInput(['step',
Expand All @@ -2264,10 +2263,10 @@ def test_pdb_return_command_for_coroutine():
-> async def test_coro():
(Pdb) step
> <doctest test.test_pdb.test_pdb_return_command_for_coroutine[1]>(2)test_coro()
-> await asyncio.sleep(0)
-> await async_yield(0)
(Pdb) next
> <doctest test.test_pdb.test_pdb_return_command_for_coroutine[1]>(3)test_coro()
-> await asyncio.sleep(0)
-> await async_yield(0)
(Pdb) continue
finished
"""
Expand Down Expand Up @@ -2320,28 +2319,28 @@ def test_pdb_until_command_for_generator():
finished
"""

if not SKIP_ASYNCIO_TESTS:
if not SKIP_CORO_TESTS:
def test_pdb_until_command_for_coroutine():
"""Testing no unwindng stack for coroutines
for "until" command if target breakpoint is not reached

>>> import asyncio
>>> from test.support import run_yielding_async_fn, async_yield

>>> async def test_coro():
... print(0)
... await asyncio.sleep(0)
... await async_yield(0)
... print(1)
... await asyncio.sleep(0)
... await async_yield(0)
... print(2)
... await asyncio.sleep(0)
... await async_yield(0)
... print(3)

>>> async def test_main():
... import pdb; pdb.Pdb(nosigint=True, readrc=False).set_trace()
... await test_coro()

>>> def test_function():
... asyncio.run(test_main())
... run_yielding_async_fn(test_main)
... print("finished")

>>> with PdbTestInput(['step',
Expand Down
Loading