|
4 | 4 | import typing as t |
5 | 5 |
|
6 | 6 | from ddtrace.internal.injection import HookType |
| 7 | +from ddtrace.internal.logger import get_logger |
7 | 8 | from ddtrace.internal.test_visibility.coverage_lines import CoverageLines |
8 | 9 |
|
9 | 10 |
|
| 11 | +log = get_logger(__name__) |
| 12 | + |
| 13 | + |
10 | 14 | # This is primarily to make mypy happy without having to nest the rest of this module behind a version check |
11 | 15 | assert sys.version_info >= (3, 12) # nosec |
12 | 16 |
|
|
17 | 21 | RETURN_CONST = dis.opmap["RETURN_CONST"] |
18 | 22 | EMPTY_MODULE_BYTES = bytes([RESUME, 0, RETURN_CONST, 0]) |
19 | 23 |
|
20 | | -# Register the coverage tool with the low-impact monitoring system |
21 | | -try: |
22 | | - sys.monitoring.use_tool_id(sys.monitoring.COVERAGE_ID, "datadog") # noqa |
23 | | -except ValueError: |
24 | | - # TODO: Another coverage tool is already in use. Either warn the user |
25 | | - # or free the tool and register ours. |
26 | | - def instrument_all_lines( |
27 | | - code: CodeType, hook: HookType, path: str, package: str |
28 | | - ) -> t.Tuple[CodeType, CoverageLines]: |
29 | | - # No-op |
| 24 | +_CODE_HOOKS: t.Dict[CodeType, t.Tuple[HookType, str, t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str]]]]]] = {} |
| 25 | + |
| 26 | + |
| 27 | +def instrument_all_lines(code: CodeType, hook: HookType, path: str, package: str) -> t.Tuple[CodeType, CoverageLines]: |
| 28 | + coverage_tool = sys.monitoring.get_tool(sys.monitoring.COVERAGE_ID) |
| 29 | + if coverage_tool is not None and coverage_tool != "datadog": |
| 30 | + log.debug("Coverage tool '%s' already registered, not gathering coverage", coverage_tool) |
30 | 31 | return code, CoverageLines() |
31 | 32 |
|
32 | | -else: |
33 | | - _CODE_HOOKS: t.Dict[CodeType, t.Tuple[HookType, str, t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str]]]]]] = {} |
| 33 | + if coverage_tool is None: |
| 34 | + log.debug("Registering code coverage tool") |
| 35 | + _register_monitoring() |
| 36 | + |
| 37 | + return _instrument_all_lines_with_monitoring(code, hook, path, package) |
| 38 | + |
| 39 | + |
| 40 | +def _line_event_handler(code: CodeType, line: int) -> t.Any: |
| 41 | + hook, path, import_names = _CODE_HOOKS[code] |
| 42 | + import_name = import_names.get(line, None) |
| 43 | + return hook((line, path, import_name)) |
| 44 | + |
34 | 45 |
|
35 | | - def _line_event_handler(code: CodeType, line: int) -> t.Any: |
36 | | - hook, path, import_names = _CODE_HOOKS[code] |
37 | | - import_name = import_names.get(line, None) |
38 | | - return hook((line, path, import_name)) |
| 46 | +def _register_monitoring(): |
| 47 | + """ |
| 48 | + Register the coverage tool with the low-impact monitoring system. |
| 49 | + """ |
| 50 | + sys.monitoring.use_tool_id(sys.monitoring.COVERAGE_ID, "datadog") |
39 | 51 |
|
40 | 52 | # Register the line callback |
41 | 53 | sys.monitoring.register_callback( |
42 | 54 | sys.monitoring.COVERAGE_ID, sys.monitoring.events.LINE, _line_event_handler |
43 | 55 | ) # noqa |
44 | 56 |
|
45 | | - def instrument_all_lines( |
46 | | - code: CodeType, hook: HookType, path: str, package: str |
47 | | - ) -> t.Tuple[CodeType, CoverageLines]: |
48 | | - # Enable local line events for the code object |
49 | | - sys.monitoring.set_local_events(sys.monitoring.COVERAGE_ID, code, sys.monitoring.events.LINE) # noqa |
50 | | - |
51 | | - # Collect all the line numbers in the code object |
52 | | - linestarts = dict(dis.findlinestarts(code)) |
53 | | - |
54 | | - lines = CoverageLines() |
55 | | - import_names: t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str, ...]]]] = {} |
56 | | - |
57 | | - # The previous two arguments are kept in order to track the depth of the IMPORT_NAME |
58 | | - # For example, from ...package import module |
59 | | - current_arg: int = 0 |
60 | | - previous_arg: int = 0 |
61 | | - _previous_previous_arg: int = 0 |
62 | | - current_import_name: t.Optional[str] = None |
63 | | - current_import_package: t.Optional[str] = None |
64 | | - |
65 | | - line = 0 |
66 | | - |
67 | | - ext: list[bytes] = [] |
68 | | - code_iter = iter(enumerate(code.co_code)) |
69 | | - try: |
70 | | - while True: |
71 | | - offset, opcode = next(code_iter) |
72 | | - _, arg = next(code_iter) |
73 | | - |
74 | | - if opcode == RESUME: |
75 | | - continue |
76 | | - |
77 | | - if offset in linestarts: |
78 | | - line = linestarts[offset] |
79 | | - lines.add(line) |
80 | | - |
81 | | - # Make sure that the current module is marked as depending on its own package by instrumenting the |
82 | | - # first executable line |
83 | | - if code.co_name == "<module>" and len(lines) == 1 and package is not None: |
84 | | - import_names[line] = (package, ("",)) |
85 | | - |
86 | | - if opcode is EXTENDED_ARG: |
87 | | - ext.append(arg) |
88 | | - continue |
| 57 | + |
| 58 | +def _instrument_all_lines_with_monitoring( |
| 59 | + code: CodeType, hook: HookType, path: str, package: str |
| 60 | +) -> t.Tuple[CodeType, CoverageLines]: |
| 61 | + # Enable local line events for the code object |
| 62 | + sys.monitoring.set_local_events(sys.monitoring.COVERAGE_ID, code, sys.monitoring.events.LINE) # noqa |
| 63 | + |
| 64 | + # Collect all the line numbers in the code object |
| 65 | + linestarts = dict(dis.findlinestarts(code)) |
| 66 | + |
| 67 | + lines = CoverageLines() |
| 68 | + import_names: t.Dict[int, t.Tuple[str, t.Optional[t.Tuple[str, ...]]]] = {} |
| 69 | + |
| 70 | + # The previous two arguments are kept in order to track the depth of the IMPORT_NAME |
| 71 | + # For example, from ...package import module |
| 72 | + current_arg: int = 0 |
| 73 | + previous_arg: int = 0 |
| 74 | + _previous_previous_arg: int = 0 |
| 75 | + current_import_name: t.Optional[str] = None |
| 76 | + current_import_package: t.Optional[str] = None |
| 77 | + |
| 78 | + line = 0 |
| 79 | + |
| 80 | + ext: list[bytes] = [] |
| 81 | + code_iter = iter(enumerate(code.co_code)) |
| 82 | + try: |
| 83 | + while True: |
| 84 | + offset, opcode = next(code_iter) |
| 85 | + _, arg = next(code_iter) |
| 86 | + |
| 87 | + if opcode == RESUME: |
| 88 | + continue |
| 89 | + |
| 90 | + if offset in linestarts: |
| 91 | + line = linestarts[offset] |
| 92 | + lines.add(line) |
| 93 | + |
| 94 | + # Make sure that the current module is marked as depending on its own package by instrumenting the |
| 95 | + # first executable line |
| 96 | + if code.co_name == "<module>" and len(lines) == 1 and package is not None: |
| 97 | + import_names[line] = (package, ("",)) |
| 98 | + |
| 99 | + if opcode is EXTENDED_ARG: |
| 100 | + ext.append(arg) |
| 101 | + continue |
| 102 | + else: |
| 103 | + _previous_previous_arg = previous_arg |
| 104 | + previous_arg = current_arg |
| 105 | + current_arg = int.from_bytes([*ext, arg], "big", signed=False) |
| 106 | + ext.clear() |
| 107 | + |
| 108 | + if opcode == IMPORT_NAME: |
| 109 | + import_depth: int = code.co_consts[_previous_previous_arg] |
| 110 | + current_import_name: str = code.co_names[current_arg] |
| 111 | + # Adjust package name if the import is relative and a parent (ie: if depth is more than 1) |
| 112 | + current_import_package: str = ( |
| 113 | + ".".join(package.split(".")[: -import_depth + 1]) if import_depth > 1 else package |
| 114 | + ) |
| 115 | + |
| 116 | + if line in import_names: |
| 117 | + import_names[line] = ( |
| 118 | + current_import_package, |
| 119 | + tuple(list(import_names[line][1]) + [current_import_name]), |
| 120 | + ) |
89 | 121 | else: |
90 | | - _previous_previous_arg = previous_arg |
91 | | - previous_arg = current_arg |
92 | | - current_arg = int.from_bytes([*ext, arg], "big", signed=False) |
93 | | - ext.clear() |
94 | | - |
95 | | - if opcode == IMPORT_NAME: |
96 | | - import_depth: int = code.co_consts[_previous_previous_arg] |
97 | | - current_import_name: str = code.co_names[current_arg] |
98 | | - # Adjust package name if the import is relative and a parent (ie: if depth is more than 1) |
99 | | - current_import_package: str = ( |
100 | | - ".".join(package.split(".")[: -import_depth + 1]) if import_depth > 1 else package |
| 122 | + import_names[line] = (current_import_package, (current_import_name,)) |
| 123 | + |
| 124 | + # Also track import from statements since it's possible that the "from" target is a module, eg: |
| 125 | + # from my_package import my_module |
| 126 | + # Since the package has not changed, we simply extend the previous import names with the new value |
| 127 | + if opcode == IMPORT_FROM: |
| 128 | + import_from_name = f"{current_import_name}.{code.co_names[current_arg]}" |
| 129 | + if line in import_names: |
| 130 | + import_names[line] = ( |
| 131 | + current_import_package, |
| 132 | + tuple(list(import_names[line][1]) + [import_from_name]), |
101 | 133 | ) |
| 134 | + else: |
| 135 | + import_names[line] = (current_import_package, (import_from_name,)) |
| 136 | + |
| 137 | + except StopIteration: |
| 138 | + pass |
| 139 | + |
| 140 | + # Recursively instrument nested code objects |
| 141 | + for nested_code in (_ for _ in code.co_consts if isinstance(_, CodeType)): |
| 142 | + _, nested_lines = instrument_all_lines(nested_code, hook, path, package) |
| 143 | + lines.update(nested_lines) |
| 144 | + |
| 145 | + # Register the hook and argument for the code object |
| 146 | + _CODE_HOOKS[code] = (hook, path, import_names) |
| 147 | + |
| 148 | + # Special case for empty modules (eg: __init__.py ): |
| 149 | + # Make sure line 0 is marked as executable, and add package dependency |
| 150 | + if not lines and code.co_name == "<module>" and code.co_code == EMPTY_MODULE_BYTES: |
| 151 | + lines.add(0) |
| 152 | + if package is not None: |
| 153 | + import_names[0] = (package, ("",)) |
102 | 154 |
|
103 | | - if line in import_names: |
104 | | - import_names[line] = ( |
105 | | - current_import_package, |
106 | | - tuple(list(import_names[line][1]) + [current_import_name]), |
107 | | - ) |
108 | | - else: |
109 | | - import_names[line] = (current_import_package, (current_import_name,)) |
110 | | - |
111 | | - # Also track import from statements since it's possible that the "from" target is a module, eg: |
112 | | - # from my_package import my_module |
113 | | - # Since the package has not changed, we simply extend the previous import names with the new value |
114 | | - if opcode == IMPORT_FROM: |
115 | | - import_from_name = f"{current_import_name}.{code.co_names[current_arg]}" |
116 | | - if line in import_names: |
117 | | - import_names[line] = ( |
118 | | - current_import_package, |
119 | | - tuple(list(import_names[line][1]) + [import_from_name]), |
120 | | - ) |
121 | | - else: |
122 | | - import_names[line] = (current_import_package, (import_from_name,)) |
123 | | - |
124 | | - except StopIteration: |
125 | | - pass |
126 | | - |
127 | | - # Recursively instrument nested code objects |
128 | | - for nested_code in (_ for _ in code.co_consts if isinstance(_, CodeType)): |
129 | | - _, nested_lines = instrument_all_lines(nested_code, hook, path, package) |
130 | | - lines.update(nested_lines) |
131 | | - |
132 | | - # Register the hook and argument for the code object |
133 | | - _CODE_HOOKS[code] = (hook, path, import_names) |
134 | | - |
135 | | - # Special case for empty modules (eg: __init__.py ): |
136 | | - # Make sure line 0 is marked as executable, and add package dependency |
137 | | - if not lines and code.co_name == "<module>" and code.co_code == EMPTY_MODULE_BYTES: |
138 | | - lines.add(0) |
139 | | - if package is not None: |
140 | | - import_names[0] = (package, ("",)) |
141 | | - |
142 | | - return code, lines |
| 155 | + return code, lines |
0 commit comments