Skip to content

Commit fe4e630

Browse files
authored
bukzor/di 1163 thread leak detection dark launch (#97320)
- **DI-1008: assert that tests don't leak threads** - **remove singletonproduer._shutdown_all() hack** - **custom diff, for unambiguous +/- thread blocks** - **attempt to keep only test_ (or testutils) frames, if we have them** - **pytest config: pytest.mark.thread_leak_allowlist** - **DI-1163: dark-launch thread-leak assertions** - **remove "hax"** - **test for thread-leak differ**
1 parent 4337d72 commit fe4e630

File tree

12 files changed

+547
-0
lines changed

12 files changed

+547
-0
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ markers = [
217217
"sentry_metrics: test requires access to sentry metrics",
218218
"symbolicator: test requires access to symbolicator",
219219
"querybuilder: smoke tests for QueryBuilders",
220+
"thread_leak_allowlist: temporarily allow known-issue thread leaks",
220221
]
221222
filterwarnings = [
222223
# Consider all warnings to be errors other than the ignored ones.
@@ -583,6 +584,7 @@ module = [
583584
"sentry.testutils.pytest.json_report_reruns",
584585
"sentry.testutils.pytest.show_flaky_failures",
585586
"sentry.testutils.skips",
587+
"sentry.testutils.thread_leaks.*",
586588
"sentry.toolbar.utils.*",
587589
"sentry.types.*",
588590
"sentry.uptime.migrations.*",
@@ -774,6 +776,7 @@ module = [
774776
"tests.sentry.tasks.test_delete_seer_grouping_records",
775777
"tests.sentry.tasks.test_on_demand_metrics",
776778
"tests.sentry.testutils.helpers.*",
779+
"tests.sentry.testutils.thread_leaks.*",
777780
"tests.sentry.types.*",
778781
"tests.sentry.usage_accountant.*",
779782
"tests.sentry.users.api.bases.*",

src/sentry/testutils/thread_leaks/__init__.py

Whitespace-only changes.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""Utility used to regression-proof our thread-leak fixes.
2+
3+
It turns out due to various indirections, just showing the name and exact
4+
"target" of a thread isn't enough to find where/how it was spawned. This code
5+
does a bit of work to record and show exactly where the Thread came from, which
6+
proved essential when working to fix these things.
7+
"""
8+
9+
import os
10+
import threading
11+
import traceback
12+
from collections.abc import Generator
13+
from contextlib import contextmanager
14+
from threading import Thread
15+
from traceback import StackSummary
16+
from typing import Any
17+
from unittest import mock
18+
19+
from . import sentry
20+
from .diff import diff
21+
22+
_cwd = os.getcwd() + "/"
23+
del os # hygiene
24+
25+
26+
class ThreadLeakAssertionError(AssertionError):
27+
pass
28+
29+
30+
def _where(cwd: str = _cwd) -> StackSummary:
31+
stack = traceback.extract_stack()
32+
for frame in stack:
33+
frame.filename = frame.filename.replace(cwd, "./") # for readability
34+
return stack
35+
36+
37+
@contextmanager
38+
def threading_remembers_where() -> Generator[None]:
39+
"""Smuggle a ._where StackSummary attribute onto each Thread construction."""
40+
__init__ = Thread.__init__
41+
42+
def patched__init__(self: Thread, *a: Any, **k: Any) -> None:
43+
setattr(self, "_where", _where())
44+
__init__(self, *a, **k)
45+
46+
with mock.patch.object(Thread, "__init__", patched__init__):
47+
yield
48+
49+
50+
@contextmanager
51+
def assert_none(strict: bool = True) -> Generator[dict[str, Any]]:
52+
"""Assert no thread leaks occurred during context execution."""
53+
54+
with threading_remembers_where():
55+
expected = threading.enumerate()
56+
result: dict[str, Any] = {"events": {}}
57+
yield result
58+
actual = threading.enumerate()
59+
60+
thread_leaks = set(actual) - set(expected)
61+
if not thread_leaks:
62+
return
63+
64+
result["events"] = sentry.capture_event(thread_leaks, strict)
65+
if strict:
66+
raise ThreadLeakAssertionError(diff(old=expected, new=actual))
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""Generate readable diffs showing thread creation locations for debugging leaks."""
2+
3+
import sys
4+
from collections.abc import Generator, Iterable
5+
from threading import Thread
6+
from traceback import FrameSummary, StackSummary
7+
8+
9+
def get_relevant_frames(stack: Iterable[FrameSummary]) -> StackSummary:
10+
"""Filter stack frames to show only test and testutil code relevant for debugging.
11+
12+
Applies a series of filters to remove stdlib and system code, then focuses on
13+
test and testutil frames where thread leak fixes typically need to go. Each filter
14+
is only applied if it doesn't remove all frames.
15+
"""
16+
# Get stdlib directory path if available (for filtering system code)
17+
stdlib_dir = getattr(sys, "_stdlib_dir", None)
18+
19+
# Apply each filter independently, reverting if it would remove all frames
20+
for filter in (
21+
lambda frame: frame.filename == __file__, # Remove this very file
22+
lambda frame: frame.line is None, # Remove "frozen" stdlib modules
23+
lambda frame: stdlib_dir
24+
and frame.filename.startswith(stdlib_dir + "/"), # Remove Python stdlib
25+
lambda frame: frame.filename.startswith(sys.prefix), # Remove system Python files
26+
# Keep only test/testutil frames (where fixes will need to go)
27+
lambda frame: "/test_" not in frame.filename, # NOTE: filter-not is a double negative
28+
lambda frame: "/testutils/" not in frame.filename,
29+
):
30+
filtered_stack = [frame for frame in stack if not filter(frame)]
31+
# Only apply the filter if it leaves some frames (fail-safe)
32+
if filtered_stack:
33+
stack = StackSummary.from_list(filtered_stack)
34+
35+
return StackSummary.from_list(stack)
36+
37+
38+
def _get_thread_function_name(thread: Thread) -> str:
39+
"""Extract fully qualified function name from thread target.
40+
41+
Handles cases where thread target is None or wrapped (e.g., functools.partial).
42+
Returns a string representation suitable for debugging output.
43+
"""
44+
func = getattr(thread, "_target", None)
45+
if func is None:
46+
return "None"
47+
48+
# Use __qualname__ if available, fallback to str() for complex objects like functools.partial
49+
func_name = getattr(func, "__qualname__", str(func))
50+
return f"{func.__module__}.{func_name}"
51+
52+
53+
def _threads_to_diffable(threads: list[Thread]) -> list[str]:
54+
"""Convert threads to string representations suitable for diffing.
55+
56+
Each thread becomes a formatted string containing:
57+
- Thread representation (name, ID, daemon status)
58+
- Target function fully qualified name
59+
- Indented stack trace showing where thread was created
60+
61+
Threads are sorted by ID for consistent diff output.
62+
"""
63+
result: list[str] = []
64+
for thread in sorted(threads, key=lambda t: t.ident or 0):
65+
func_fqname = _get_thread_function_name(thread)
66+
stack = getattr(thread, "_where", [])
67+
stack = get_relevant_frames(stack)
68+
stack = "".join(stack.format())
69+
# Indent stack trace lines for visual hierarchy in diff output
70+
stack = ("\n" + stack).replace("\n", "\n ").strip(" ")
71+
result.append(f"{thread!r}@{func_fqname}{stack}\n")
72+
return result
73+
74+
75+
def _diff(old: list[str], new: list[str]) -> Generator[str]:
76+
"""Generate unified diff lines between two thread lists."""
77+
import difflib
78+
79+
matcher = difflib.SequenceMatcher(None, old, new)
80+
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
81+
for tags, threads, prefix in [
82+
(("equal",), old[i1:i2], " "),
83+
(("delete", "replace"), old[i1:i2], "-"),
84+
(("insert", "replace"), new[j1:j2], "+"),
85+
]:
86+
if tag in tags:
87+
for thread_str in threads:
88+
for line in thread_str.splitlines():
89+
yield f"{prefix} {line}\n"
90+
91+
92+
def diff(old: list[Thread], new: list[Thread]) -> str:
93+
"""Generate unambiguous unified diff from structured thread data."""
94+
return "\n" + "".join(
95+
_diff(
96+
_threads_to_diffable(old),
97+
_threads_to_diffable(new),
98+
)
99+
)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Pytest integration for thread leak detection."""
2+
3+
from collections.abc import Generator
4+
5+
import pytest
6+
7+
from .assertion import assert_none
8+
9+
10+
def check_test(request: pytest.FixtureRequest, strict: bool = True) -> Generator[None]:
11+
"""Check test for thread leaks, respecting allowlist markers."""
12+
if request.node.get_closest_marker("thread_leak_allowlist"):
13+
yield
14+
else:
15+
with assert_none(strict):
16+
yield
17+
18+
19+
def allowlist(reason: str | None = None, *, issue: int) -> pytest.MarkDecorator:
20+
"""Mark test as allowed to leak threads with tracking issue."""
21+
decorator = pytest.mark.thread_leak_allowlist(reason=reason, issue=issue)
22+
return decorator
23+
24+
25+
def singleton_cleanup(reason: str | None = None, *, issue: int) -> pytest.MarkDecorator:
26+
"""Mark test as needing singleton cleanup with tracking issue."""
27+
decorator = pytest.mark.thread_leak_singleton_cleanup(reason=reason, issue=issue)
28+
return decorator
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Sentry integration for reporting thread leak events with proper context."""
2+
3+
from __future__ import annotations
4+
5+
import functools
6+
from collections.abc import Iterable
7+
from threading import Thread
8+
from traceback import FrameSummary
9+
from typing import TYPE_CHECKING
10+
11+
import sentry_sdk.scope
12+
13+
from .diff import get_relevant_frames
14+
15+
if TYPE_CHECKING:
16+
# this is *only* defined when TYPE_CHECKING =(
17+
from sentry_sdk._types import Event as SentryEvent
18+
19+
20+
@functools.cache
21+
def get_scope() -> sentry_sdk.scope.Scope:
22+
"""Create configured Sentry scope for thread leak reporting."""
23+
from os import environ
24+
25+
sha: str | None
26+
branch: str | None
27+
if environ.get("GITHUB_ACTIONS") == "true":
28+
sha = environ["GITHUB_SHA"]
29+
branch = environ.get("GITHUB_HEAD_REF")
30+
if branch:
31+
environment = "PR"
32+
else:
33+
environment = "master"
34+
branch = environ["GITHUB_REF_NAME"]
35+
else:
36+
sha = branch = None
37+
environment = "local"
38+
39+
client = sentry_sdk.Client(
40+
# proj-thread-leaks
41+
dsn="https://[email protected]/4509798820085760",
42+
environment=environment,
43+
)
44+
45+
scope = sentry_sdk.get_current_scope().fork()
46+
scope.set_client(client)
47+
scope.update_from_kwargs(
48+
# Don't set level - scope overrides event-level
49+
extras={"git-branch": branch, "git-sha": sha},
50+
)
51+
return scope
52+
53+
54+
def capture_event(thread_leaks: set[Thread], strict: bool) -> dict[str, SentryEvent]:
55+
"""Report thread leaks to Sentry with proper event formatting."""
56+
# Report to Sentry
57+
scope = get_scope()
58+
events = {}
59+
with sentry_sdk.scope.use_scope(scope):
60+
for thread_leak in thread_leaks:
61+
event = get_thread_leak_event(thread_leak, strict)
62+
event_id = scope.capture_event(event)
63+
if event_id is not None:
64+
events[event_id] = event
65+
scope.client.flush()
66+
return events
67+
68+
69+
def get_thread_leak_event(thread: Thread, strict: bool = True) -> SentryEvent:
70+
"""Create Sentry event from leaked thread."""
71+
stack: Iterable[FrameSummary] = getattr(thread, "_where", [])
72+
return event_from_stack(repr(thread), stack, strict)
73+
74+
75+
def event_from_stack(value: str, stack: Iterable[FrameSummary], strict: bool) -> SentryEvent:
76+
relevant_frames = get_relevant_frames(stack)
77+
78+
# https://develop.sentry.dev/sdk/data-model/event-payloads/exception/
79+
exception = {
80+
"mechanism": {
81+
"type": __name__,
82+
"handled": not strict,
83+
"help_link": "https://www.notion.so/sentry/How-To-Thread-Leaks-2488b10e4b5d8049965cc057b5fb5f6b",
84+
},
85+
"type": "ThreadLeakAssertionError",
86+
"value": value,
87+
"stacktrace": {
88+
"frames": [
89+
{
90+
"filename": frame.filename,
91+
"function": frame.name,
92+
"module": frame.locals.get("__name__") if frame.locals else None,
93+
"lineno": frame.lineno,
94+
"context_line": frame.line,
95+
"in_app": frame in relevant_frames,
96+
}
97+
for frame in stack
98+
]
99+
},
100+
}
101+
return {
102+
"level": "error" if strict else "warning",
103+
"message": "Thread leak detected",
104+
"exception": {"values": [exception]},
105+
}

tests/conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from sentry.silo.base import SiloMode
1313
from sentry.testutils.pytest.sentry import get_default_silo_mode_for_test_cases
14+
from sentry.testutils.thread_leaks import pytest as thread_leaks
1415

1516
pytest_plugins = ["sentry.testutils.pytest"]
1617

@@ -56,6 +57,12 @@ def unclosed_files():
5657
assert _open_files() == fds
5758

5859

60+
@pytest.fixture(autouse=True)
61+
def unclosed_threads(request):
62+
# TODO(DI-1067): strict mode
63+
yield from thread_leaks.check_test(request, strict=False)
64+
65+
5966
@pytest.fixture(autouse=True)
6067
def validate_silo_mode():
6168
# NOTE! Hybrid cloud uses many mechanisms to simulate multiple different configurations of the application

tests/sentry/testutils/thread_leaks/__init__.py

Whitespace-only changes.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""Test configuration for thread leak tests."""
2+
3+
from collections.abc import Generator
4+
5+
import pytest
6+
7+
8+
@pytest.fixture(scope="package", autouse=True)
9+
def thread_leak_test_environment() -> Generator[None]:
10+
"""Set sentry environment to "selftest" for all tests in this package."""
11+
from sentry.testutils.thread_leaks.sentry import get_scope
12+
13+
scope = get_scope()
14+
orig = scope.client.options.get("environment")
15+
scope.client.options["environment"] = "selftest"
16+
17+
yield
18+
19+
scope.client.options["environment"] = orig

0 commit comments

Comments
 (0)