Skip to content

Commit 0cd1459

Browse files
feat: Add concurrent.futures integration
1 parent 567a91a commit 0cd1459

File tree

3 files changed

+180
-0
lines changed

3 files changed

+180
-0
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from functools import wraps
2+
3+
from concurrent.futures import ThreadPoolExecutor
4+
5+
import sentry_sdk
6+
from sentry_sdk.integrations import Integration
7+
from sentry_sdk.scope import use_isolation_scope, use_scope
8+
9+
from typing import TYPE_CHECKING
10+
11+
if TYPE_CHECKING:
12+
from typing import Any
13+
from typing import Callable
14+
15+
16+
class ConcurrentIntegration(Integration):
17+
identifier = "concurrent"
18+
19+
def __init__(self, record_exceptions_on_futures=True):
20+
# type: (bool) -> None
21+
self.record_exceptions_on_futures = record_exceptions_on_futures
22+
23+
@staticmethod
24+
def setup_once():
25+
# type: () -> None
26+
old_submit = ThreadPoolExecutor.submit
27+
28+
@wraps(old_submit)
29+
def sentry_submit(self, fn, *args, **kwargs):
30+
# type: (ThreadPoolExecutor, Callable, *Any, **Any) -> Any
31+
integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration)
32+
if integration is None:
33+
return old_submit(self, fn, *args, **kwargs)
34+
35+
isolation_scope = sentry_sdk.get_isolation_scope().fork()
36+
current_scope = sentry_sdk.get_current_scope().fork()
37+
38+
def wrapped_fn(*args, **kwargs):
39+
# type: (*Any, **Any) -> Any
40+
with use_isolation_scope(isolation_scope):
41+
with use_scope(current_scope):
42+
return fn(*args, **kwargs)
43+
44+
return old_submit(self, wrapped_fn, *args, **kwargs)
45+
46+
ThreadPoolExecutor.submit = sentry_submit
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
from textwrap import dedent
2+
from concurrent import futures
3+
from concurrent.futures import Future, ThreadPoolExecutor
4+
5+
import sentry_sdk
6+
7+
from sentry_sdk.integrations.concurrent import ConcurrentIntegration
8+
9+
original_submit = ThreadPoolExecutor.submit
10+
original_set_exception = Future.set_exception
11+
12+
13+
def test_propagates_threadpool_scope(sentry_init, capture_events):
14+
sentry_init(
15+
default_integrations=False,
16+
traces_sample_rate=1.0,
17+
integrations=[ConcurrentIntegration()],
18+
)
19+
events = capture_events()
20+
21+
def double(number):
22+
with sentry_sdk.start_span(op="task", name=str(number)):
23+
return number * 2
24+
25+
with sentry_sdk.start_transaction(name="test_handles_threadpool"):
26+
with futures.ThreadPoolExecutor(max_workers=1) as executor:
27+
tasks = [executor.submit(double, number) for number in [1, 2, 3, 4]]
28+
for future in futures.as_completed(tasks):
29+
print("Getting future value!", future.result())
30+
31+
sentry_sdk.flush()
32+
33+
assert len(events) == 1
34+
(event,) = events
35+
assert event["spans"][0]["trace_id"] == event["spans"][1]["trace_id"]
36+
assert event["spans"][1]["trace_id"] == event["spans"][2]["trace_id"]
37+
assert event["spans"][2]["trace_id"] == event["spans"][3]["trace_id"]
38+
assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"]
39+
40+
41+
def test_propagates_threadpool_scope_in_map(sentry_init, capture_events):
42+
sentry_init(
43+
default_integrations=False,
44+
traces_sample_rate=1.0,
45+
integrations=[ConcurrentIntegration()],
46+
)
47+
events = capture_events()
48+
49+
def double(number):
50+
with sentry_sdk.start_span(op="task", name=str(number)):
51+
return number * 2
52+
53+
with sentry_sdk.start_transaction(name="test_handles_threadpool"):
54+
with futures.ThreadPoolExecutor(max_workers=1) as executor:
55+
for value in executor.map(double, [1, 2, 3, 4]):
56+
print("Getting future value!", value)
57+
58+
sentry_sdk.flush()
59+
60+
assert len(events) == 1
61+
(event,) = events
62+
assert event["spans"][0]["trace_id"] == event["spans"][1]["trace_id"]
63+
assert event["spans"][1]["trace_id"] == event["spans"][2]["trace_id"]
64+
assert event["spans"][2]["trace_id"] == event["spans"][3]["trace_id"]
65+
assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"]
66+
67+
68+
def test_scope_data_not_leaked_in_executor(sentry_init):
69+
sentry_init(
70+
integrations=[ConcurrentIntegration()],
71+
)
72+
73+
sentry_sdk.set_tag("initial_tag", "initial_value")
74+
initial_iso_scope = sentry_sdk.get_isolation_scope()
75+
76+
def do_some_work():
77+
# check if we have the initial scope data propagated into the thread
78+
assert sentry_sdk.get_isolation_scope()._tags == {
79+
"initial_tag": "initial_value"
80+
}
81+
82+
# change data in isolation scope in thread
83+
sentry_sdk.set_tag("thread_tag", "thread_value")
84+
85+
with futures.ThreadPoolExecutor(max_workers=1) as executor:
86+
future = executor.submit(do_some_work)
87+
future.result()
88+
89+
# check if the initial scope data is not modified by the started thread
90+
assert initial_iso_scope._tags == {
91+
"initial_tag": "initial_value"
92+
}, "The isolation scope in the main thread should not be modified by the started thread."
93+
94+
95+
def test_spans_from_multiple_threads(sentry_init, capture_events, render_span_tree):
96+
sentry_init(
97+
traces_sample_rate=1.0,
98+
integrations=[ConcurrentIntegration()],
99+
)
100+
events = capture_events()
101+
102+
def do_some_work(number):
103+
with sentry_sdk.start_span(
104+
op=f"inner-run-{number}", name=f"Thread: child-{number}"
105+
):
106+
pass
107+
108+
with sentry_sdk.start_transaction(op="outer-trx"):
109+
with futures.ThreadPoolExecutor(max_workers=1) as executor:
110+
for number in range(5):
111+
with sentry_sdk.start_span(
112+
op=f"outer-submit-{number}", name="Thread: main"
113+
):
114+
future = executor.submit(do_some_work, number)
115+
future.result()
116+
117+
(event,) = events
118+
119+
assert render_span_tree(event) == dedent(
120+
"""\
121+
- op="outer-trx": description=null
122+
- op="outer-submit-0": description="Thread: main"
123+
- op="inner-run-0": description="Thread: child-0"
124+
- op="outer-submit-1": description="Thread: main"
125+
- op="inner-run-1": description="Thread: child-1"
126+
- op="outer-submit-2": description="Thread: main"
127+
- op="inner-run-2": description="Thread: child-2"
128+
- op="outer-submit-3": description="Thread: main"
129+
- op="inner-run-3": description="Thread: child-3"
130+
- op="outer-submit-4": description="Thread: main"
131+
- op="inner-run-4": description="Thread: child-4"\
132+
"""
133+
)

tests/test_basics.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,7 @@ def foo(event, hint):
868868
(["atexit"], "sentry.python"),
869869
(["boto3"], "sentry.python"),
870870
(["celery"], "sentry.python"),
871+
(["concurrent"], "sentry.python"),
871872
(["dedupe"], "sentry.python"),
872873
(["excepthook"], "sentry.python"),
873874
(["unraisablehook"], "sentry.python"),

0 commit comments

Comments
 (0)