Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions sentry_sdk/integrations/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import warnings
from functools import wraps
from threading import Thread, current_thread
from concurrent.futures import ThreadPoolExecutor, Future

import sentry_sdk
from sentry_sdk.integrations import Integration
Expand All @@ -24,6 +25,7 @@
from sentry_sdk._types import ExcInfo

F = TypeVar("F", bound=Callable[..., Any])
T = TypeVar("T", bound=Any)


class ThreadingIntegration(Integration):
Expand Down Expand Up @@ -59,6 +61,15 @@ def setup_once():
django_version = None
channels_version = None

is_async_emulated_with_threads = (
sys.version_info < (3, 9)
and channels_version is not None
and channels_version < "4.0.0"
and django_version is not None
and django_version >= (3, 0)
and django_version < (4, 0)
)

@wraps(old_start)
def sentry_start(self, *a, **kw):
# type: (Thread, *Any, **Any) -> Any
Expand All @@ -67,14 +78,7 @@ def sentry_start(self, *a, **kw):
return old_start(self, *a, **kw)

if integration.propagate_scope:
if (
sys.version_info < (3, 9)
and channels_version is not None
and channels_version < "4.0.0"
and django_version is not None
and django_version >= (3, 0)
and django_version < (4, 0)
):
if is_async_emulated_with_threads:
warnings.warn(
"There is a known issue with Django channels 2.x and 3.x when using Python 3.8 or older. "
"(Async support is emulated using threads and some Sentry data may be leaked between those threads.) "
Expand Down Expand Up @@ -109,6 +113,9 @@ def sentry_start(self, *a, **kw):
return old_start(self, *a, **kw)

Thread.start = sentry_start # type: ignore
ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( # type: ignore
ThreadPoolExecutor.submit, is_async_emulated_with_threads
)


def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func):
Expand All @@ -134,6 +141,43 @@ def _run_old_run_func():
return run # type: ignore


def _wrap_threadpool_executor_submit(func, is_async_emulated_with_threads):
# type: (Callable[..., Future[T]], bool) -> Callable[..., Future[T]]
"""
Wrap submit call to propagate scopes on task submission.
"""

@wraps(func)
def sentry_submit(self, fn, *args, **kwargs):
# type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T]
integration = sentry_sdk.get_client().get_integration(ThreadingIntegration)
if integration is None:
return func(self, fn, *args, **kwargs)

if integration.propagate_scope and is_async_emulated_with_threads:
isolation_scope = sentry_sdk.get_isolation_scope()
current_scope = sentry_sdk.get_current_scope()
elif integration.propagate_scope:
isolation_scope = sentry_sdk.get_isolation_scope().fork()
current_scope = sentry_sdk.get_current_scope().fork()
else:
isolation_scope = None
current_scope = None

def wrapped_fn(*args, **kwargs):
# type: (*Any, **Any) -> Any
if isolation_scope is not None and current_scope is not None:
with use_isolation_scope(isolation_scope):
with use_scope(current_scope):
return fn(*args, **kwargs)

return fn(*args, **kwargs)

return func(self, wrapped_fn, *args, **kwargs)

return sentry_submit


def _capture_exception():
# type: () -> ExcInfo
exc_info = sys.exc_info()
Expand Down
61 changes: 61 additions & 0 deletions tests/integrations/threading/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,64 @@ def do_some_work(number):
- op="outer-submit-4": description="Thread: main"\
"""
)


@pytest.mark.parametrize(
"propagate_scope",
(True, False),
ids=["propagate_scope=True", "propagate_scope=False"],
)
def test_spans_from_threadpool(
sentry_init, capture_events, render_span_tree, propagate_scope
):
sentry_init(
traces_sample_rate=1.0,
integrations=[ThreadingIntegration(propagate_scope=propagate_scope)],
)
events = capture_events()

def do_some_work(number):
with sentry_sdk.start_span(
op=f"inner-run-{number}", name=f"Thread: child-{number}"
):
pass

with sentry_sdk.start_transaction(op="outer-trx"):
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for number in range(5):
with sentry_sdk.start_span(
op=f"outer-submit-{number}", name="Thread: main"
):
future = executor.submit(do_some_work, number)
future.result()

(event,) = events

if propagate_scope:
assert render_span_tree(event) == dedent(
"""\
- op="outer-trx": description=null
- op="outer-submit-0": description="Thread: main"
- op="inner-run-0": description="Thread: child-0"
- op="outer-submit-1": description="Thread: main"
- op="inner-run-1": description="Thread: child-1"
- op="outer-submit-2": description="Thread: main"
- op="inner-run-2": description="Thread: child-2"
- op="outer-submit-3": description="Thread: main"
- op="inner-run-3": description="Thread: child-3"
- op="outer-submit-4": description="Thread: main"
- op="inner-run-4": description="Thread: child-4"\
"""
)

elif not propagate_scope:
assert render_span_tree(event) == dedent(
"""\
- op="outer-trx": description=null
- op="outer-submit-0": description="Thread: main"
- op="outer-submit-1": description="Thread: main"
- op="outer-submit-2": description="Thread: main"
- op="outer-submit-3": description="Thread: main"
- op="outer-submit-4": description="Thread: main"\
"""
)