Add async task background worker #4591
Changes from 20 commits
@@ -1,6 +1,9 @@
 from __future__ import annotations
+from abc import ABC, abstractmethod
 import os
 import threading
+import asyncio
+import inspect

 from time import sleep, time
 from sentry_sdk._queue import Queue, FullError
@@ -16,7 +19,65 @@
 _TERMINATOR = object()


-class BackgroundWorker:
+class Worker(ABC):
+    """
+    Base class for all workers.
+
+    A worker is used to process events in the background and send them to Sentry.
+    """
+
+    @property
+    @abstractmethod
+    def is_alive(self) -> bool:
+        """
+        Checks whether the worker is alive and running.
+
+        Returns True if the worker is alive, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def kill(self) -> None:
+        """
+        Kills the worker.
+
+        The queue will be drained up to the point where the worker is killed.
+        The worker will not be able to process any more events.
+        """
+        pass
+
+    def flush(
+        self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None
+    ) -> None:
+        """
+        Flush the worker.
+
+        This method blocks until the worker has flushed all events or the specified timeout is reached.
+        The default implementation is a no-op, since this method may only be relevant to some workers.
+        Subclasses should override this method if necessary.
+        """
+        return None
+
+    @abstractmethod
+    def full(self) -> bool:
+        """
+        Checks whether the worker's queue is full.
+
+        Returns True if the queue is full, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def submit(self, callback: Callable[[], Any]) -> bool:
+        """
+        Schedule a callback to be executed by the worker.
+
+        Returns True if the callback was scheduled, False if the queue is full.
+        """
+        pass
+
+
+class BackgroundWorker(Worker):
     def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
         self._queue: Queue = Queue(queue_size)
         self._lock = threading.Lock()
@@ -106,7 +167,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None:
                 pending = self._queue.qsize() + 1
                 logger.error("flush timed out, dropped %s events", pending)

-    def submit(self, callback: Callable[[], None]) -> bool:
+    def submit(self, callback: Callable[[], Any]) -> bool:
         self._ensure_thread()
         try:
             self._queue.put_nowait(callback)
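For orientation, here is a minimal sketch of how a transport might drive this Worker interface. It is illustrative only, not code from this PR: the send_envelope callback, the queue size, and the timeout are invented, and it assumes BackgroundWorker is importable from sentry_sdk.worker as in the current SDK.

```python
# Hypothetical usage sketch of the Worker interface (not part of this PR).
from sentry_sdk.worker import BackgroundWorker

worker = BackgroundWorker(queue_size=100)


def send_envelope() -> None:
    # Placeholder for whatever the transport would actually send.
    print("sending envelope")


# submit() now accepts Callable[[], Any], so subclasses that support it
# can also be handed coroutine functions, not only sync callbacks.
if not worker.submit(send_envelope):
    print("queue full, event dropped")

# Block for up to 2 seconds while the queue drains, then shut the worker down.
worker.flush(timeout=2.0)
worker.kill()
```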
@@ -127,3 +188,138 @@ def _target(self) -> None:
             finally:
                 self._queue.task_done()
             sleep(0)
+
+
+class AsyncWorker(Worker):
+    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
+        self._queue: Optional[asyncio.Queue[Any]] = None
+        self._queue_size = queue_size
+        self._task: Optional[asyncio.Task[None]] = None
+        # Event loop needs to remain in the same process
+        self._task_for_pid: Optional[int] = None
+        self._loop: Optional[asyncio.AbstractEventLoop] = None
+        # Track active callback tasks so they have a strong reference and can be cancelled on kill
+        self._active_tasks: set[asyncio.Task[None]] = set()
+
+    @property
+    def is_alive(self) -> bool:
+        if self._task_for_pid != os.getpid():
+            return False
+        if not self._task or not self._loop:
+            return False
+        return self._loop.is_running() and not self._task.done()
+
+    def kill(self) -> None:
+        if self._task:
+            if self._queue is not None:
+                try:
+                    self._queue.put_nowait(_TERMINATOR)
+                except asyncio.QueueFull:
+                    logger.debug("async worker queue full, kill failed")
+            # Also cancel any active callback tasks
+            # Avoid modifying the set while cancelling tasks
+            tasks_to_cancel = set(self._active_tasks)
+            for task in tasks_to_cancel:
+                task.cancel()
+            self._active_tasks.clear()
+            self._loop = None
+            self._task = None
+            self._task_for_pid = None
+
+    def start(self) -> None:
+        if not self.is_alive:
+            try:
+                self._loop = asyncio.get_running_loop()
+                if self._queue is None:
+                    self._queue = asyncio.Queue(maxsize=self._queue_size)
+                self._task = self._loop.create_task(self._target())
+                self._task_for_pid = os.getpid()
+            except RuntimeError:
+                # There is no event loop running
+                logger.warning("No event loop running, async worker not started")
+                self._loop = None
+                self._task = None
+                self._task_for_pid = None
+
+    def full(self) -> bool:
+        if self._queue is None:
+            return True
+        return self._queue.full()
+
+    def _ensure_task(self) -> None:
+        if not self.is_alive:
+            self.start()
+
+    async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
+        if not self._loop or not self._loop.is_running() or self._queue is None:
+            return
+
+        initial_timeout = min(0.1, timeout)
+
+        # Timeout on the join
+        try:
+            await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
+        except asyncio.TimeoutError:
+            pending = self._queue.qsize() + len(self._active_tasks)
+            logger.debug("%d event(s) pending on flush", pending)
+            if callback is not None:
+                callback(pending, timeout)
+
+            try:
+                remaining_timeout = timeout - initial_timeout
+                await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
+            except asyncio.TimeoutError:
+                pending = self._queue.qsize() + len(self._active_tasks)
+                logger.error("flush timed out, dropped %s events", pending)
+
+    def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]:  # type: ignore[override]
+        if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
+            return self._loop.create_task(self._wait_flush(timeout, callback))
+        return None
+
+    def submit(self, callback: Callable[[], Any]) -> bool:
+        self._ensure_task()
+        if self._queue is None:
+            return False
+        try:
+            self._queue.put_nowait(callback)
+            return True
+        except asyncio.QueueFull:
+            return False
+
+    async def _target(self) -> None:
+        if self._queue is None:
+            return
+        while True:
+            callback = await self._queue.get()
+            if callback is _TERMINATOR:
+                self._queue.task_done()
+                break
+            # Firing tasks instead of awaiting them allows for concurrent requests
+            task = asyncio.create_task(self._process_callback(callback))
Member:
this one is

Contributor (Author):
_target is the function actually running inside the task, whereas the other places where tasks get instantiated are called from the transport. This means that calling asyncio.create_task here directly will reference the loop that is actually executing _target. I believe using self._loop here could introduce a race, as kill() resets the loop reference to None, which can happen before the _target task consumes the _TERMINATOR.

Contributor (Author):
(For this reason, the other functions also introduce an is_alive() check.)
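A minimal, hypothetical reduction of the race described above (not code from this PR; the Holder class merely mimics AsyncWorker's loop bookkeeping): once kill() clears the stored loop reference, a coroutine that is still running can no longer rely on self._loop, while asyncio.create_task() keeps working because it resolves the loop that is currently driving the coroutine.

```python
import asyncio


class Holder:
    """Stand-in for AsyncWorker's loop bookkeeping (illustration only)."""

    def __init__(self) -> None:
        self.loop: asyncio.AbstractEventLoop | None = None

    def kill(self) -> None:
        # Mirrors AsyncWorker.kill() resetting self._loop to None.
        self.loop = None

    async def target(self) -> None:
        # Suspension point: kill() can run while we are parked here.
        await asyncio.sleep(0)
        # The stored reference is gone, but this coroutine is still being
        # driven by a live loop, which asyncio.create_task() would resolve
        # via asyncio.get_running_loop().
        assert self.loop is None
        assert asyncio.get_running_loop().is_running()


async def main() -> None:
    holder = Holder()
    holder.loop = asyncio.get_running_loop()
    task = asyncio.create_task(holder.target())
    holder.kill()  # clears the stored loop before target() resumes
    await task


asyncio.run(main())
```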
+            # Create a strong reference to the task so it can be cancelled on kill
+            # and does not get garbage collected while running
+            self._active_tasks.add(task)
+            task.add_done_callback(self._on_task_complete)
+            # Yield to let the event loop run other tasks
+            await asyncio.sleep(0)
+
+    async def _process_callback(self, callback: Callable[[], Any]) -> None:
+        # Callback is an async coroutine, need to await it
+        if inspect.iscoroutinefunction(callback):
+            await callback()
+        else:
+            # Callback is a sync function, such as _flush_client_reports()
+            callback()
+
+    def _on_task_complete(self, task: asyncio.Task[None]) -> None:
+        try:
+            task.result()
+        except Exception:
+            logger.error("Failed processing job", exc_info=True)
+        finally:
+            # Mark the task as done and remove it from the active tasks set
+            # This happens only after the task has completed
+            if self._queue is not None:
+                self._queue.task_done()
+            self._active_tasks.discard(task)
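To round this off, a hedged end-to-end sketch of how the new AsyncWorker could be driven from inside a running event loop. The do_work coroutine, queue size, and timeout are invented for illustration, and it assumes AsyncWorker lives in sentry_sdk.worker alongside BackgroundWorker; how the transport actually wires this up is outside this diff.

```python
# Hypothetical usage of AsyncWorker (illustration only, not from this PR).
import asyncio

from sentry_sdk.worker import AsyncWorker


async def do_work() -> None:
    # Stand-in for an async callback such as sending an envelope.
    await asyncio.sleep(0.01)


async def main() -> None:
    worker = AsyncWorker(queue_size=10)

    # submit() calls _ensure_task(), which starts the worker on the currently
    # running loop; outside a loop, start() only logs a warning.
    accepted = worker.submit(do_work)
    print("scheduled:", accepted)

    # Unlike the threaded worker, flush() returns an asyncio.Task (or None)
    # instead of blocking, so the caller decides whether to await the drain.
    flush_task = worker.flush(timeout=2.0)
    if flush_task is not None:
        await flush_task

    worker.kill()


asyncio.run(main())
```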