Skip to content

Commit f63e46f

Browse files
committed
feat(transport): Add an async task-based worker for transport
Add a new implementation of the worker interface, implementing the worker as an async task. This is to be used by the upcoming async transport. GH-4581
1 parent ef780f3 commit f63e46f

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

sentry_sdk/worker.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from abc import ABC, abstractmethod
33
import os
44
import threading
5+
import asyncio
56

67
from time import sleep, time
78
from sentry_sdk._queue import Queue, FullError
@@ -186,3 +187,94 @@ def _target(self) -> None:
186187
finally:
187188
self._queue.task_done()
188189
sleep(0)
190+
191+
192+
class AsyncWorker(Worker):
193+
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
194+
self._queue: asyncio.Queue = asyncio.Queue(queue_size)
195+
self._task: Optional[asyncio.Task] = None
196+
# Event loop needs to remain in the same process
197+
self._task_for_pid: Optional[int] = None
198+
self._loop: Optional[asyncio.AbstractEventLoop] = None
199+
200+
@property
201+
def is_alive(self) -> bool:
202+
if self._task_for_pid != os.getpid():
203+
return False
204+
if not self._task or not self._loop:
205+
return False
206+
return self._loop.is_running() and not self._task.done()
207+
208+
def kill(self) -> None:
209+
if self._task:
210+
self._task.cancel()
211+
self._task = None
212+
self._task_for_pid = None
213+
214+
def start(self) -> None:
215+
if not self.is_alive:
216+
try:
217+
self._loop = asyncio.get_running_loop()
218+
self._task = self._loop.create_task(self._target())
219+
self._task_for_pid = os.getpid()
220+
except RuntimeError:
221+
# There is no event loop running
222+
self._loop = None
223+
self._task = None
224+
self._task_for_pid = None
225+
226+
def full(self) -> bool:
227+
return self._queue.full()
228+
229+
def _ensure_task(self) -> None:
230+
if not self.is_alive:
231+
self.start()
232+
233+
async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
234+
if not self._loop or not self._loop.is_running():
235+
return
236+
237+
initial_timeout = min(0.1, timeout)
238+
239+
# Timeout on the join
240+
try:
241+
await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
242+
except asyncio.TimeoutError:
243+
pending = self._queue.qsize() + 1
244+
logger.debug("%d event(s) pending on flush", pending)
245+
if callback is not None:
246+
callback(pending, timeout)
247+
248+
try:
249+
remaining_timeout = timeout - initial_timeout
250+
await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
251+
except asyncio.TimeoutError:
252+
pending = self._queue.qsize() + 1
253+
logger.error("flush timed out, dropped %s events", pending)
254+
255+
async def flush(self, timeout: float, callback: Optional[Any] = None) -> None:
256+
logger.debug("background worker got flush request")
257+
if self.is_alive and timeout > 0.0:
258+
await self._wait_flush(timeout, callback)
259+
logger.debug("background worker flushed")
260+
261+
def submit(self, callback: Callable[[], None]) -> bool:
262+
self._ensure_task()
263+
264+
try:
265+
self._queue.put_nowait(callback)
266+
return True
267+
except asyncio.QueueFull:
268+
return False
269+
270+
async def _target(self) -> None:
271+
while True:
272+
callback = await self._queue.get()
273+
try:
274+
callback()
275+
except Exception:
276+
logger.error("Failed processing job", exc_info=True)
277+
finally:
278+
self._queue.task_done()
279+
# Yield to let the event loop run other tasks
280+
await asyncio.sleep(0)

0 commit comments

Comments
 (0)