Skip to content

Commit 86d6e36

Browse files
BYKclaude
andcommitted
fix: Address Bugbot feedback — stale terminator and flush components
- AsyncWorker.kill(): Reset queue to None instead of putting a stale _TERMINATOR (since we now cancel the task directly, the terminator was never consumed and would break restart) - close() with async transport: Call _flush_components() to flush session flusher, log/metrics/span batchers even when sync flush is skipped - Update test to verify fresh queue creation after kill Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8883b78 commit 86d6e36

File tree

3 files changed

+31
-28
lines changed

3 files changed

+31
-28
lines changed

sentry_sdk/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,7 @@ def close(
10451045
"Prefer close_async() for graceful async shutdown. "
10461046
"Performing synchronous best-effort cleanup."
10471047
)
1048+
self._flush_components()
10481049
else:
10491050
self.flush(timeout=timeout, callback=callback)
10501051
self._close_components()

sentry_sdk/worker.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,14 @@ def kill(self) -> None:
213213
if self._task:
214214
# Cancel the main consumer task to prevent duplicate consumers
215215
self._task.cancel()
216-
if self._queue is not None:
217-
try:
218-
self._queue.put_nowait(_TERMINATOR)
219-
except asyncio.QueueFull:
220-
logger.debug("async worker queue full, kill failed")
221216
# Also cancel any active callback tasks
222217
# Avoid modifying the set while cancelling tasks
223218
tasks_to_cancel = set(self._active_tasks)
224219
for task in tasks_to_cancel:
225220
task.cancel()
226221
self._active_tasks.clear()
222+
# Reset queue to avoid stale terminators on restart
223+
self._queue = None
227224
self._loop = None
228225
self._task = None
229226
self._task_for_pid = None

tests/test_transport.py

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,24 +1028,27 @@ async def test_async_transport_background_thread_capture(
10281028
client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
10291029
assert isinstance(client.transport, AsyncHttpTransport)
10301030
sentry_sdk.get_global_scope().set_client(client)
1031-
captured_from_thread = []
1032-
exception_from_thread = []
1033-
1034-
def background_thread_work():
1035-
try:
1036-
# This should use run_coroutine_threadsafe path
1037-
capture_message("from background thread")
1038-
captured_from_thread.append(True)
1039-
except Exception as e:
1040-
exception_from_thread.append(e)
1041-
1042-
thread = threading.Thread(target=background_thread_work)
1043-
thread.start()
1044-
thread.join()
1045-
assert not exception_from_thread
1046-
assert captured_from_thread
1047-
await client.close_async(timeout=2.0)
1048-
assert capturing_server.captured
1031+
try:
1032+
captured_from_thread = []
1033+
exception_from_thread = []
1034+
1035+
def background_thread_work():
1036+
try:
1037+
# This should use run_coroutine_threadsafe path
1038+
capture_message("from background thread")
1039+
captured_from_thread.append(True)
1040+
except Exception as e:
1041+
exception_from_thread.append(e)
1042+
1043+
thread = threading.Thread(target=background_thread_work)
1044+
thread.start()
1045+
thread.join()
1046+
assert not exception_from_thread
1047+
assert captured_from_thread
1048+
await client.close_async(timeout=2.0)
1049+
assert capturing_server.captured
1050+
finally:
1051+
sentry_sdk.get_global_scope().set_client(None)
10491052

10501053

10511054
@skip_under_gevent
@@ -1276,18 +1279,20 @@ def test_async_worker_start_no_running_loop():
12761279
@pytest.mark.asyncio
12771280
@pytest.mark.skipif(not PY38, reason="AsyncWorker requires Python 3.8+")
12781281
@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
1279-
async def test_async_worker_start_reuses_existing_queue():
1280-
"""Test start() reuses existing queue if already created."""
1282+
async def test_async_worker_start_creates_fresh_queue_after_kill():
1283+
"""Test start() creates a fresh queue after kill() resets it."""
12811284
from sentry_sdk.worker import AsyncWorker
12821285

12831286
worker = AsyncWorker(queue_size=10)
12841287
worker.start()
1285-
queue_ref = worker._queue
1286-
# Kill and restart — queue should be reused
1288+
assert worker._queue is not None
1289+
# Kill resets queue to None to avoid stale terminators
12871290
worker.kill()
12881291
await asyncio.sleep(0) # Allow cancelled tasks to be cleaned up
1292+
assert worker._queue is None
1293+
# Restart creates a fresh queue
12891294
worker.start()
1290-
assert worker._queue is queue_ref
1295+
assert worker._queue is not None
12911296
worker.kill()
12921297
await asyncio.sleep(0) # Allow cancelled tasks to be cleaned up
12931298

0 commit comments

Comments
 (0)