2222from typing import (
2323 Any ,
2424 Callable ,
25+ Iterable ,
2526 NewType ,
2627 Optional ,
2728 cast ,
5758DISABLE_TYPE = NewType ("DISABLE_TYPE" , object )
5859MonitorReturn = Optional [DISABLE_TYPE ]
5960DISABLE = cast (MonitorReturn , getattr (sys_monitoring , "DISABLE" , None ))
61+ TOffset = int
6062
61- ALWAYS_JUMPS = {
62- dis .opmap [name ] for name in
63- ["JUMP_FORWARD" , "JUMP_BACKWARD" , "JUMP_BACKWARD_NO_INTERRUPT" ]
64- }
63+ ALWAYS_JUMPS : set [int ] = set ()
64+ RETURNS : set [int ] = set ()
6565
66- RETURNS = {
67- dis .opmap [name ] for name in ["RETURN_VALUE" , "RETURN_GENERATOR" ]
68- }
66+ if env .PYBEHAVIOR .branch_right_left :
67+ ALWAYS_JUMPS .update (
68+ dis .opmap [name ]
69+ for name in ["JUMP_FORWARD" , "JUMP_BACKWARD" , "JUMP_BACKWARD_NO_INTERRUPT" ]
70+ )
71+
72+ RETURNS .update (dis .opmap [name ] for name in ["RETURN_VALUE" , "RETURN_GENERATOR" ])
6973
7074
7175if LOG : # pragma: debugging
@@ -175,16 +179,26 @@ def _decorator(meth: AnyCallable) -> AnyCallable:
175179
176180
177181class InstructionWalker :
178- def __init__ (self , code : CodeType ):
182+ """Utility to step through trails of instructions."""
183+
184+ def __init__ (self , code : CodeType ) -> None :
179185 self .code = code
180- self .insts : dict [int , dis .Instruction ] = {}
186+ self .insts : dict [TOffset , dis .Instruction ] = {}
181187
188+ inst = None
182189 for inst in dis .get_instructions (code ):
183190 self .insts [inst .offset ] = inst
184191
192+ assert inst is not None
185193 self .max_offset = inst .offset
186194
187- def walk (self , * , start_at = 0 , follow_jumps = True ):
195+ def walk (
196+ self , * , start_at : TOffset = 0 , follow_jumps : bool = True
197+ ) -> Iterable [dis .Instruction ]:
198+ """
199+ Yield instructions starting from `start_at`. Follow unconditional
200+ jumps if `follow_jumps` is true.
201+ """
188202 seen = set ()
189203 offset = start_at
190204 while offset < self .max_offset + 1 :
@@ -199,52 +213,57 @@ def walk(self, *, start_at=0, follow_jumps=True):
199213 offset += 2
200214
201215
202- def populate_branch_trails (code : CodeType , code_info : CodeInfo ) -> tuple [list [int ], TArc | None ]:
216+ def populate_branch_trails (code : CodeType , code_info : CodeInfo ) -> None :
217+ """
218+ Populate the `branch_trails` attribute on `code_info`.
219+ """
203220 iwalker = InstructionWalker (code )
204221 for inst in iwalker .walk (follow_jumps = False ):
205222 log (f"considering { inst = } " )
206223 if not inst .jump_target :
207- log (f "no jump_target" )
224+ log ("no jump_target" )
208225 continue
209226 if inst .opcode in ALWAYS_JUMPS :
210- log (f "always jumps" )
227+ log ("always jumps" )
211228 continue
212229
213230 from_line = inst .line_number
231+ assert from_line is not None
214232
215- def walkabout (start_at , branch_kind ):
216- insts = []
233+ def walk_one_branch (
234+ start_at : TOffset , branch_kind : str
235+ ) -> tuple [list [TOffset ], TArc | None ]:
236+ # pylint: disable=cell-var-from-loop
237+ inst_offsets : list [TOffset ] = []
217238 to_line = None
218239 for inst2 in iwalker .walk (start_at = start_at ):
219- insts .append (inst2 .offset )
240+ inst_offsets .append (inst2 .offset )
220241 if inst2 .line_number and inst2 .line_number != from_line :
221242 to_line = inst2 .line_number
222243 break
223244 elif inst2 .jump_target and (inst2 .opcode not in ALWAYS_JUMPS ):
224- log (f"stop: { inst2 .jump_target = } , { inst2 .opcode = } ({ dis .opname [inst2 .opcode ]} ), { ALWAYS_JUMPS = } " )
245+ log (
246+ f"stop: { inst2 .jump_target = } , "
247+ + f"{ inst2 .opcode = } ({ dis .opname [inst2 .opcode ]} ), "
248+ + f"{ ALWAYS_JUMPS = } "
249+ )
225250 break
226251 elif inst2 .opcode in RETURNS :
227252 to_line = - code .co_firstlineno
228253 break
229- # if to_line is None:
230- # import contextlib
231- # with open("/tmp/foo.out", "a") as f:
232- # with contextlib.redirect_stdout(f):
233- # print()
234- # print(f"{code = }")
235- # print(f"{from_line = }, {to_line = }, {start_at = }")
236- # dis.dis(code)
237- # 1/0
238254 if to_line is not None :
239- log (f"possible branch from @{ start_at } : { insts } , { (from_line , to_line )} { code } " )
240- return insts , (from_line , to_line )
255+ log (
256+ f"possible branch from @{ start_at } : "
257+ + f"{ inst_offsets } , { (from_line , to_line )} { code } "
258+ )
259+ return inst_offsets , (from_line , to_line )
241260 else :
242- log (f" no possible branch from @{ start_at } : { insts } " )
261+ log (f" no possible branch from @{ start_at } : { inst_offsets } " )
243262 return [], None
244263
245264 code_info .branch_trails [inst .offset ] = (
246- walkabout (start_at = inst .offset + 2 , branch_kind = "not-taken" ),
247- walkabout (start_at = inst .jump_target , branch_kind = "taken" ),
265+ walk_one_branch (start_at = inst .offset + 2 , branch_kind = "not-taken" ),
266+ walk_one_branch (start_at = inst .jump_target , branch_kind = "taken" ),
248267 )
249268
250269
@@ -254,7 +273,7 @@ class CodeInfo:
254273
255274 tracing : bool
256275 file_data : TTraceFileData | None
257- byte_to_line : dict [int , int ] | None
276+ byte_to_line : dict [TOffset , TLineNo ] | None
258277 # Keys are start instruction offsets for branches.
259278 # Values are two tuples:
260279 # (
@@ -263,15 +282,15 @@ class CodeInfo:
263282 # )
264283 # Two possible trails from the branch point, left and right.
265284 branch_trails : dict [
266- int ,
285+ TOffset ,
267286 tuple [
268- tuple [list [int ], TArc ] | None ,
269- tuple [list [int ], TArc ] | None ,
270- ]
287+ tuple [list [TOffset ], TArc | None ] ,
288+ tuple [list [TOffset ], TArc | None ] ,
289+ ],
271290 ]
272291
273292
274- def bytes_to_lines (code : CodeType ) -> dict [int , int ]:
293+ def bytes_to_lines (code : CodeType ) -> dict [TOffset , TLineNo ]:
275294 """Make a dict mapping byte code offsets to line numbers."""
276295 b2l = {}
277296 for bstart , bend , lineno in code .co_lines ():
@@ -335,15 +354,21 @@ def start(self) -> None:
335354 sys_monitoring .use_tool_id (self .myid , "coverage.py" )
336355 register = functools .partial (sys_monitoring .register_callback , self .myid )
337356 events = sys .monitoring .events
338- import contextlib
339357
340358 sys_monitoring .set_events (self .myid , events .PY_START )
341359 register (events .PY_START , self .sysmon_py_start )
342360 if self .trace_arcs :
343361 register (events .PY_RETURN , self .sysmon_py_return )
344362 register (events .LINE , self .sysmon_line_arcs )
345- register (events .BRANCH_RIGHT , self .sysmon_branch_either ) # type:ignore[attr-defined]
346- register (events .BRANCH_LEFT , self .sysmon_branch_either ) # type:ignore[attr-defined]
363+ if env .PYBEHAVIOR .branch_right_left :
364+ register (
365+ events .BRANCH_RIGHT , # type:ignore[attr-defined]
366+ self .sysmon_branch_either ,
367+ )
368+ register (
369+ events .BRANCH_LEFT , # type:ignore[attr-defined]
370+ self .sysmon_branch_either ,
371+ )
347372 else :
348373 register (events .LINE , self .sysmon_line_lines )
349374 sys_monitoring .restart_events ()
@@ -385,7 +410,7 @@ def get_stats(self) -> dict[str, int] | None:
385410
386411 @panopticon ("code" , "@" )
387412 def sysmon_py_start ( # pylint: disable=useless-return
388- self , code : CodeType , instruction_offset : int
413+ self , code : CodeType , instruction_offset : TOffset
389414 ) -> MonitorReturn :
390415 """Handle sys.monitoring.events.PY_START events."""
391416 # Entering a new frame. Decide if we should trace in this file.
@@ -433,7 +458,7 @@ def sysmon_py_start( # pylint: disable=useless-return
433458 branch_trails = {},
434459 )
435460 self .code_infos [id (code )] = code_info
436- populate_branch_trails (code , code_info ) # TODO: should be a method?
461+ populate_branch_trails (code , code_info ) # TODO: should be a method?
437462 self .code_objects .append (code )
438463
439464 if tracing_code :
@@ -445,7 +470,8 @@ def sysmon_py_start( # pylint: disable=useless-return
445470 if self .trace_arcs :
446471 assert env .PYBEHAVIOR .branch_right_left
447472 local_events |= (
448- events .BRANCH_RIGHT | events .BRANCH_LEFT # type:ignore[attr-defined]
473+ events .BRANCH_RIGHT # type:ignore[attr-defined]
474+ | events .BRANCH_LEFT # type:ignore[attr-defined]
449475 )
450476 sys_monitoring .set_local_events (self .myid , code , local_events )
451477 # 111963:
@@ -457,7 +483,7 @@ def sysmon_py_start( # pylint: disable=useless-return
457483 def sysmon_py_return ( # pylint: disable=useless-return
458484 self ,
459485 code : CodeType ,
460- instruction_offset : int ,
486+ instruction_offset : TOffset ,
461487 retval : object ,
462488 ) -> MonitorReturn :
463489 """Handle sys.monitoring.events.PY_RETURN events for branch coverage."""
@@ -472,7 +498,7 @@ def sysmon_py_return( # pylint: disable=useless-return
472498 return None
473499
474500 @panopticon ("code" , "line" )
475- def sysmon_line_lines (self , code : CodeType , line_number : int ) -> MonitorReturn :
501+ def sysmon_line_lines (self , code : CodeType , line_number : TLineNo ) -> MonitorReturn :
476502 """Handle sys.monitoring.events.LINE events for line coverage."""
477503 code_info = self .code_infos [id (code )]
478504 if code_info .file_data is not None :
@@ -481,7 +507,7 @@ def sysmon_line_lines(self, code: CodeType, line_number: int) -> MonitorReturn:
481507 return DISABLE
482508
483509 @panopticon ("code" , "line" )
484- def sysmon_line_arcs (self , code : CodeType , line_number : int ) -> MonitorReturn :
510+ def sysmon_line_arcs (self , code : CodeType , line_number : TLineNo ) -> MonitorReturn :
485511 """Handle sys.monitoring.events.LINE events for branch coverage."""
486512 code_info = self .code_infos [id (code )]
487513 if code_info .file_data is not None :
@@ -492,7 +518,7 @@ def sysmon_line_arcs(self, code: CodeType, line_number: int) -> MonitorReturn:
492518
493519 @panopticon ("code" , "@" , "@" )
494520 def sysmon_branch_either (
495- self , code : CodeType , instruction_offset : int , destination_offset : int
521+ self , code : CodeType , instruction_offset : TOffset , destination_offset : TOffset
496522 ) -> MonitorReturn :
497523 """Handle BRANCH_RIGHT and BRANCH_LEFT events."""
498524 code_info = self .code_infos [id (code )]
0 commit comments