Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9f24136
ref(transport): Add abstract base class for worker implementation
srothh Jul 14, 2025
001f36c
ref(transport): Add _create_worker factory method to Transport
srothh Jul 14, 2025
401b1bc
ref(worker): Add flush_async method to Worker ABC
srothh Jul 17, 2025
3f43d8f
ref(worker): Move worker flush_async from Worker ABC
srothh Jul 17, 2025
15fa295
ref(worker): Amend function signature for coroutines
srothh Jul 17, 2025
ef780f3
ref(worker): Add missing docstrings to worker ABC
srothh Jul 24, 2025
f63e46f
feat(transport): Add an async task-based worker for transport
srothh Jul 17, 2025
1804271
ref(worker): Make worker work with new ABC interface
srothh Jul 17, 2025
11da869
fix(worker): Check if callbacks from worker queue are coroutines or f…
srothh Jul 17, 2025
779a0d6
ref(worker): Amend return type of submit and flush to accomodate for …
srothh Jul 17, 2025
0895d23
ref(worker): Add type parameters for AsyncWorker variables
srothh Jul 17, 2025
bbf426b
ref(worker): Remove loop upon killing worker
srothh Jul 17, 2025
744dc8a
feat(worker): Enable concurrent callbacks on async task worker
srothh Jul 18, 2025
fcc8040
fix(worker): Modify kill behaviour to mirror threaded worker
srothh Jul 18, 2025
9a43d9b
ref(worker): add proper type annotation to worker task list
srothh Jul 21, 2025
b5eda0e
ref(worker): Refactor implementation to incorporate feedback
srothh Jul 30, 2025
9e380b8
ref(worker): fix queue initialization
srothh Jul 30, 2025
ee44621
ref(worker): Add queue as optional to allow for initialisation in start
srothh Jul 30, 2025
d9f7383
ref(worker): Change to sync flush method that launches task
srothh Jul 30, 2025
d2e647b
ref(worker): Readd coroutine check for worker callbacks
srothh Jul 30, 2025
859a0e2
ref(worker): Remove sync callbacks from worker processing for now
srothh Jul 31, 2025
a0cdb4d
Merge remote-tracking branch 'origin/srothh/transport-class-hierarchy…
sl0thentr0py Aug 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from sentry_sdk.consts import EndpointType
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
from sentry_sdk.worker import BackgroundWorker
from sentry_sdk.worker import BackgroundWorker, Worker
from sentry_sdk.envelope import Envelope, Item, PayloadRef

from typing import TYPE_CHECKING
Expand Down Expand Up @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
Transport.__init__(self, options)
assert self.parsed_dsn is not None
self.options: Dict[str, Any] = options
self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
self._worker = self._create_worker(options)
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until: Dict[Optional[str], datetime] = {}
# We only use this Retry() class for the `get_retry_after` method it exposes
Expand Down Expand Up @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
elif self._compression_algo == "br":
self._compression_level = 4

def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
    """
    Build the worker this transport submits its jobs to.

    Only the threaded, synchronous background worker is supported for now.
    Its queue capacity is read from ``options["transport_queue_size"]``.
    """
    queue_size = options["transport_queue_size"]
    return BackgroundWorker(queue_size=queue_size)

def record_lost_event(
self: Self,
reason: str,
Expand Down
196 changes: 194 additions & 2 deletions sentry_sdk/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import os
import threading
import asyncio

from time import sleep, time
from sentry_sdk._queue import Queue, FullError
Expand All @@ -16,7 +18,65 @@
_TERMINATOR = object()


class BackgroundWorker:
class Worker(ABC):
    """
    Common interface every worker implementation has to provide.

    A worker processes events in the background and sends them to Sentry.
    """

    @property
    @abstractmethod
    def is_alive(self) -> bool:
        """
        Report whether the worker is alive and running.

        Returns True while the worker is alive, False otherwise.
        """

    @abstractmethod
    def kill(self) -> None:
        """
        Kill the worker.

        The queue is drained up to the point where the worker is killed;
        afterwards the worker cannot process any more events.
        """

    def flush(
        self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None
    ) -> None:
        """
        Flush the worker.

        Blocks until the worker has flushed all events or ``timeout`` is reached.
        The base implementation does nothing, because flushing is only relevant
        to some workers; subclasses override it when they need it.
        """

    @abstractmethod
    def full(self) -> bool:
        """
        Report whether the worker's queue is full.

        Returns True if the queue is full, False otherwise.
        """

    @abstractmethod
    def submit(self, callback: Callable[[], Any]) -> bool:
        """
        Schedule ``callback`` for execution by the worker.

        Returns True if the callback was scheduled, False if the queue is full.
        """


class BackgroundWorker(Worker):
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
self._queue: Queue = Queue(queue_size)
self._lock = threading.Lock()
Expand Down Expand Up @@ -106,7 +166,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None:
pending = self._queue.qsize() + 1
logger.error("flush timed out, dropped %s events", pending)

def submit(self, callback: Callable[[], None]) -> bool:
def submit(self, callback: Callable[[], Any]) -> bool:
self._ensure_thread()
try:
self._queue.put_nowait(callback)
Expand All @@ -127,3 +187,135 @@ def _target(self) -> None:
finally:
self._queue.task_done()
sleep(0)


class AsyncWorker(Worker):
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
    """
    Set up an idle worker; nothing is scheduled on an event loop yet.

    :param queue_size: capacity used for the queue once it is created.
    """
    self._queue_size = queue_size
    # Left unset here; start() creates the queue when it finds none.
    self._queue: Optional[asyncio.Queue[Any]] = None
    self._loop: Optional[asyncio.AbstractEventLoop] = None
    self._task: Optional[asyncio.Task[None]] = None
    # The event loop has to stay within one process, so remember which pid owns the task
    self._task_for_pid: Optional[int] = None
    # Strong references to in-flight callback tasks, so they can be cancelled on kill
    self._active_tasks: set[asyncio.Task[None]] = set()

@property
def is_alive(self) -> bool:
    """
    Whether the consumer task is still running in this process.

    False when the task was started under a different pid, when there is no
    task or loop, when the loop is not running, or when the task has finished.
    """
    task, loop = self._task, self._loop
    if self._task_for_pid != os.getpid() or not (task and loop):
        return False
    return loop.is_running() and not task.done()

def kill(self) -> None:
    """
    Ask the consumer task to stop and drop all references to it.

    A terminator sentinel is queued for the consumer, every tracked callback
    task is cancelled, and the loop/task/pid bookkeeping is reset to None.
    """
    # NOTE(review): nesting reconstructed from a flattened diff view — confirm against the repository.
    if self._task:
        if self._queue is not None:
            try:
                # _target() leaves its loop once it dequeues this sentinel.
                self._queue.put_nowait(_TERMINATOR)
            except asyncio.QueueFull:
                # No room for the sentinel: only logged, the consumer is not told to stop.
                logger.debug("async worker queue full, kill failed")
        # Also cancel any active callback tasks
        # Avoid modifying the set while cancelling tasks
        tasks_to_cancel = set(self._active_tasks)
        for task in tasks_to_cancel:
            task.cancel()
        self._active_tasks.clear()
        self._loop = None
        self._task = None
        self._task_for_pid = None

def start(self) -> None:
    """
    Schedule the consumer task on the currently running event loop.

    Does nothing if the worker is already alive. The queue is created on first
    use. When no event loop is running, a warning is logged and the worker
    stays stopped.
    """
    if self.is_alive:
        return
    try:
        loop = asyncio.get_running_loop()
        self._loop = loop
        if self._queue is None:
            self._queue = asyncio.Queue(maxsize=self._queue_size)
        self._task = loop.create_task(self._target())
        self._task_for_pid = os.getpid()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when there is no running loop
        logger.warning("No event loop running, async worker not started")
        self._loop = self._task = self._task_for_pid = None

def full(self) -> bool:
    """
    Report whether no further callbacks can be queued.

    A worker whose queue has not been created yet counts as full.
    """
    return self._queue is None or self._queue.full()

def _ensure_task(self) -> None:
if not self.is_alive:
self.start()

async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
    """
    Wait up to ``timeout`` seconds for the queue to be fully processed.

    Returns immediately when there is no running loop or no queue. A short
    first wait (at most 0.1s) is attempted; if it times out, ``callback`` (when
    given) is called with the pending count and the full timeout, and the
    remaining time is spent on a second wait. A second timeout is logged as an
    error.
    """
    # NOTE(review): nesting reconstructed from a flattened diff view — confirm against the repository.
    if not self._loop or not self._loop.is_running() or self._queue is None:
        return

    initial_timeout = min(0.1, timeout)

    # Timeout on the join
    try:
        await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
    except asyncio.TimeoutError:
        # Pending work = callbacks still queued + callback tasks currently tracked as active
        pending = self._queue.qsize() + len(self._active_tasks)
        logger.debug("%d event(s) pending on flush", pending)
        if callback is not None:
            callback(pending, timeout)

        try:
            # Spend whatever is left of the overall timeout on a second join
            remaining_timeout = timeout - initial_timeout
            await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
        except asyncio.TimeoutError:
            pending = self._queue.qsize() + len(self._active_tasks)
            logger.error("flush timed out, dropped %s events", pending)

async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None:
    """
    Wait for queued work to finish, for at most ``timeout`` seconds.

    The wait only happens when the worker is alive and ``timeout`` is positive;
    ``callback`` is forwarded to ``_wait_flush``.
    """
    # NOTE(review): nesting reconstructed from a flattened diff view — confirm the final log call sits outside the `if`.
    logger.debug("background worker got flush request")
    if self.is_alive and timeout > 0.0:
        await self._wait_flush(timeout, callback)
    logger.debug("background worker flushed")

def submit(self, callback: Callable[[], Any]) -> bool:
    """
    Queue ``callback`` for the consumer task, starting the worker if needed.

    Returns True when the callback was queued, False when there is no queue
    (the worker could not be started) or the queue is full.
    """
    self._ensure_task()
    queue = self._queue
    if queue is None:
        return False
    try:
        queue.put_nowait(callback)
    except asyncio.QueueFull:
        return False
    return True

async def _target(self) -> None:
if self._queue is None:
return
while True:
callback = await self._queue.get()
if callback is _TERMINATOR:
self._queue.task_done()
break
# Firing tasks instead of awaiting them allows for concurrent requests
task = asyncio.create_task(self._process_callback(callback))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is asyncio.create_task while all the others use self._loop directly. Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`_target` is the coroutine actually running inside the worker task, whereas the other places that create tasks are called from the transport. Calling `asyncio.create_task` directly here therefore always uses the loop that is executing `_target`. I believe using `self._loop` here could introduce a race: `kill()` resets the loop reference to `None`, which can happen before the `_target` task reaches the `_TERMINATOR`.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(for this reason also the other functions introduce an is_alive() check)

# Create a strong reference to the task so it can be cancelled on kill
# and does not get garbage collected while running
self._active_tasks.add(task)
task.add_done_callback(self._on_task_complete)
# Yield to let the event loop run other tasks
await asyncio.sleep(0)

async def _process_callback(self, callback: Callable[[], Any]) -> None:
# Callback is an async coroutine, need to await it
await callback()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: AsyncWorker Awaits Sync Callbacks

The AsyncWorker's _process_callback method incorrectly awaits the callback. This violates the Worker interface, which expects synchronous callbacks (as accepted by the submit method), causing runtime TypeErrors when synchronous functions are submitted.

Locations (1)
Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed previously


def _on_task_complete(self, task: asyncio.Task[None]) -> None:
try:
task.result()
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
# Mark the task as done and remove it from the active tasks set
# This happens only after the task has completed
if self._queue is not None:
self._queue.task_done()
self._active_tasks.discard(task)
Loading