Skip to content

Commit 0dee54f

Browse files
committed
Issue 29988: Test signal safety of with statements
Given the ability to get trace hooks to run after every opcode (instead of only between lines), its possible to provoke failures in with statements, where `__enter__` or `__aenter__` can run successfully, but a poorly timed external exception (e.g. KeyboardInterrupt) can lead to `__exit__` or `__aexit__` still being skipped.
1 parent cb5b68a commit 0dee54f

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
"""Additional signal safety tests for "with" and "async with"
2+
"""
3+
4+
from test.support import cpython_only, verbose
5+
import asyncio
6+
import dis
7+
import sys
8+
import unittest
9+
10+
class InjectedException(Exception):
11+
"""Exception injected into a running frame via a trace function"""
12+
pass
13+
14+
def raise_after_instruction(target_function, target_instruction):
15+
"""Sets a trace function to inject an exception into given function
16+
17+
Relies on the ability to request that a trace function be called for
18+
every executed opcode, not just every line
19+
"""
20+
target_code = target_function.__code__
21+
def inject_exception(frame, event, arg):
22+
if frame.f_code is not target_code:
23+
return
24+
# TODO: Add `f_traceall` API to request tracing for every opcode
25+
# frame.f_traceall = True
26+
if frame.f_lasti >= target_instruction:
27+
raise RuntimeError(f"Failing after {target_instruction}")
28+
return inject_exception
29+
sys.settrace(inject_exception)
30+
31+
# TODO: Add a test case that ensures raise_after_instruction is working
32+
# properly (otherwise there's a risk the tests will pass due to the
33+
# exception not being injected properly)
34+
35+
@cpython_only
36+
class CheckSignalSafety(unittest.TestCase):
37+
"""Ensure with statements are signal-safe.
38+
39+
Signal safety means that, regardless of when external signals (e.g.
40+
KeyboardInterrupt) are received:
41+
42+
1. If __enter__ succeeds, __exit__ will be called
43+
2. If __aenter__ succeeeds, __aexit__ will be called *and*
44+
the resulting awaitable will be awaited
45+
46+
See https://bugs.python.org/issue29988 for more details
47+
"""
48+
49+
def setUp(self):
50+
old_trace = sys.gettrace()
51+
self.addCleanup(sys.settrace, old_trace)
52+
sys.settrace(None)
53+
54+
def assert_cm_exited(self, tracking_cm, target_instruction, traced_operation):
55+
if tracking_cm.enter_without_exit:
56+
msg = ("Context manager entered without exit due to "
57+
f"exception injected at offset {target_instruction} in:\n"
58+
f"{dis.Bytecode(traced_operation).dis()}")
59+
self.fail(msg)
60+
61+
def test_synchronous_cm(self):
62+
class TrackingCM():
63+
def __init__(self):
64+
self.enter_without_exit = None
65+
def __enter__(self):
66+
self.enter_without_exit = True
67+
def __exit__(self, *args):
68+
self.enter_without_exit = False
69+
tracking_cm = TrackingCM()
70+
def traced_function():
71+
with tracking_cm:
72+
1 + 1
73+
return
74+
target_instruction = -1
75+
num_instructions = len(traced_function.__code__.co_code) - 2
76+
import dis
77+
dis.dis(traced_function)
78+
while target_instruction < num_instructions:
79+
target_instruction += 1
80+
raise_after_instruction(traced_function, target_instruction)
81+
if verbose:
82+
print(f"Raising exception after {target_instruction}")
83+
try:
84+
traced_function()
85+
except RuntimeError:
86+
# key invariant: if we entered the CM, we exited it
87+
self.assert_cm_exited(tracking_cm, target_instruction, traced_function)
88+
else:
89+
self.fail(f"Exception wasn't raised @{target_instruction}")
90+
91+
92+
def test_asynchronous_cm(self):
93+
class AsyncTrackingCM():
94+
def __init__(self):
95+
self.enter_without_exit = None
96+
async def __aenter__(self):
97+
self.enter_without_exit = True
98+
async def __aexit__(self, *args):
99+
self.enter_without_exit = False
100+
tracking_cm = AsyncTrackingCM()
101+
async def traced_coroutine():
102+
async with tracking_cm:
103+
1 + 1
104+
return
105+
target_instruction = -1
106+
num_instructions = len(traced_coroutine.__code__.co_code) - 2
107+
loop = asyncio.get_event_loop()
108+
while target_instruction < num_instructions:
109+
target_instruction += 1
110+
raise_after_instruction(traced_coroutine, target_instruction)
111+
if verbose:
112+
print(f"Raising exception after {target_instruction}")
113+
try:
114+
loop.run_until_complete(traced_coroutine())
115+
except RuntimeError:
116+
# key invariant: if we entered the CM, we exited it
117+
self.assert_cm_exited(tracking_cm, target_instruction, traced_coroutine)
118+
else:
119+
self.fail(f"Exception wasn't raised @{target_instruction}")
120+
121+
122+
if __name__ == '__main__':
123+
unittest.main()

0 commit comments

Comments
 (0)