Skip to content

Commit b5eda0e

Browse files
committed
ref(worker): Refactor implementation to incorporate feedback
Refactor worker implementation to simplify callback processing, fix pending calculation and improve queue initialisation. GH-4581
1 parent 9a43d9b commit b5eda0e

File tree

1 file changed

+8
-10
lines changed

1 file changed

+8
-10
lines changed

sentry_sdk/worker.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44
import threading
55
import asyncio
6-
import inspect
76

87
from time import sleep, time
98
from sentry_sdk._queue import Queue, FullError
@@ -192,7 +191,7 @@ def _target(self) -> None:
192191

193192
class AsyncWorker(Worker):
194193
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
195-
self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size)
194+
self._queue: asyncio.Queue[Any] = None
196195
self._task: Optional[asyncio.Task[None]] = None
197196
# Event loop needs to remain in the same process
198197
self._task_for_pid: Optional[int] = None
@@ -228,10 +227,13 @@ def start(self) -> None:
228227
if not self.is_alive:
229228
try:
230229
self._loop = asyncio.get_running_loop()
230+
if self._queue is None:
231+
self._queue = asyncio.Queue(maxsize=self._queue_size)
231232
self._task = self._loop.create_task(self._target())
232233
self._task_for_pid = os.getpid()
233234
except RuntimeError:
234235
# There is no event loop running
236+
logger.warning("No event loop running, async worker not started")
235237
self._loop = None
236238
self._task = None
237239
self._task_for_pid = None
@@ -253,7 +255,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N
253255
try:
254256
await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
255257
except asyncio.TimeoutError:
256-
pending = self._queue.qsize() + 1
258+
pending = self._queue.qsize() + len(self._active_tasks)
257259
logger.debug("%d event(s) pending on flush", pending)
258260
if callback is not None:
259261
callback(pending, timeout)
@@ -262,7 +264,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N
262264
remaining_timeout = timeout - initial_timeout
263265
await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
264266
except asyncio.TimeoutError:
265-
pending = self._queue.qsize() + 1
267+
pending = self._queue.qsize() + len(self._active_tasks)
266268
logger.error("flush timed out, dropped %s events", pending)
267269

268270
async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None:
@@ -296,12 +298,8 @@ async def _target(self) -> None:
296298
await asyncio.sleep(0)
297299

298300
async def _process_callback(self, callback: Callable[[], Any]) -> None:
299-
if inspect.iscoroutinefunction(callback):
300-
# Callback is an async coroutine, need to await it
301-
await callback()
302-
else:
303-
# Callback is a sync function, need to call it
304-
callback()
301+
# Callback is an async coroutine, need to await it
302+
await callback()
305303

306304
def _on_task_complete(self, task: asyncio.Task[None]) -> None:
307305
try:

0 commit comments

Comments
 (0)