Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions sentry_sdk/integrations/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import warnings
from functools import wraps
from threading import Thread, current_thread
from concurrent.futures import ThreadPoolExecutor, Future

import sentry_sdk
from sentry_sdk.integrations import Integration
Expand All @@ -24,6 +25,7 @@
from sentry_sdk._types import ExcInfo

F = TypeVar("F", bound=Callable[..., Any])
T = TypeVar("T", bound=Any)


class ThreadingIntegration(Integration):
Expand Down Expand Up @@ -109,6 +111,7 @@ def sentry_start(self, *a, **kw):
return old_start(self, *a, **kw)

Thread.start = sentry_start # type: ignore
ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit(ThreadPoolExecutor.submit) # type: ignore


def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func):
Expand All @@ -134,6 +137,38 @@ def _run_old_run_func():
return run # type: ignore


def _wrap_threadpool_executor_submit(func):
# type: (Callable[..., Future[T]]) -> Callable[..., Future[T]]
"""
Wrap submit call to propagate scopes on task submission.
"""

@wraps(func)
def sentry_submit(self, fn, *args, **kwargs):
# type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T]
integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
if integration is None:
return func(self, fn, *args, **kwargs)

if integration.propagate_scope:
isolation_scope = sentry_sdk.get_isolation_scope().fork()
current_scope = sentry_sdk.get_current_scope().fork()
else:
isolation_scope = None
current_scope = None

def wrapped_fn(*args, **kwargs):
# type: (*Any, **Any) -> Any
if isolation_scope is not None and current_scope is not None:
with use_isolation_scope(isolation_scope):
with use_scope(current_scope):
return fn(*args, **kwargs)

return func(self, wrapped_fn, *args, **kwargs)

return sentry_submit


def _capture_exception():
# type: () -> ExcInfo
exc_info = sys.exc_info()
Expand Down
41 changes: 41 additions & 0 deletions tests/integrations/threading/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,44 @@ def do_some_work(number):
- op="outer-submit-4": description="Thread: main"\
"""
)


def test_spans_from_threadpool(sentry_init, capture_events, render_span_tree):
sentry_init(
traces_sample_rate=1.0,
integrations=[ThreadingIntegration()],
)
events = capture_events()

def do_some_work(number):
with sentry_sdk.start_span(
op=f"inner-run-{number}", name=f"Thread: child-{number}"
):
pass

with sentry_sdk.start_transaction(op="outer-trx"):
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for number in range(5):
with sentry_sdk.start_span(
op=f"outer-submit-{number}", name="Thread: main"
):
future = executor.submit(do_some_work, number)
future.result()

(event,) = events

assert render_span_tree(event) == dedent(
"""\
- op="outer-trx": description=null
- op="outer-submit-0": description="Thread: main"
- op="inner-run-0": description="Thread: child-0"
- op="outer-submit-1": description="Thread: main"
- op="inner-run-1": description="Thread: child-1"
- op="outer-submit-2": description="Thread: main"
- op="inner-run-2": description="Thread: child-2"
- op="outer-submit-3": description="Thread: main"
- op="inner-run-3": description="Thread: child-3"
- op="outer-submit-4": description="Thread: main"
- op="inner-run-4": description="Thread: child-4"\
"""
)
Loading