Skip to content

Commit 41983fc

Browse files
committed
fix(asyncio integration): Filter SDK internal tasks from asyncio spans
Add a context manager to track internal task creation context. This can be used to track what tasks are created internally by the SDK GH-4699
1 parent 1334a29 commit 41983fc

File tree

6 files changed

+118
-13
lines changed

6 files changed

+118
-13
lines changed

sentry_sdk/client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from sentry_sdk.integrations.dedupe import DedupeIntegration
3939
from sentry_sdk.sessions import SessionFlusher
4040
from sentry_sdk.envelope import Envelope
41+
from sentry_sdk.utils import mark_sentry_task_internal
4142

4243
from sentry_sdk.profiler.continuous_profiler import setup_continuous_profiler
4344
from sentry_sdk.profiler.transaction_profiler import (
@@ -964,9 +965,10 @@ async def _flush_and_close(
964965
):
965966

966967
try:
967-
flush_task = self.transport.loop.create_task(
968-
_flush_and_close(timeout, callback)
969-
)
968+
with mark_sentry_task_internal():
969+
flush_task = self.transport.loop.create_task(
970+
_flush_and_close(timeout, callback)
971+
)
970972
except RuntimeError:
971973
# Shutdown the components anyway
972974
self._close_components()
@@ -999,9 +1001,10 @@ def flush( # type: ignore[override]
9991001
self.transport, "loop"
10001002
):
10011003
try:
1002-
return self.transport.loop.create_task(
1003-
self._flush_async(timeout, callback)
1004-
)
1004+
with mark_sentry_task_internal():
1005+
return self.transport.loop.create_task(
1006+
self._flush_async(timeout, callback)
1007+
)
10051008
except RuntimeError:
10061009
logger.warning("Event loop not running, aborting flush.")
10071010
return None

sentry_sdk/integrations/asyncio.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
import sentry_sdk
55
from sentry_sdk.consts import OP
66
from sentry_sdk.integrations import Integration, DidNotEnable
7-
from sentry_sdk.utils import event_from_exception, logger, reraise
7+
from sentry_sdk.utils import (
8+
event_from_exception,
9+
logger,
10+
reraise,
11+
_is_sentry_internal_task,
12+
)
813
from sentry_sdk.transport import AsyncHttpTransport
914

1015
try:
@@ -83,6 +88,18 @@ def _sentry_task_factory(
8388
**kwargs: Any,
8489
) -> asyncio.Future[Any]:
8590

91+
# Check if this is an internal Sentry task
92+
is_internal = _is_sentry_internal_task.get()
93+
94+
if is_internal:
95+
if orig_task_factory:
96+
return orig_task_factory(loop, coro, **kwargs)
97+
else:
98+
task = Task(coro, loop=loop, **kwargs)
99+
if task._source_traceback: # type: ignore
100+
del task._source_traceback[-1] # type: ignore
101+
return task
102+
86103
async def _task_with_sentry_span_creation() -> Any:
87104
result = None
88105

sentry_sdk/transport.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@
4040
import certifi
4141

4242
from sentry_sdk.consts import EndpointType
43-
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
43+
from sentry_sdk.utils import (
44+
Dsn,
45+
logger,
46+
capture_internal_exceptions,
47+
mark_sentry_task_internal,
48+
)
4449
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
4550
from sentry_sdk.envelope import Envelope, Item, PayloadRef
4651

@@ -800,7 +805,8 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore
800805
self.background_tasks.clear()
801806
try:
802807
# Return the pool cleanup task so caller can await it if needed
803-
return self.loop.create_task(self._pool.aclose()) # type: ignore
808+
with mark_sentry_task_internal():
809+
return self.loop.create_task(self._pool.aclose(), name="sentry_sdk_pool_aclose") # type: ignore
804810
except RuntimeError:
805811
logger.warning("Event loop not running, aborting kill.")
806812
return None

sentry_sdk/utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22
import base64
3+
import contextvars
34
import json
45
import linecache
56
import logging
@@ -12,6 +13,7 @@
1213
import threading
1314
import time
1415
from collections import namedtuple
16+
from contextlib import contextmanager
1517
from datetime import datetime, timezone
1618
from decimal import Decimal
1719
from functools import partial, partialmethod, wraps
@@ -72,6 +74,21 @@
7274

7375
_installed_modules = None
7476

77+
_is_sentry_internal_task = contextvars.ContextVar(
78+
"_is_sentry_internal_task", default=False
79+
)
80+
81+
82+
@contextmanager
83+
def mark_sentry_task_internal():
84+
"""Context manager to mark a task as Sentry internal."""
85+
token = _is_sentry_internal_task.set(True)
86+
try:
87+
yield
88+
finally:
89+
_is_sentry_internal_task.reset(token)
90+
91+
7592
BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$")
7693

7794
FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0"))

sentry_sdk/worker.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from time import sleep, time
88
from sentry_sdk._queue import Queue, FullError
9-
from sentry_sdk.utils import logger
9+
from sentry_sdk.utils import logger, mark_sentry_task_internal
1010
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
1111

1212
from typing import TYPE_CHECKING
@@ -231,7 +231,10 @@ def start(self) -> None:
231231
self._loop = asyncio.get_running_loop()
232232
if self._queue is None:
233233
self._queue = asyncio.Queue(maxsize=self._queue_size)
234-
self._task = self._loop.create_task(self._target())
234+
with mark_sentry_task_internal():
235+
self._task = self._loop.create_task(
236+
self._target(), name="sentry_sdk_async_worker"
237+
)
235238
self._task_for_pid = os.getpid()
236239
except RuntimeError:
237240
# There is no event loop running
@@ -273,7 +276,11 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N
273276

274277
def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override]
275278
if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
276-
return self._loop.create_task(self._wait_flush(timeout, callback))
279+
with mark_sentry_task_internal():
280+
return self._loop.create_task(
281+
self._wait_flush(timeout, callback),
282+
name="sentry_sdk_async_worker_flush",
283+
)
277284
return None
278285

279286
def submit(self, callback: Callable[[], Any]) -> bool:
@@ -295,7 +302,11 @@ async def _target(self) -> None:
295302
self._queue.task_done()
296303
break
297304
# Firing tasks instead of awaiting them allows for concurrent requests
298-
task = asyncio.create_task(self._process_callback(callback))
305+
with mark_sentry_task_internal():
306+
task = asyncio.create_task(
307+
self._process_callback(callback),
308+
name="sentry_sdk_async_worker_process_callback",
309+
)
299310
# Create a strong reference to the task so it can be cancelled on kill
300311
# and does not get garbage collected while running
301312
self._active_tasks.add(task)

tests/integrations/asyncio/test_asyncio.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import sentry_sdk
99
from sentry_sdk.consts import OP
1010
from sentry_sdk.integrations.asyncio import AsyncioIntegration, patch_asyncio
11+
from sentry_sdk.utils import mark_sentry_task_internal
12+
1113

1214
try:
1315
from contextvars import Context, ContextVar
@@ -426,3 +428,52 @@ def test_loop_close_flushes_async_transport(sentry_init):
426428
except Exception:
427429
if not loop.is_closed():
428430
loop.close()
431+
432+
433+
@minimum_python_38
434+
@pytest.mark.asyncio
435+
async def test_internal_tasks_not_wrapped(sentry_init, capture_events):
436+
437+
sentry_init(integrations=[AsyncioIntegration()], traces_sample_rate=1.0)
438+
events = capture_events()
439+
440+
# Create a user task that should be wrapped
441+
async def user_task():
442+
await asyncio.sleep(0.01)
443+
return "user_result"
444+
445+
# Create an internal task that should NOT be wrapped
446+
async def internal_task():
447+
await asyncio.sleep(0.01)
448+
return "internal_result"
449+
450+
with sentry_sdk.start_transaction(name="test_transaction"):
451+
user_task_obj = asyncio.create_task(user_task())
452+
453+
with mark_sentry_task_internal():
454+
internal_task_obj = asyncio.create_task(internal_task())
455+
456+
user_result = await user_task_obj
457+
internal_result = await internal_task_obj
458+
459+
assert user_result == "user_result"
460+
assert internal_result == "internal_result"
461+
462+
assert len(events) == 1
463+
transaction = events[0]
464+
465+
user_spans = []
466+
internal_spans = []
467+
468+
for span in transaction.get("spans", []):
469+
if "user_task" in span.get("description", ""):
470+
user_spans.append(span)
471+
elif "internal_task" in span.get("description", ""):
472+
internal_spans.append(span)
473+
474+
assert (
475+
len(user_spans) > 0
476+
), f"User task should have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}"
477+
assert (
478+
len(internal_spans) == 0
479+
), f"Internal task should NOT have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}"

0 commit comments

Comments
 (0)