6
6
import asyncio
7
7
import dis
8
8
import sys
9
+ import threading
9
10
import unittest
10
11
11
12
class InjectedException (Exception ):
@@ -20,8 +21,9 @@ def raise_after_offset(target_function, target_offset):
20
21
"""
21
22
target_code = target_function .__code__
22
23
def inject_exception ():
23
- print ("Raising injected exception" )
24
- raise InjectedException (f"Failing after { target_offset } " )
24
+ exc = InjectedException (f"Failing after { target_offset } " )
25
+ print (f"Raising injected exception: { exc } " )
26
+ raise exc
25
27
# This installs a trace hook that's implemented in C, and hence won't
26
28
# trigger any of the per-bytecode processing in the eval loop
27
29
# This means it can register the pending call that raises the exception and
@@ -51,24 +53,22 @@ def setUp(self):
51
53
self .addCleanup (sys .settrace , old_trace )
52
54
sys .settrace (None )
53
55
54
- def assert_cm_exited (self , tracking_cm , target_offset , traced_operation ):
55
- if tracking_cm .enter_without_exit :
56
+ def assert_lock_released (self , test_lock , target_offset , traced_operation ):
57
+ just_acquired = test_lock .acquire (blocking = False )
58
+ # Either we just acquired the lock, or the test didn't release it
59
+ test_lock .release ()
60
+ if not just_acquired :
56
61
msg = ("Context manager entered without exit due to "
57
62
f"exception injected at offset { target_offset } in:\n "
58
63
f"{ dis .Bytecode (traced_operation ).dis ()} " )
59
64
self .fail (msg )
60
65
61
66
def test_synchronous_cm (self ):
62
- class TrackingCM ():
63
- def __init__ (self ):
64
- self .enter_without_exit = None
65
- def __enter__ (self ):
66
- self .enter_without_exit = True
67
- def __exit__ (self , * args ):
68
- self .enter_without_exit = False
69
- tracking_cm = TrackingCM ()
67
+ # Must use a signal-safe CM, otherwise __exit__ will start
68
+ # but then fail to actually run as the pending call gets processed
69
+ test_lock = threading .Lock ()
70
70
def traced_function ():
71
- with tracking_cm :
71
+ with test_lock :
72
72
1 + 1
73
73
return
74
74
target_offset = - 1
@@ -80,12 +80,20 @@ def traced_function():
80
80
traced_function ()
81
81
except InjectedException :
82
82
# key invariant: if we entered the CM, we exited it
83
- self .assert_cm_exited ( tracking_cm , target_offset , traced_function )
83
+ self .assert_lock_released ( test_lock , target_offset , traced_function )
84
84
else :
85
85
self .fail (f"Exception wasn't raised @{ target_offset } " )
86
86
87
87
88
- def test_asynchronous_cm (self ):
88
+ def _test_asynchronous_cm (self ):
89
+ # NOTE: this can't work, since asyncio is written in Python, and hence
90
+ # will always process pending calls at some point during the evaluation
91
+ # of __aenter__ and __aexit__
92
+ #
93
+ # So to handle that case, we need to some way to tell the event loop
94
+ # to convert pending call processing into calls to
95
+ # asyncio.get_event_loop().call_soon() instead of processing them
96
+ # immediately
89
97
class AsyncTrackingCM ():
90
98
def __init__ (self ):
91
99
self .enter_without_exit = None
@@ -108,7 +116,7 @@ async def traced_coroutine():
108
116
loop .run_until_complete (traced_coroutine ())
109
117
except InjectedException :
110
118
# key invariant: if we entered the CM, we exited it
111
- self .assert_cm_exited (tracking_cm , target_offset , traced_coroutine )
119
+ self .assertFalse (tracking_cm . enter_without_exit )
112
120
else :
113
121
self .fail (f"Exception wasn't raised @{ target_offset } " )
114
122
0 commit comments