feat: Add experimental async transport (port of PR #4572) #5646
Semver Impact of This PR: 🟡 Minor (new features)
Codecov Results 📊

✅ 13 passed | Total: 13 | Pass Rate: 100% | Execution Time: 6.70s

📊 Comparison with Base Branch

✨ No test changes detected. All tests are passing successfully.

❌ Patch coverage is 17.97%. Project has 14673 uncovered lines. Files with missing lines (7)

Coverage diff

| Metric | main | #PR | +/- |
|---|---|---|---|
| Coverage | 25.33% | 30.04% | +4.71% |
| Files | 189 | 189 | — |
| Lines | 20613 | 20972 | +359 |
| Branches | 6738 | 6844 | +106 |
| Hits | 5222 | 6299 | +1077 |
| Misses | 15391 | 14673 | -718 |
| Partials | 429 | 478 | +49 |

Generated by Codecov Action
Add an experimental async transport using httpcore's async backend,
enabled via `_experiments={"transport_async": True}`.
This is a manual port of PR #4572 (originally merged into `potel-base`)
onto the current `master` branch.
Key changes:
- Refactor `BaseHttpTransport` into `HttpTransportCore` (shared base) +
`BaseHttpTransport` (sync) + `AsyncHttpTransport` (async, conditional
on httpcore[asyncio])
- Add `Worker` ABC and `AsyncWorker` using asyncio.Queue/Task
- Add `close_async()` / `flush_async()` to client and public API
- Patch `loop.close` in asyncio integration to flush before shutdown
- Add `is_internal_task()` ContextVar to skip wrapping Sentry-internal tasks
- Add `asyncio` extras_require (`httpcore[asyncio]==1.*`)
- Widen anyio constraint to `>=3,<5` for httpx and FastAPI
Refs: GH-4568
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from 8c808bf to 4f8a00c.
The base class _make_pool returns a union of sync and async pool types, so mypy sees _pool.request() as possibly returning a non-awaitable. Add type: ignore[misc] since within AsyncHttpTransport the pool is always an async type. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The asyncio extra on httpcore pulls in anyio, which conflicts with starlette's anyio<4.0.0 pin and causes pip to downgrade httpcore to 0.18.0. That old version crashes on Python 3.14 due to typing.Union not having __module__. Keep httpcore[http2] in requirements-testing.txt (shared by all envs) and add httpcore[asyncio] only to linters, mypy, and common envs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- AsyncWorker.kill() now calls self._task.cancel() before clearing the reference, preventing duplicate consumers if submit() is called later
- close() with AsyncHttpTransport now does best-effort sync cleanup (kill transport, close components) instead of silently returning
- flush()/close() log warnings instead of debug when async transport used
- Add __aenter__/__aexit__ to _Client for 'async with' support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Asyncio and gevent don't mix — async tests using asyncio.run() fail under gevent's monkey-patching. Add skip_under_gevent decorator to all async tests in test_transport.py and test_client.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Python 3.6 doesn't support PEP 563 (from __future__ import annotations). Use string-quoted annotations instead, matching the convention used in the rest of the SDK. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 77 tests covering:
- AsyncWorker lifecycle (init, start, kill, submit, flush, is_alive)
- AsyncWorker edge cases (no loop, queue full, cancelled tasks, pid mismatch)
- HttpTransportCore methods (_handle_request_error, _handle_response, _update_headers, _prepare_envelope)
- make_transport() async detection (with/without loop, integration, http2)
- AsyncHttpTransport specifics (header parsing, capture_envelope, kill)
- Client async methods (close_async, flush_async, __aenter__/__aexit__)
- Client component helpers (_close_components, _flush_components)
- asyncio integration (patch_loop_close, _create_task_with_factory)
- ContextVar utilities (is_internal_task, mark_sentry_task_internal)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use a sync test to test the no-running-loop path — there's genuinely no running loop in a sync test, so no mock needed and no leaked coroutines. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
After AsyncWorker.kill() cancels tasks, the event loop needs a tick to actually process the cancellations. Without this, pytest reports PytestUnraisableExceptionWarning for never-awaited coroutines. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When kill() cancels the _target task while it's waiting on queue.get(), the CancelledError propagates through the coroutine. Without catching it, the coroutine gets garbage collected with an unhandled exception, causing pytest's PytestUnraisableExceptionWarning. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
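The two commits above describe a pattern that can be sketched with the standard library alone: a consumer that treats `CancelledError` as an expected shutdown signal, and a caller that yields one loop iteration after `cancel()` so the cancellation is actually delivered. This is a minimal illustration, not the SDK's `AsyncWorker`; `consume` and `demo` are hypothetical names.

```python
import asyncio


async def consume(queue: asyncio.Queue, seen: list) -> None:
    """Hypothetical worker loop: process items until the task is cancelled."""
    try:
        while True:
            item = await queue.get()
            seen.append(item)
            queue.task_done()
    except asyncio.CancelledError:
        pass  # expected when the owner cancels the task during shutdown


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    task = asyncio.create_task(consume(queue, seen))

    queue.put_nowait("event")
    await queue.join()  # returns once the consumer has called task_done()

    task.cancel()  # the consumer is now blocked in queue.get()
    done_before_tick = task.done()
    await asyncio.sleep(0)  # one loop iteration delivers the cancellation
    return seen, done_before_tick, task.done()
```

Calling `asyncio.run(demo())` shows that the task is not yet finished right after `cancel()` and only completes once the loop has had a chance to run.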
On Python 3.8, cancelled asyncio coroutines that were awaiting Queue.get() raise GeneratorExit during garbage collection, triggering PytestUnraisableExceptionWarning. This is a Python 3.8 asyncio limitation, not a real bug. Suppress the warning for async worker tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve tox.ini conflict: keep both httpx-latest line from master and our anyio>=3,<5 change.
Add tests that use asyncio.run() instead of @pytest.mark.asyncio to ensure coverage is properly tracked in the main thread. This covers:
- AsyncWorker lifecycle (start, submit, flush, kill)
- AsyncHttpTransport (creation, pool options, header parsing, capture_envelope, flush, kill)
- Client async methods (close_async, flush_async, __aenter__/__aexit__)
- make_transport() async detection
- patch_loop_close, _create_task_with_factory
- is_internal_task / mark_sentry_task_internal
- Full task factory integration with internal task detection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- AsyncWorker.kill(): Reset queue to None instead of putting a stale _TERMINATOR (since we now cancel the task directly, the terminator was never consumed and would break restart)
- close() with async transport: Call _flush_components() to flush session flusher, log/metrics/span batchers even when sync flush is skipped
- Update test to verify fresh queue creation after kill

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from 99e3031 to 86d6e36.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
    @staticmethod
    def setup_once() -> None:
        patch_asyncio()
        patch_loop_close()
enable_asyncio_integration omits new patch_loop_close call
Medium Severity
setup_once() now calls both patch_asyncio() and patch_loop_close(), but the existing enable_asyncio_integration() function (designed for deferred setup when no loop exists at init time) was not updated to also call patch_loop_close(). Users who initialize Sentry before the event loop and later call enable_asyncio_integration() will never get the loop-close flush patch applied.
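One way to keep the eager and deferred setup paths from drifting apart is to route both through a single helper. The sketch below assumes a loop-close patch that runs one flush before the original `close()`; `flush_pending`, `patch_loop_close_sketch`, and `apply_asyncio_patches` are hypothetical names and do not reflect how the SDK's `patch_loop_close()` is actually written.

```python
import asyncio
from typing import List

FLUSHED: List[str] = []  # records flushes so the effect is observable


async def flush_pending() -> None:
    """Hypothetical stand-in for flushing pending events."""
    FLUSHED.append("flushed")


def patch_loop_close_sketch(loop: asyncio.AbstractEventLoop) -> None:
    """Wrap loop.close so that one flush runs before the loop shuts down."""
    original_close = loop.close

    def close_with_flush() -> None:
        # A flush is only possible while the loop is open and not running.
        if not loop.is_closed() and not loop.is_running():
            loop.run_until_complete(flush_pending())
        original_close()

    # Instance-level patch; works for the stdlib's pure-Python loop classes.
    loop.close = close_with_flush  # type: ignore[method-assign]


def apply_asyncio_patches(loop: asyncio.AbstractEventLoop) -> None:
    """Hypothetical shared helper called by both the eager and deferred paths."""
    patch_loop_close_sketch(loop)
    # The task-factory patch would be applied here as well.
```

With a shared helper like this, a deferred enable function cannot forget a patch that the eager path applies.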
        if (default_option[0], default_option[1]) not in used_options:
            socket_options.append(default_option)

    options["socket_options"] = socket_options
Async transport ignores keep_alive configuration option
Low Severity
AsyncHttpTransport._get_pool_options unconditionally appends KEEP_ALIVE_SOCKET_OPTIONS to socket options, ignoring the user's keep_alive setting. The sync HttpTransport._get_pool_options correctly checks self.options["keep_alive"] before adding keep-alive options. A user setting keep_alive=False would have no effect with the async transport.
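The behaviour the comment asks for can be sketched as a small pure function: keep-alive defaults are merged only when `keep_alive` is truthy, and never override an option the user already set. `build_socket_options` and `KEEP_ALIVE_DEFAULTS` are hypothetical names, and the single default shown is an assumption for illustration, not the SDK's actual `KEEP_ALIVE_SOCKET_OPTIONS`.

```python
import socket
from typing import Any, Dict, List, Tuple

SocketOption = Tuple[int, int, int]

# Assumed default for illustration only; the SDK's real list may differ.
KEEP_ALIVE_DEFAULTS: List[SocketOption] = [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]


def build_socket_options(options: Dict[str, Any]) -> List[SocketOption]:
    """Merge keep-alive defaults into user socket options when enabled."""
    socket_options = list(options.get("socket_options") or [])
    if options.get("keep_alive"):
        # Track (level, name) pairs so user-provided values win.
        used = {(level, name) for level, name, _ in socket_options}
        for default in KEEP_ALIVE_DEFAULTS:
            if (default[0], default[1]) not in used:
                socket_options.append(default)
    return socket_options
```

With `keep_alive=False` the function returns only what the user passed in, which is the behaviour the sync transport is described as having.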
    async def _flush_client_reports(self: "Self", force: bool = False) -> None:
        client_report = self._fetch_pending_client_report(force=force, interval=60)
        if client_report is not None:
            self.capture_envelope(Envelope(items=[client_report]))
Async flush loses client reports via deferred scheduling
Low Severity
AsyncHttpTransport._flush_client_reports calls the sync capture_envelope, which defers work via call_soon_threadsafe. During flush, the forced client report is scheduled but not yet queued when queue.join() completes, so close_async's subsequent kill() discards it. The sync transport doesn't have this issue because its capture_envelope submits directly to the queue.
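The ordering problem can be reproduced with a bare `asyncio.Queue`, independent of the SDK: work scheduled through `call_soon_threadsafe` is not in the queue yet when `join()` is awaited, so `join()` has nothing to wait for. `demo` is a hypothetical name and the string payload stands in for a client report envelope.

```python
import asyncio


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    # Deferred submit: the put only happens on a later loop iteration.
    loop.call_soon_threadsafe(queue.put_nowait, "client_report")

    # No unfinished work is registered yet, so join() returns at once.
    await queue.join()
    size_after_join = queue.qsize()

    await asyncio.sleep(0)  # let the deferred callback run
    size_after_tick = queue.qsize()
    return size_after_join, size_after_tick
```

The item only appears after the extra loop iteration, which is too late if the worker has been killed in between.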
            # Firing tasks instead of awaiting them allows for concurrent requests
            with mark_sentry_task_internal():
                task = asyncio.create_task(self._process_callback(callback))
            # Create a strong reference to the task so it can be cancelled on kill
            # and does not get garbage collected while running
            self._active_tasks.add(task)
            task.add_done_callback(self._on_task_complete)
            # Yield to let the event loop run other tasks
            await asyncio.sleep(0)
    except asyncio.CancelledError:
        pass  # Expected during kill()
Bug: A race condition in AsyncWorker.kill() can cause queue.join() to hang. kill() nullifies the queue before cancelled tasks can call task_done(), preventing the task counter from decrementing.
Severity: CRITICAL
Suggested Fix
The _on_task_complete method should be modified to call task_done() regardless of whether self._queue is None. The check if self._queue is not None: should be removed. This ensures that even when tasks are cancelled during a kill operation, they correctly decrement the queue's unfinished task counter, allowing queue.join() to resolve correctly and preventing shutdown hangs.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_sdk/worker.py#L295-L314
Potential issue: A race condition exists in the `AsyncWorker`. When `kill()` is called,
it cancels pending tasks and immediately sets `self._queue` to `None`. However, the
`_on_task_complete` callback for these cancelled tasks checks if `self._queue` is not
`None` before calling `self._queue.task_done()`. Because the queue is already nullified,
`task_done()` is never called for these tasks. This prevents the queue's internal
counter from decrementing, causing any concurrent call to `await self._queue.join()`
(e.g., from a `flush_async` operation) to hang indefinitely until it times out.
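The hang described above can be reproduced in isolation. The sketch below is a much-reduced, hypothetical stand-in for the worker (`MiniWorker` and `join_survives_kill` are not SDK names). Simply dropping the `None` check would leave the callback calling `task_done()` on a `None` reference once `kill()` has run, so this sketch instead binds the queue to the done callback at submit time.

```python
import asyncio
from typing import Optional, Set


class MiniWorker:
    """Hypothetical, reduced model of a queue-backed async worker."""

    def __init__(self, bind_queue: bool) -> None:
        self.queue: Optional[asyncio.Queue] = asyncio.Queue()
        self.tasks: Set[asyncio.Task] = set()
        self.bind_queue = bind_queue

    def submit(self, delay: float) -> None:
        queue = self.queue
        assert queue is not None
        queue.put_nowait(delay)
        queue.get_nowait()  # item picked up; one task_done() is still owed
        task = asyncio.create_task(asyncio.sleep(delay))
        self.tasks.add(task)
        if self.bind_queue:
            # Capture the queue now so kill() cannot hide it from the callback.
            task.add_done_callback(lambda t: self._finish(t, queue))
        else:
            # Racy variant: the attribute is read when the callback fires.
            task.add_done_callback(lambda t: self._finish(t, self.queue))

    def _finish(self, task: asyncio.Task, queue: Optional[asyncio.Queue]) -> None:
        self.tasks.discard(task)
        if queue is not None:
            queue.task_done()

    def kill(self) -> None:
        for task in list(self.tasks):
            task.cancel()
        self.queue = None


async def join_survives_kill(bind_queue: bool) -> bool:
    worker = MiniWorker(bind_queue)
    worker.submit(10)
    queue = worker.queue  # what a concurrent flush would be joining
    assert queue is not None
    worker.kill()
    try:
        await asyncio.wait_for(queue.join(), timeout=0.2)
        return True
    except asyncio.TimeoutError:
        return False
```

`asyncio.run(join_survives_kill(False))` times out because the owed `task_done()` is skipped, while the bound variant lets `join()` return.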


Add an experimental async transport using httpcore's async backend, enabled via `_experiments={"transport_async": True}`.

This is a manual port of PR #4572 (originally merged into `potel-base`) onto the current `master` branch.

Key changes

- transport.py: Refactor `BaseHttpTransport` into `HttpTransportCore` (shared base) + `BaseHttpTransport` (sync) + `AsyncHttpTransport` (async, conditional on `httpcore[asyncio]`). Extract shared helpers: `_handle_request_error`, `_handle_response`, `_update_headers`, `_prepare_envelope`. Update `make_transport()` to detect the `transport_async` experiment.
- worker.py: Add `Worker` ABC base class and `AsyncWorker` implementation using `asyncio.Queue`/`asyncio.Task`.
- client.py: Add `close_async()`/`flush_async()` with async-vs-sync transport detection. Extract `_close_components()`/`_flush_components()`.
- api.py: Expose `flush_async()` as a public API.
- integrations/asyncio.py: Patch `loop.close` to flush pending events before shutdown. Skip span wrapping for internal Sentry tasks.
- utils.py: Add `is_internal_task()`/`mark_sentry_task_internal()` via ContextVar for async task filtering.
- setup.py: Add `"asyncio"` extras_require (`httpcore[asyncio]==1.*`).
- config.py / tox.ini: Widen anyio to `>=3,<5` for httpx and FastAPI.

Notes

- tox.ini was manually edited (the generation script requires a free-threaded Python interpreter). A full regeneration should be done before merge.

Refs: GH-4568
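The ContextVar-based task filtering listed under utils.py relies on asyncio tasks copying the current context when they are created. A minimal sketch of that mechanism follows. The function names mirror the PR, but the bodies are assumptions written for illustration and are not the SDK's implementation; `report` and `demo` are hypothetical.

```python
import asyncio
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator

_is_internal = ContextVar("is_internal_task", default=False)


def is_internal_task() -> bool:
    """True when the current context was marked as SDK-internal."""
    return _is_internal.get()


@contextmanager
def mark_sentry_task_internal() -> Iterator[None]:
    """Mark everything created inside the block as internal (assumed body)."""
    token = _is_internal.set(True)
    try:
        yield
    finally:
        _is_internal.reset(token)


async def report() -> bool:
    return is_internal_task()


async def demo() -> tuple:
    with mark_sentry_task_internal():
        # The task copies the context here, while the flag is still set.
        internal = asyncio.create_task(report())
    external = asyncio.create_task(report())
    return await internal, await external
```

A task factory can then call `is_internal_task()` inside the new task's context and skip span wrapping when it returns `True`.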