2
2
"""
3
3
4
4
from test .support import cpython_only , verbose
5
+ from _testcapi import install_error_injection_hook
5
6
import asyncio
6
7
import dis
7
8
import sys
@@ -11,24 +12,23 @@ class InjectedException(Exception):
11
12
"""Exception injected into a running frame via a trace function"""
12
13
pass
13
14
14
- def raise_after_instruction (target_function , target_instruction ):
15
+ def raise_after_offset (target_function , target_offset ):
15
16
"""Sets a trace function to inject an exception into given function
16
17
17
18
Relies on the ability to request that a trace function be called for
18
19
every executed opcode, not just every line
19
20
"""
20
21
target_code = target_function .__code__
21
- def inject_exception (frame , event , arg ):
22
- if frame .f_code is not target_code :
23
- return
24
- frame .f_trace_opcodes = True
25
- if frame .f_lasti >= target_instruction :
26
- if frame .f_lasti > frame .f_pendingi :
27
- raise InjectedException (f"Failing after { frame .f_lasti } " )
28
- return inject_exception
29
- sys .settrace (inject_exception )
30
-
31
- # TODO: Add a test case that ensures raise_after_instruction is working
22
+ def inject_exception ():
23
+ print ("Raising injected exception" )
24
+ raise InjectedException (f"Failing after { target_offset } " )
25
+ # This installs a trace hook that's implemented in C, and hence won't
26
+ # trigger any of the per-bytecode processing in the eval loop
27
+ # This means it can register the pending call that raises the exception and
28
+ # the pending call won't be processed until after the trace hook returns
29
+ install_error_injection_hook (target_code , target_offset , inject_exception )
30
+
31
+ # TODO: Add a test case that ensures raise_after_offset is working
32
32
# properly (otherwise there's a risk the tests will pass due to the
33
33
# exception not being injected properly)
34
34
@@ -51,11 +51,11 @@ def setUp(self):
51
51
self .addCleanup (sys .settrace , old_trace )
52
52
sys .settrace (None )
53
53
54
- def assert_cm_exited (self , tracking_cm , target_instruction , traced_operation ):
54
+ def assert_cm_exited (self , tracking_cm , target_offset , traced_operation ):
55
55
if tracking_cm .enter_without_exit :
56
56
msg = ("Context manager entered without exit due to "
57
- f"exception injected at offset { target_instruction } in:\n "
58
- f"{ dis .Bytecode (traced_operation ).dis ()} " )
57
+ f"exception injected at offset { target_offset } in:\n "
58
+ f"{ dis .Bytecode (traced_operation ).dis ()} " )
59
59
self .fail (msg )
60
60
61
61
def test_synchronous_cm (self ):
@@ -71,21 +71,21 @@ def traced_function():
71
71
with tracking_cm :
72
72
1 + 1
73
73
return
74
- target_instruction = - 1
75
- num_instructions = len (traced_function .__code__ .co_code ) - 2
76
- while target_instruction < num_instructions :
77
- target_instruction += 1
78
- raise_after_instruction (traced_function , target_instruction )
74
+ target_offset = - 1
75
+ max_offset = len (traced_function .__code__ .co_code ) - 2
76
+ while target_offset < max_offset :
77
+ target_offset += 1
78
+ raise_after_offset (traced_function , target_offset )
79
79
try :
80
80
traced_function ()
81
81
except InjectedException :
82
82
# key invariant: if we entered the CM, we exited it
83
- self .assert_cm_exited (tracking_cm , target_instruction , traced_function )
83
+ self .assert_cm_exited (tracking_cm , target_offset , traced_function )
84
84
else :
85
- self .fail (f"Exception wasn't raised @{ target_instruction } " )
85
+ self .fail (f"Exception wasn't raised @{ target_offset } " )
86
86
87
87
88
- def test_asynchronous_cm (self ):
88
+ def _test_asynchronous_cm (self ):
89
89
class AsyncTrackingCM ():
90
90
def __init__ (self ):
91
91
self .enter_without_exit = None
@@ -98,19 +98,19 @@ async def traced_coroutine():
98
98
async with tracking_cm :
99
99
1 + 1
100
100
return
101
- target_instruction = - 1
102
- num_instructions = len (traced_coroutine .__code__ .co_code ) - 2
101
+ target_offset = - 1
102
+ max_offset = len (traced_coroutine .__code__ .co_code ) - 2
103
103
loop = asyncio .get_event_loop ()
104
- while target_instruction < num_instructions :
105
- target_instruction += 1
106
- raise_after_instruction (traced_coroutine , target_instruction )
104
+ while target_offset < max_offset :
105
+ target_offset += 1
106
+ raise_after_offset (traced_coroutine , target_offset )
107
107
try :
108
108
loop .run_until_complete (traced_coroutine ())
109
109
except InjectedException :
110
110
# key invariant: if we entered the CM, we exited it
111
- self .assert_cm_exited (tracking_cm , target_instruction , traced_coroutine )
111
+ self .assert_cm_exited (tracking_cm , target_offset , traced_coroutine )
112
112
else :
113
- self .fail (f"Exception wasn't raised @{ target_instruction } " )
113
+ self .fail (f"Exception wasn't raised @{ target_offset } " )
114
114
115
115
116
116
if __name__ == '__main__' :
0 commit comments