Skip to content

Commit fcc8040

Browse files
committed
fix(worker): Modify kill behaviour to mirror threaded worker
Changed kill to also use the _TERMINATOR sentinel, so the queue is still drained to this point on kill instead of cancelled immediately. This should also fix potential race conditions with flush_async. GH-4581
1 parent 744dc8a commit fcc8040

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

sentry_sdk/worker.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def _target(self) -> None:
192192

193193
class AsyncWorker(Worker):
194194
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
195-
self._queue: asyncio.Queue[Callable[[], Any]] = asyncio.Queue(queue_size)
195+
self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size)
196196
self._task: Optional[asyncio.Task[None]] = None
197197
# Event loop needs to remain in the same process
198198
self._task_for_pid: Optional[int] = None
@@ -210,16 +210,19 @@ def is_alive(self) -> bool:
210210

211211
def kill(self) -> None:
212212
if self._task:
213-
self._task.cancel()
214-
self._task = None
215-
self._task_for_pid = None
213+
try:
214+
self._queue.put_nowait(_TERMINATOR)
215+
except asyncio.QueueFull:
216+
logger.debug("async worker queue full, kill failed")
216217
# Also cancel any active callback tasks
217218
# Avoid modifying the set while cancelling tasks
218219
tasks_to_cancel = set(self._active_tasks)
219220
for task in tasks_to_cancel:
220221
task.cancel()
221222
self._active_tasks.clear()
222223
self._loop = None
224+
self._task = None
225+
self._task_for_pid = None
223226

224227
def start(self) -> None:
225228
if not self.is_alive:
@@ -280,6 +283,9 @@ def submit(self, callback: Callable[[], Any]) -> bool:
280283
async def _target(self) -> None:
281284
while True:
282285
callback = await self._queue.get()
286+
if callback is _TERMINATOR:
287+
self._queue.task_done()
288+
break
283289
# Firing tasks instead of awaiting them allows for concurrent requests
284290
task = asyncio.create_task(self._process_callback(callback))
285291
# Create a strong reference to the task so it can be cancelled on kill

0 commit comments

Comments
 (0)