Skip to content

Commit 79a1bb6

Browse files
committed
feat(transport): Add an async task-based worker for transport
Add a new implementation of the worker interface, implementing the worker as an async task. This is to be used by the upcoming async transport. GH-4581
1 parent a6be4a3 commit 79a1bb6

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

sentry_sdk/worker.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from abc import ABC, abstractmethod
33
import os
44
import threading
5+
import asyncio
56

67
from time import sleep, time
78
from sentry_sdk._queue import Queue, FullError
@@ -169,3 +170,94 @@ def _target(self) -> None:
169170
finally:
170171
self._queue.task_done()
171172
sleep(0)
173+
174+
175+
class AsyncWorker(Worker):
176+
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
177+
self._queue: asyncio.Queue = asyncio.Queue(queue_size)
178+
self._task: Optional[asyncio.Task] = None
179+
# Event loop needs to remain in the same process
180+
self._task_for_pid: Optional[int] = None
181+
self._loop: Optional[asyncio.AbstractEventLoop] = None
182+
183+
@property
184+
def is_alive(self) -> bool:
185+
if self._task_for_pid != os.getpid():
186+
return False
187+
if not self._task or not self._loop:
188+
return False
189+
return self._loop.is_running() and not self._task.done()
190+
191+
def kill(self) -> None:
192+
if self._task:
193+
self._task.cancel()
194+
self._task = None
195+
self._task_for_pid = None
196+
197+
def start(self) -> None:
198+
if not self.is_alive:
199+
try:
200+
self._loop = asyncio.get_running_loop()
201+
self._task = self._loop.create_task(self._target())
202+
self._task_for_pid = os.getpid()
203+
except RuntimeError:
204+
# There is no event loop running
205+
self._loop = None
206+
self._task = None
207+
self._task_for_pid = None
208+
209+
def full(self) -> bool:
210+
return self._queue.full()
211+
212+
def _ensure_task(self) -> None:
213+
if not self.is_alive:
214+
self.start()
215+
216+
async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
217+
if not self._loop or not self._loop.is_running():
218+
return
219+
220+
initial_timeout = min(0.1, timeout)
221+
222+
# Timeout on the join
223+
try:
224+
await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
225+
except asyncio.TimeoutError:
226+
pending = self._queue.qsize() + 1
227+
logger.debug("%d event(s) pending on flush", pending)
228+
if callback is not None:
229+
callback(pending, timeout)
230+
231+
try:
232+
remaining_timeout = timeout - initial_timeout
233+
await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
234+
except asyncio.TimeoutError:
235+
pending = self._queue.qsize() + 1
236+
logger.error("flush timed out, dropped %s events", pending)
237+
238+
async def flush(self, timeout: float, callback: Optional[Any] = None) -> None:
239+
logger.debug("background worker got flush request")
240+
if self.is_alive and timeout > 0.0:
241+
await self._wait_flush(timeout, callback)
242+
logger.debug("background worker flushed")
243+
244+
def submit(self, callback: Callable[[], None]) -> bool:
245+
self._ensure_task()
246+
247+
try:
248+
self._queue.put_nowait(callback)
249+
return True
250+
except asyncio.QueueFull:
251+
return False
252+
253+
async def _target(self) -> None:
254+
while True:
255+
callback = await self._queue.get()
256+
try:
257+
callback()
258+
except Exception:
259+
logger.error("Failed processing job", exc_info=True)
260+
finally:
261+
self._queue.task_done()
262+
# Yield to let the event loop run other tasks
263+
await asyncio.sleep(0)

0 commit comments

Comments
 (0)