Skip to content

Commit 94d387d

Browse files
committed
feat(transport): Add an async task-based worker for transport
Add a new implementation of the worker interface, implementing the worker as an async task. This is to be used by the upcoming async transport. GH-4581
1 parent 2896602 commit 94d387d

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

sentry_sdk/worker.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from abc import ABC, abstractmethod
33
import os
44
import threading
5+
import asyncio
56

67
from time import sleep, time
78
from sentry_sdk._queue import Queue, FullError
@@ -170,3 +171,94 @@ def _target(self) -> None:
170171
finally:
171172
self._queue.task_done()
172173
sleep(0)
174+
175+
176+
class AsyncWorker(Worker):
177+
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
178+
self._queue: asyncio.Queue = asyncio.Queue(queue_size)
179+
self._task: Optional[asyncio.Task] = None
180+
# Event loop needs to remain in the same process
181+
self._task_for_pid: Optional[int] = None
182+
self._loop: Optional[asyncio.AbstractEventLoop] = None
183+
184+
@property
185+
def is_alive(self) -> bool:
186+
if self._task_for_pid != os.getpid():
187+
return False
188+
if not self._task or not self._loop:
189+
return False
190+
return self._loop.is_running() and not self._task.done()
191+
192+
def kill(self) -> None:
193+
if self._task:
194+
self._task.cancel()
195+
self._task = None
196+
self._task_for_pid = None
197+
198+
def start(self) -> None:
199+
if not self.is_alive:
200+
try:
201+
self._loop = asyncio.get_running_loop()
202+
self._task = self._loop.create_task(self._target())
203+
self._task_for_pid = os.getpid()
204+
except RuntimeError:
205+
# There is no event loop running
206+
self._loop = None
207+
self._task = None
208+
self._task_for_pid = None
209+
210+
def full(self) -> bool:
211+
return self._queue.full()
212+
213+
def _ensure_task(self) -> None:
214+
if not self.is_alive:
215+
self.start()
216+
217+
async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
218+
if not self._loop or not self._loop.is_running():
219+
return
220+
221+
initial_timeout = min(0.1, timeout)
222+
223+
# Timeout on the join
224+
try:
225+
await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
226+
except asyncio.TimeoutError:
227+
pending = self._queue.qsize() + 1
228+
logger.debug("%d event(s) pending on flush", pending)
229+
if callback is not None:
230+
callback(pending, timeout)
231+
232+
try:
233+
remaining_timeout = timeout - initial_timeout
234+
await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
235+
except asyncio.TimeoutError:
236+
pending = self._queue.qsize() + 1
237+
logger.error("flush timed out, dropped %s events", pending)
238+
239+
async def flush(self, timeout: float, callback: Optional[Any] = None) -> None:
240+
logger.debug("background worker got flush request")
241+
if self.is_alive and timeout > 0.0:
242+
await self._wait_flush(timeout, callback)
243+
logger.debug("background worker flushed")
244+
245+
def submit(self, callback: Callable[[], None]) -> bool:
246+
self._ensure_task()
247+
248+
try:
249+
self._queue.put_nowait(callback)
250+
return True
251+
except asyncio.QueueFull:
252+
return False
253+
254+
async def _target(self) -> None:
255+
while True:
256+
callback = await self._queue.get()
257+
try:
258+
callback()
259+
except Exception:
260+
logger.error("Failed processing job", exc_info=True)
261+
finally:
262+
self._queue.task_done()
263+
# Yield to let the event loop run other tasks
264+
await asyncio.sleep(0)

0 commit comments

Comments
 (0)