Skip to content

Commit c4a986b

Browse files
authored
Add transport worker factory and abstract base class for background worker (#4580)
Add a new abstract base class for the background worker implementations in the transport. This allows for implementation of the upcoming async task-based worker in addition to the current thread based worker used in the sync context. Additionally, add a factory method in the HttpTransportCore shared class, to allow the worker methods to live higher up in the class hierarchy regardless of specific implementation. GH-4578
1 parent 4e56e5c commit c4a986b

File tree

2 files changed

+67
-4
lines changed

2 files changed

+67
-4
lines changed

sentry_sdk/transport.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
from sentry_sdk.consts import EndpointType
3030
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
31-
from sentry_sdk.worker import BackgroundWorker
31+
from sentry_sdk.worker import BackgroundWorker, Worker
3232
from sentry_sdk.envelope import Envelope, Item, PayloadRef
3333

3434
from typing import TYPE_CHECKING
@@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
173173
Transport.__init__(self, options)
174174
assert self.parsed_dsn is not None
175175
self.options: Dict[str, Any] = options
176-
self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
176+
self._worker = self._create_worker(options)
177177
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
178178
self._disabled_until: Dict[Optional[str], datetime] = {}
179179
# We only use this Retry() class for the `get_retry_after` method it exposes
@@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
224224
elif self._compression_algo == "br":
225225
self._compression_level = 4
226226

227+
def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
228+
# For now, we only support the threaded sync background worker.
229+
return BackgroundWorker(queue_size=options["transport_queue_size"])
230+
227231
def record_lost_event(
228232
self: Self,
229233
reason: str,

sentry_sdk/worker.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from __future__ import annotations
2+
from abc import ABC, abstractmethod
23
import os
34
import threading
45

@@ -16,7 +17,65 @@
1617
_TERMINATOR = object()
1718

1819

19-
class BackgroundWorker:
20+
class Worker(ABC):
21+
"""
22+
Base class for all workers.
23+
24+
A worker is used to process events in the background and send them to Sentry.
25+
"""
26+
27+
@property
28+
@abstractmethod
29+
def is_alive(self) -> bool:
30+
"""
31+
Checks whether the worker is alive and running.
32+
33+
Returns True if the worker is alive, False otherwise.
34+
"""
35+
pass
36+
37+
@abstractmethod
38+
def kill(self) -> None:
39+
"""
40+
Kills the worker.
41+
42+
This method is used to kill the worker. The queue will be drained up to the point where the worker is killed.
43+
The worker will not be able to process any more events.
44+
"""
45+
pass
46+
47+
def flush(
48+
self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None
49+
) -> None:
50+
"""
51+
Flush the worker.
52+
53+
This method blocks until the worker has flushed all events or the specified timeout is reached.
54+
Default implementation is a no-op, since this method may only be relevant to some workers.
55+
Subclasses should override this method if necessary.
56+
"""
57+
return None
58+
59+
@abstractmethod
60+
def full(self) -> bool:
61+
"""
62+
Checks whether the worker's queue is full.
63+
64+
Returns True if the queue is full, False otherwise.
65+
"""
66+
pass
67+
68+
@abstractmethod
69+
def submit(self, callback: Callable[[], Any]) -> bool:
70+
"""
71+
Schedule a callback to be executed by the worker.
72+
73+
Returns True if the callback was scheduled, False if the queue is full.
74+
"""
75+
pass
76+
77+
78+
class BackgroundWorker(Worker):
2079
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
2180
self._queue: Queue = Queue(queue_size)
2281
self._lock = threading.Lock()
@@ -106,7 +165,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None:
106165
pending = self._queue.qsize() + 1
107166
logger.error("flush timed out, dropped %s events", pending)
108167

109-
def submit(self, callback: Callable[[], None]) -> bool:
168+
def submit(self, callback: Callable[[], Any]) -> bool:
110169
self._ensure_thread()
111170
try:
112171
self._queue.put_nowait(callback)

0 commit comments

Comments
 (0)