diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index c031c51f50..cfe54c829c 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -2,6 +2,7 @@ import warnings from functools import wraps from threading import Thread, current_thread +from concurrent.futures import ThreadPoolExecutor, Future import sentry_sdk from sentry_sdk.integrations import Integration @@ -24,6 +25,7 @@ from sentry_sdk._types import ExcInfo F = TypeVar("F", bound=Callable[..., Any]) + T = TypeVar("T", bound=Any) class ThreadingIntegration(Integration): @@ -59,6 +61,15 @@ def setup_once(): django_version = None channels_version = None + is_async_emulated_with_threads = ( + sys.version_info < (3, 9) + and channels_version is not None + and channels_version < "4.0.0" + and django_version is not None + and django_version >= (3, 0) + and django_version < (4, 0) + ) + @wraps(old_start) def sentry_start(self, *a, **kw): # type: (Thread, *Any, **Any) -> Any @@ -67,14 +78,7 @@ def sentry_start(self, *a, **kw): return old_start(self, *a, **kw) if integration.propagate_scope: - if ( - sys.version_info < (3, 9) - and channels_version is not None - and channels_version < "4.0.0" - and django_version is not None - and django_version >= (3, 0) - and django_version < (4, 0) - ): + if is_async_emulated_with_threads: warnings.warn( "There is a known issue with Django channels 2.x and 3.x when using Python 3.8 or older. " "(Async support is emulated using threads and some Sentry data may be leaked between those threads.) " @@ -109,6 +113,9 @@ def sentry_start(self, *a, **kw): return old_start(self, *a, **kw) Thread.start = sentry_start # type: ignore + ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( # type: ignore + ThreadPoolExecutor.submit, is_async_emulated_with_threads + ) def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func): @@ -134,6 +141,43 @@ def _run_old_run_func(): return run # type: ignore +def _wrap_threadpool_executor_submit(func, is_async_emulated_with_threads): + # type: (Callable[..., Future[T]], bool) -> Callable[..., Future[T]] + """ + Wrap submit call to propagate scopes on task submission. + """ + + @wraps(func) + def sentry_submit(self, fn, *args, **kwargs): + # type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T] + integration = sentry_sdk.get_client().get_integration(ThreadingIntegration) + if integration is None: + return func(self, fn, *args, **kwargs) + + if integration.propagate_scope and is_async_emulated_with_threads: + isolation_scope = sentry_sdk.get_isolation_scope() + current_scope = sentry_sdk.get_current_scope() + elif integration.propagate_scope: + isolation_scope = sentry_sdk.get_isolation_scope().fork() + current_scope = sentry_sdk.get_current_scope().fork() + else: + isolation_scope = None + current_scope = None + + def wrapped_fn(*args, **kwargs): + # type: (*Any, **Any) -> Any + if isolation_scope is not None and current_scope is not None: + with use_isolation_scope(isolation_scope): + with use_scope(current_scope): + return fn(*args, **kwargs) + + return fn(*args, **kwargs) + + return func(self, wrapped_fn, *args, **kwargs) + + return sentry_submit + + def _capture_exception(): # type: () -> ExcInfo exc_info = sys.exc_info() diff --git a/tests/integrations/threading/test_threading.py b/tests/integrations/threading/test_threading.py index 799298910b..9c9a24aa63 100644 --- a/tests/integrations/threading/test_threading.py +++ b/tests/integrations/threading/test_threading.py @@ -276,3 +276,64 @@ def do_some_work(number): - op="outer-submit-4": description="Thread: main"\ """ ) + + +@pytest.mark.parametrize( + "propagate_scope", + (True, False), + ids=["propagate_scope=True", "propagate_scope=False"], +) +def test_spans_from_threadpool( + sentry_init, capture_events, render_span_tree, propagate_scope +): + sentry_init( + traces_sample_rate=1.0, + integrations=[ThreadingIntegration(propagate_scope=propagate_scope)], + ) + events = capture_events() + + def do_some_work(number): + with sentry_sdk.start_span( + op=f"inner-run-{number}", name=f"Thread: child-{number}" + ): + pass + + with sentry_sdk.start_transaction(op="outer-trx"): + with futures.ThreadPoolExecutor(max_workers=1) as executor: + for number in range(5): + with sentry_sdk.start_span( + op=f"outer-submit-{number}", name="Thread: main" + ): + future = executor.submit(do_some_work, number) + future.result() + + (event,) = events + + if propagate_scope: + assert render_span_tree(event) == dedent( + """\ + - op="outer-trx": description=null + - op="outer-submit-0": description="Thread: main" + - op="inner-run-0": description="Thread: child-0" + - op="outer-submit-1": description="Thread: main" + - op="inner-run-1": description="Thread: child-1" + - op="outer-submit-2": description="Thread: main" + - op="inner-run-2": description="Thread: child-2" + - op="outer-submit-3": description="Thread: main" + - op="inner-run-3": description="Thread: child-3" + - op="outer-submit-4": description="Thread: main" + - op="inner-run-4": description="Thread: child-4"\ +""" + ) + + elif not propagate_scope: + assert render_span_tree(event) == dedent( + """\ + - op="outer-trx": description=null + - op="outer-submit-0": description="Thread: main" + - op="outer-submit-1": description="Thread: main" + - op="outer-submit-2": description="Thread: main" + - op="outer-submit-3": description="Thread: main" + - op="outer-submit-4": description="Thread: main"\ +""" + )