2222from  typing  import  (
2323    Any ,
2424    Callable ,
25+     Iterable ,
2526    NewType ,
2627    Optional ,
2728    cast ,
5758DISABLE_TYPE  =  NewType ("DISABLE_TYPE" , object )
5859MonitorReturn  =  Optional [DISABLE_TYPE ]
5960DISABLE  =  cast (MonitorReturn , getattr (sys_monitoring , "DISABLE" , None ))
61+ TOffset  =  int 
6062
61- ALWAYS_JUMPS  =  {
62-     dis .opmap [name ] for  name  in 
63-     ["JUMP_FORWARD" , "JUMP_BACKWARD" , "JUMP_BACKWARD_NO_INTERRUPT" ]
64- }
63+ ALWAYS_JUMPS : set [int ] =  set ()
64+ RETURNS : set [int ] =  set ()
6565
66- RETURNS  =  {
67-     dis .opmap [name ] for  name  in  ["RETURN_VALUE" , "RETURN_GENERATOR" ]
68- }
66+ if  env .PYBEHAVIOR .branch_right_left :
67+     ALWAYS_JUMPS .update (
68+         dis .opmap [name ]
69+         for  name  in  ["JUMP_FORWARD" , "JUMP_BACKWARD" , "JUMP_BACKWARD_NO_INTERRUPT" ]
70+     )
71+ 
72+     RETURNS .update (dis .opmap [name ] for  name  in  ["RETURN_VALUE" , "RETURN_GENERATOR" ])
6973
7074
7175if  LOG :  # pragma: debugging 
@@ -175,16 +179,26 @@ def _decorator(meth: AnyCallable) -> AnyCallable:
175179
176180
177181class  InstructionWalker :
178-     def  __init__ (self , code : CodeType ):
182+     """Utility to step through trails of instructions.""" 
183+ 
184+     def  __init__ (self , code : CodeType ) ->  None :
179185        self .code  =  code 
180-         self .insts : dict [int , dis .Instruction ] =  {}
186+         self .insts : dict [TOffset , dis .Instruction ] =  {}
181187
188+         inst  =  None 
182189        for  inst  in  dis .get_instructions (code ):
183190            self .insts [inst .offset ] =  inst 
184191
192+         assert  inst  is  not   None 
185193        self .max_offset  =  inst .offset 
186194
187-     def  walk (self , * , start_at = 0 , follow_jumps = True ):
195+     def  walk (
196+         self , * , start_at : TOffset  =  0 , follow_jumps : bool  =  True 
197+     ) ->  Iterable [dis .Instruction ]:
198+         """ 
199+         Yield instructions starting from `start_at`.  Follow unconditional 
200+         jumps if `follow_jumps` is true. 
201+         """ 
188202        seen  =  set ()
189203        offset  =  start_at 
190204        while  offset  <  self .max_offset  +  1 :
@@ -199,52 +213,57 @@ def walk(self, *, start_at=0, follow_jumps=True):
199213            offset  +=  2 
200214
201215
202- def  populate_branch_trails (code : CodeType , code_info : CodeInfo ) ->  tuple [list [int ], TArc  |  None ]:
216+ def  populate_branch_trails (code : CodeType , code_info : CodeInfo ) ->  None :
217+     """ 
218+     Populate the `branch_trails` attribute on `code_info`. 
219+     """ 
203220    iwalker  =  InstructionWalker (code )
204221    for  inst  in  iwalker .walk (follow_jumps = False ):
205222        log (f"considering { inst = }  " )
206223        if  not  inst .jump_target :
207-             log (f "no jump_target" )
224+             log ("no jump_target" )
208225            continue 
209226        if  inst .opcode  in  ALWAYS_JUMPS :
210-             log (f "always jumps" )
227+             log ("always jumps" )
211228            continue 
212229
213230        from_line  =  inst .line_number 
231+         assert  from_line  is  not   None 
214232
215-         def  walkabout (start_at , branch_kind ):
216-             insts  =  []
233+         def  walk_one_branch (
234+             start_at : TOffset , branch_kind : str 
235+         ) ->  tuple [list [TOffset ], TArc  |  None ]:
236+             # pylint: disable=cell-var-from-loop 
237+             inst_offsets : list [TOffset ] =  []
217238            to_line  =  None 
218239            for  inst2  in  iwalker .walk (start_at = start_at ):
219-                 insts .append (inst2 .offset )
240+                 inst_offsets .append (inst2 .offset )
220241                if  inst2 .line_number  and  inst2 .line_number  !=  from_line :
221242                    to_line  =  inst2 .line_number 
222243                    break 
223244                elif  inst2 .jump_target  and  (inst2 .opcode  not  in   ALWAYS_JUMPS ):
224-                     log (f"stop: { inst2 .jump_target = }  , { inst2 .opcode = }   ({ dis .opname [inst2 .opcode ]}  ), { ALWAYS_JUMPS = }  " )
245+                     log (
246+                         f"stop: { inst2 .jump_target = }  , " 
247+                         +  f"{ inst2 .opcode = }   ({ dis .opname [inst2 .opcode ]}  ), " 
248+                         +  f"{ ALWAYS_JUMPS = }  " 
249+                     )
225250                    break 
226251                elif  inst2 .opcode  in  RETURNS :
227252                    to_line  =  - code .co_firstlineno 
228253                    break 
229-             # if to_line is None: 
230-             #     import contextlib 
231-             #     with open("/tmp/foo.out", "a") as f: 
232-             #         with contextlib.redirect_stdout(f): 
233-             #             print() 
234-             #             print(f"{code = }") 
235-             #             print(f"{from_line = }, {to_line = }, {start_at = }") 
236-             #             dis.dis(code) 
237-             #     1/0 
238254            if  to_line  is  not   None :
239-                 log (f"possible branch from @{ start_at }  : { insts }  , { (from_line , to_line )}   { code }  " )
240-                 return  insts , (from_line , to_line )
255+                 log (
256+                     f"possible branch from @{ start_at }  : " 
257+                     +  f"{ inst_offsets }  , { (from_line , to_line )}   { code }  " 
258+                 )
259+                 return  inst_offsets , (from_line , to_line )
241260            else :
242-                 log (f" no possible branch from @{ start_at }  : { insts }  " )
261+                 log (f" no possible branch from @{ start_at }  : { inst_offsets }  " )
243262                return  [], None 
244263
245264        code_info .branch_trails [inst .offset ] =  (
246-             walkabout (start_at = inst .offset  +  2 , branch_kind = "not-taken" ),
247-             walkabout (start_at = inst .jump_target , branch_kind = "taken" ),
265+             walk_one_branch (start_at = inst .offset  +  2 , branch_kind = "not-taken" ),
266+             walk_one_branch (start_at = inst .jump_target , branch_kind = "taken" ),
248267        )
249268
250269
@@ -254,7 +273,7 @@ class CodeInfo:
254273
255274    tracing : bool 
256275    file_data : TTraceFileData  |  None 
257-     byte_to_line : dict [int ,  int ] |  None 
276+     byte_to_line : dict [TOffset ,  TLineNo ] |  None 
258277    # Keys are start instruction offsets for branches. 
259278    # Values are two tuples: 
260279    #   ( 
@@ -263,15 +282,15 @@ class CodeInfo:
263282    #   ) 
264283    #   Two possible trails from the branch point, left and right. 
265284    branch_trails : dict [
266-         int ,
285+         TOffset ,
267286        tuple [
268-             tuple [list [int ], TArc ]  |  None ,
269-             tuple [list [int ], TArc ]  |  None ,
270-         ]
287+             tuple [list [TOffset ], TArc  |  None ] ,
288+             tuple [list [TOffset ], TArc  |  None ] ,
289+         ], 
271290    ]
272291
273292
274- def  bytes_to_lines (code : CodeType ) ->  dict [int ,  int ]:
293+ def  bytes_to_lines (code : CodeType ) ->  dict [TOffset ,  TLineNo ]:
275294    """Make a dict mapping byte code offsets to line numbers.""" 
276295    b2l  =  {}
277296    for  bstart , bend , lineno  in  code .co_lines ():
@@ -335,15 +354,21 @@ def start(self) -> None:
335354        sys_monitoring .use_tool_id (self .myid , "coverage.py" )
336355        register  =  functools .partial (sys_monitoring .register_callback , self .myid )
337356        events  =  sys .monitoring .events 
338-         import  contextlib 
339357
340358        sys_monitoring .set_events (self .myid , events .PY_START )
341359        register (events .PY_START , self .sysmon_py_start )
342360        if  self .trace_arcs :
343361            register (events .PY_RETURN , self .sysmon_py_return )
344362            register (events .LINE , self .sysmon_line_arcs )
345-             register (events .BRANCH_RIGHT , self .sysmon_branch_either )    # type:ignore[attr-defined] 
346-             register (events .BRANCH_LEFT , self .sysmon_branch_either )     # type:ignore[attr-defined] 
363+             if  env .PYBEHAVIOR .branch_right_left :
364+                 register (
365+                     events .BRANCH_RIGHT ,  # type:ignore[attr-defined] 
366+                     self .sysmon_branch_either ,
367+                 )
368+                 register (
369+                     events .BRANCH_LEFT ,  # type:ignore[attr-defined] 
370+                     self .sysmon_branch_either ,
371+                 )
347372        else :
348373            register (events .LINE , self .sysmon_line_lines )
349374        sys_monitoring .restart_events ()
@@ -385,7 +410,7 @@ def get_stats(self) -> dict[str, int] | None:
385410
386411    @panopticon ("code" , "@" ) 
387412    def  sysmon_py_start (  # pylint: disable=useless-return 
388-         self , code : CodeType , instruction_offset : int 
413+         self , code : CodeType , instruction_offset : TOffset 
389414    ) ->  MonitorReturn :
390415        """Handle sys.monitoring.events.PY_START events.""" 
391416        # Entering a new frame.  Decide if we should trace in this file. 
@@ -433,7 +458,7 @@ def sysmon_py_start(  # pylint: disable=useless-return
433458                branch_trails = {},
434459            )
435460            self .code_infos [id (code )] =  code_info 
436-             populate_branch_trails (code , code_info )      # TODO: should be a method? 
461+             populate_branch_trails (code , code_info )  # TODO: should be a method? 
437462            self .code_objects .append (code )
438463
439464            if  tracing_code :
@@ -445,7 +470,8 @@ def sysmon_py_start(  # pylint: disable=useless-return
445470                        if  self .trace_arcs :
446471                            assert  env .PYBEHAVIOR .branch_right_left 
447472                            local_events  |=  (
448-                                 events .BRANCH_RIGHT  |  events .BRANCH_LEFT  # type:ignore[attr-defined] 
473+                                 events .BRANCH_RIGHT   # type:ignore[attr-defined] 
474+                                 |  events .BRANCH_LEFT   # type:ignore[attr-defined] 
449475                            )
450476                        sys_monitoring .set_local_events (self .myid , code , local_events )
451477                        # 111963: 
@@ -457,7 +483,7 @@ def sysmon_py_start(  # pylint: disable=useless-return
457483    def  sysmon_py_return (  # pylint: disable=useless-return 
458484        self ,
459485        code : CodeType ,
460-         instruction_offset : int ,
486+         instruction_offset : TOffset ,
461487        retval : object ,
462488    ) ->  MonitorReturn :
463489        """Handle sys.monitoring.events.PY_RETURN events for branch coverage.""" 
@@ -472,7 +498,7 @@ def sysmon_py_return(  # pylint: disable=useless-return
472498        return  None 
473499
474500    @panopticon ("code" , "line" ) 
475-     def  sysmon_line_lines (self , code : CodeType , line_number : int ) ->  MonitorReturn :
501+     def  sysmon_line_lines (self , code : CodeType , line_number : TLineNo ) ->  MonitorReturn :
476502        """Handle sys.monitoring.events.LINE events for line coverage.""" 
477503        code_info  =  self .code_infos [id (code )]
478504        if  code_info .file_data  is  not   None :
@@ -481,7 +507,7 @@ def sysmon_line_lines(self, code: CodeType, line_number: int) -> MonitorReturn:
481507        return  DISABLE 
482508
483509    @panopticon ("code" , "line" ) 
484-     def  sysmon_line_arcs (self , code : CodeType , line_number : int ) ->  MonitorReturn :
510+     def  sysmon_line_arcs (self , code : CodeType , line_number : TLineNo ) ->  MonitorReturn :
485511        """Handle sys.monitoring.events.LINE events for branch coverage.""" 
486512        code_info  =  self .code_infos [id (code )]
487513        if  code_info .file_data  is  not   None :
@@ -492,7 +518,7 @@ def sysmon_line_arcs(self, code: CodeType, line_number: int) -> MonitorReturn:
492518
493519    @panopticon ("code" , "@" , "@" ) 
494520    def  sysmon_branch_either (
495-         self , code : CodeType , instruction_offset : int , destination_offset : int 
521+         self , code : CodeType , instruction_offset : TOffset , destination_offset : TOffset 
496522    ) ->  MonitorReturn :
497523        """Handle BRANCH_RIGHT and BRANCH_LEFT events.""" 
498524        code_info  =  self .code_infos [id (code )]
0 commit comments