
Commit 7889074

srothh and sl0thentr0py authored
Add async task background worker (#4591)
Add a new implementation of the transport background worker based on an async task. This worker mostly mirrors the functionality of the thread-based worker, except that it exposes a non-blocking async flush (which can be awaited from an async context). Furthermore, the worker itself is not thread-safe and should be called using [run_coroutine_threadsafe](https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) or similar when called from another thread (this is handled by the transport). I have kept the fork check from the threaded worker, but I am not sure it is necessary, since forking in an async application would also break the event loop.

GH-4581

Co-authored-by: Neel Shah <[email protected]>
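For illustration only, here is a minimal sketch of the `run_coroutine_threadsafe` pattern the description refers to, assuming the calling thread holds a reference to the event loop the worker runs on. The function and coroutine names below are made up for the example and are not part of this commit.

```python
import asyncio

async def flush_events() -> None:
    # Stand-in for awaiting the worker's non-blocking async flush.
    await asyncio.sleep(0)

def flush_from_another_thread(loop: asyncio.AbstractEventLoop, timeout: float) -> None:
    # run_coroutine_threadsafe is the thread-safe way to hand a coroutine to a
    # loop running in a different thread; it returns a concurrent.futures.Future.
    future = asyncio.run_coroutine_threadsafe(flush_events(), loop)
    # Block the calling (non-async) thread until the flush finishes or times out.
    future.result(timeout)
```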
1 parent c4a986b commit 7889074

1 file changed: +132 −0

sentry_sdk/worker.py

Lines changed: 132 additions & 0 deletions
@@ -2,6 +2,7 @@
 from abc import ABC, abstractmethod
 import os
 import threading
+import asyncio

 from time import sleep, time
 from sentry_sdk._queue import Queue, FullError
@@ -186,3 +187,134 @@ def _target(self) -> None:
             finally:
                 self._queue.task_done()
             sleep(0)
+
+
+class AsyncWorker(Worker):
+    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
+        self._queue: Optional[asyncio.Queue[Any]] = None
+        self._queue_size = queue_size
+        self._task: Optional[asyncio.Task[None]] = None
+        # Event loop needs to remain in the same process
+        self._task_for_pid: Optional[int] = None
+        self._loop: Optional[asyncio.AbstractEventLoop] = None
+        # Track active callback tasks so they have a strong reference and can be cancelled on kill
+        self._active_tasks: set[asyncio.Task[None]] = set()
+
+    @property
+    def is_alive(self) -> bool:
+        if self._task_for_pid != os.getpid():
+            return False
+        if not self._task or not self._loop:
+            return False
+        return self._loop.is_running() and not self._task.done()
+
+    def kill(self) -> None:
+        if self._task:
+            if self._queue is not None:
+                try:
+                    self._queue.put_nowait(_TERMINATOR)
+                except asyncio.QueueFull:
+                    logger.debug("async worker queue full, kill failed")
+            # Also cancel any active callback tasks
+            # Avoid modifying the set while cancelling tasks
+            tasks_to_cancel = set(self._active_tasks)
+            for task in tasks_to_cancel:
+                task.cancel()
+            self._active_tasks.clear()
+            self._loop = None
+            self._task = None
+            self._task_for_pid = None
+
+    def start(self) -> None:
+        if not self.is_alive:
+            try:
+                self._loop = asyncio.get_running_loop()
+                if self._queue is None:
+                    self._queue = asyncio.Queue(maxsize=self._queue_size)
+                self._task = self._loop.create_task(self._target())
+                self._task_for_pid = os.getpid()
+            except RuntimeError:
+                # There is no event loop running
+                logger.warning("No event loop running, async worker not started")
+                self._loop = None
+                self._task = None
+                self._task_for_pid = None
+
+    def full(self) -> bool:
+        if self._queue is None:
+            return True
+        return self._queue.full()
+
+    def _ensure_task(self) -> None:
+        if not self.is_alive:
+            self.start()
+
+    async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
+        if not self._loop or not self._loop.is_running() or self._queue is None:
+            return
+
+        initial_timeout = min(0.1, timeout)
+
+        # Timeout on the join
+        try:
+            await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
+        except asyncio.TimeoutError:
+            pending = self._queue.qsize() + len(self._active_tasks)
+            logger.debug("%d event(s) pending on flush", pending)
+            if callback is not None:
+                callback(pending, timeout)
+
+            try:
+                remaining_timeout = timeout - initial_timeout
+                await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
+            except asyncio.TimeoutError:
+                pending = self._queue.qsize() + len(self._active_tasks)
+                logger.error("flush timed out, dropped %s events", pending)
+
+    def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]:  # type: ignore[override]
+        if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
+            return self._loop.create_task(self._wait_flush(timeout, callback))
+        return None
+
+    def submit(self, callback: Callable[[], Any]) -> bool:
+        self._ensure_task()
+        if self._queue is None:
+            return False
+        try:
+            self._queue.put_nowait(callback)
+            return True
+        except asyncio.QueueFull:
+            return False
+
+    async def _target(self) -> None:
+        if self._queue is None:
+            return
+        while True:
+            callback = await self._queue.get()
+            if callback is _TERMINATOR:
+                self._queue.task_done()
+                break
+            # Firing tasks instead of awaiting them allows for concurrent requests
+            task = asyncio.create_task(self._process_callback(callback))
+            # Create a strong reference to the task so it can be cancelled on kill
+            # and does not get garbage collected while running
+            self._active_tasks.add(task)
+            task.add_done_callback(self._on_task_complete)
+            # Yield to let the event loop run other tasks
+            await asyncio.sleep(0)
+
+    async def _process_callback(self, callback: Callable[[], Any]) -> None:
+        # Callback is an async coroutine, need to await it
+        await callback()
+
+    def _on_task_complete(self, task: asyncio.Task[None]) -> None:
+        try:
+            task.result()
+        except Exception:
+            logger.error("Failed processing job", exc_info=True)
+        finally:
+            # Mark the task as done and remove it from the active tasks set
+            # This happens only after the task has completed
+            if self._queue is not None:
+                self._queue.task_done()
+            self._active_tasks.discard(task)
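As a rough usage sketch (not taken from the SDK itself, where the transport drives the worker), the class added above could be exercised from a running event loop roughly like this; the toy `send_one` coroutine is a made-up stand-in for the transport's real send callback.

```python
import asyncio

from sentry_sdk.worker import AsyncWorker

async def main() -> None:
    worker = AsyncWorker()
    worker.start()  # needs a running event loop, otherwise it logs a warning and stays dead

    async def send_one() -> None:
        await asyncio.sleep(0.01)  # placeholder for an actual network request

    worker.submit(send_one)  # returns False if the queue is full

    flush_task = worker.flush(timeout=2.0)  # non-blocking: returns a task, or None
    if flush_task is not None:
        await flush_task  # the async flush can be awaited from an async context

    worker.kill()

asyncio.run(main())
```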

0 commit comments
