-
Notifications
You must be signed in to change notification settings - Fork 557
Integrate async transport with SDK #4615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 151 commits
63b1f24
666ff3a
748764e
ee6dbee
19405fd
3736c03
3607d44
0ba5a83
9bb628e
a81487e
8960e6f
0f7937b
268ea1a
b3c05cc
fb0ad18
7edbbaf
0f63d24
2430e2e
96fcd85
331e40b
5f67485
97c5e3d
8809b08
7c5dec0
b0390e6
f01b00d
23b8ea2
176a1d1
4fe61bf
52c9e36
6d69406
3629609
21cde52
9f24136
001f36c
401b1bc
3f43d8f
15fa295
ef780f3
c541bd7
2808062
ea5f557
c61eb02
38baead
e4ed773
236ae2c
9dd546c
f4ac157
ed392e9
50553d4
4a7b8ce
cd8a35f
b9f2ec7
98d74ed
9df5ec5
23d8740
a496787
32a9abd
a69f7bb
c80b095
5904968
09034b7
21b1cda
9d0cde4
f21e2ea
76aae83
4c1e99b
6df7037
25c04fc
e8d889c
f63e46f
1804271
11da869
779a0d6
0895d23
bbf426b
744dc8a
fcc8040
9a43d9b
b5eda0e
9e380b8
ee44621
d9f7383
a644465
c935e9e
b90daf4
e1d7cdb
90346a5
47416f4
73cdc6d
1ae8708
6f18657
87a9b2f
4fd7fa0
69734cd
f5ef707
fca8740
4b0d09b
e23efd7
d2e647b
55b606a
d89abed
5e1e0c6
8fdf43d
6619670
2eee1b1
6c787a4
b79d346
1717888
e1fd57a
b87c68e
a827d0d
ee0b440
70f228e
328d8ad
ef61134
ad93516
42d3a34
10d85f6
aaae195
de47da2
6e2c4f6
9da7be8
8a5ab06
295a0e9
e754a85
859a0e2
4a58ce7
cbecde7
c8bb55a
05a7de7
38246d0
8b226cb
823215e
4eed4fd
afd494d
fcc7ac3
f659514
8c542ce
30dde67
3392e0e
6c85500
ae5a864
9c537e6
f7554b2
6cb72ad
9f226cf
9171c5d
111861b
1334a29
a2aafb4
6dd8138
5f7af94
f44f690
1db746a
54b7fb6
e36e9fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,6 +3,7 @@ | |||||
import uuid | ||||||
import random | ||||||
import socket | ||||||
import asyncio | ||||||
from collections.abc import Mapping | ||||||
from datetime import datetime, timezone | ||||||
from importlib import import_module | ||||||
|
@@ -25,7 +26,7 @@ | |||||
) | ||||||
from sentry_sdk.serializer import serialize | ||||||
from sentry_sdk.tracing import trace | ||||||
from sentry_sdk.transport import HttpTransportCore, make_transport | ||||||
from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport | ||||||
from sentry_sdk.consts import ( | ||||||
SPANDATA, | ||||||
DEFAULT_MAX_VALUE_LENGTH, | ||||||
|
@@ -917,51 +918,126 @@ def get_integration( | |||||
|
||||||
return self.integrations.get(integration_name) | ||||||
|
||||||
def close( | ||||||
def _close_transport(self) -> Optional[asyncio.Task[None]]: | ||||||
"""Close transport and return cleanup task if any.""" | ||||||
if self.transport is not None: | ||||||
cleanup_task = self.transport.kill() # type: ignore | ||||||
self.transport = None | ||||||
return cleanup_task | ||||||
return None | ||||||
|
||||||
def _close_components(self) -> None: | ||||||
"""Kill all client components in the correct order.""" | ||||||
self.session_flusher.kill() | ||||||
if self.log_batcher is not None: | ||||||
self.log_batcher.kill() | ||||||
if self.monitor: | ||||||
self.monitor.kill() | ||||||
|
||||||
async def _close_components_async(self) -> None: | ||||||
"""Async version of _close_components that properly awaits transport cleanup.""" | ||||||
self._close_components() | ||||||
cleanup_task = self._close_transport() | ||||||
if cleanup_task is not None: | ||||||
await cleanup_task | ||||||
|
||||||
def close( # type: ignore[override] | ||||||
self, | ||||||
timeout: Optional[float] = None, | ||||||
callback: Optional[Callable[[int, float], None]] = None, | ||||||
) -> None: | ||||||
) -> Optional[asyncio.Task[None]]: | ||||||
""" | ||||||
Close the client and shut down the transport. Arguments have the same | ||||||
semantics as :py:meth:`Client.flush`. | ||||||
semantics as :py:meth:`Client.flush`. When using the async transport, close needs to be awaited to block. | ||||||
""" | ||||||
if self.transport is not None: | ||||||
self.flush(timeout=timeout, callback=callback) | ||||||
|
||||||
self.session_flusher.kill() | ||||||
async def _flush_and_close( | ||||||
timeout: Optional[float], callback: Optional[Callable[[int, float], None]] | ||||||
) -> None: | ||||||
|
||||||
if self.log_batcher is not None: | ||||||
self.log_batcher.kill() | ||||||
await self._flush_async(timeout=timeout, callback=callback) | ||||||
await self._close_components_async() | ||||||
|
||||||
if self.monitor: | ||||||
self.monitor.kill() | ||||||
if self.transport is not None: | ||||||
if isinstance(self.transport, AsyncHttpTransport) and hasattr( | ||||||
self.transport, "loop" | ||||||
): | ||||||
|
||||||
self.transport.kill() | ||||||
self.transport = None | ||||||
try: | ||||||
flush_task = self.transport.loop.create_task( | ||||||
_flush_and_close(timeout, callback) | ||||||
) | ||||||
except RuntimeError: | ||||||
# Shutdown the components anyway | ||||||
self._close_components() | ||||||
self._close_transport() | ||||||
logger.warning("Event loop not running, aborting close.") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these logging statements can be removed entirely with the sync/async seperation, as the async versions are coroutines and therefore do not need to rely on spawning a task, meaning they will not run into this error in the client. |
||||||
return None | ||||||
srothh marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
# Enforce flush before shutdown | ||||||
return flush_task | ||||||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
else: | ||||||
self.flush(timeout=timeout, callback=callback) | ||||||
self._close_components() | ||||||
self._close_transport() | ||||||
|
||||||
return None | ||||||
|
||||||
def flush( | ||||||
def flush( # type: ignore[override] | ||||||
self, | ||||||
timeout: Optional[float] = None, | ||||||
callback: Optional[Callable[[int, float], None]] = None, | ||||||
) -> None: | ||||||
) -> Optional[asyncio.Task[None]]: | ||||||
""" | ||||||
Wait for the current events to be sent. | ||||||
Wait for the current events to be sent. When using the async transport, flush needs to be awaited to block. | ||||||
|
||||||
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. | ||||||
|
||||||
:param callback: Is invoked with the number of pending events and the configured timeout. | ||||||
""" | ||||||
if self.transport is not None: | ||||||
if timeout is None: | ||||||
timeout = self.options["shutdown_timeout"] | ||||||
self.session_flusher.flush() | ||||||
if isinstance(self.transport, AsyncHttpTransport) and hasattr( | ||||||
self.transport, "loop" | ||||||
): | ||||||
try: | ||||||
return self.transport.loop.create_task( | ||||||
self._flush_async(timeout, callback) | ||||||
) | ||||||
except RuntimeError: | ||||||
logger.warning("Event loop not running, aborting flush.") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above |
||||||
return None | ||||||
else: | ||||||
self._flush_sync(timeout, callback) | ||||||
return None | ||||||
|
||||||
if self.log_batcher is not None: | ||||||
self.log_batcher.flush() | ||||||
def _flush_sync( | ||||||
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]] | ||||||
) -> None: | ||||||
"""Synchronous flush implementation.""" | ||||||
if timeout is None: | ||||||
timeout = self.options["shutdown_timeout"] | ||||||
|
||||||
self._flush_components() | ||||||
if self.transport is not None: | ||||||
self.transport.flush(timeout=timeout, callback=callback) | ||||||
|
||||||
async def _flush_async( | ||||||
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]] | ||||||
) -> None: | ||||||
"""Asynchronous flush implementation.""" | ||||||
if timeout is None: | ||||||
timeout = self.options["shutdown_timeout"] | ||||||
|
||||||
self._flush_components() | ||||||
if self.transport is not None: | ||||||
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore | ||||||
if flush_task is not None: | ||||||
await flush_task | ||||||
|
||||||
def _flush_components(self) -> None: | ||||||
self.session_flusher.flush() | ||||||
if self.log_batcher is not None: | ||||||
self.log_batcher.flush() | ||||||
|
||||||
def __enter__(self) -> _Client: | ||||||
return self | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
from sentry_sdk.consts import OP | ||
from sentry_sdk.integrations import Integration, DidNotEnable | ||
from sentry_sdk.utils import event_from_exception, logger, reraise | ||
from sentry_sdk.transport import AsyncHttpTransport | ||
|
||
try: | ||
import asyncio | ||
|
@@ -29,6 +30,47 @@ def get_name(coro: Any) -> str: | |
) | ||
|
||
|
||
def patch_loop_close() -> None: | ||
"""Patch loop.close to flush pending events before shutdown.""" | ||
# Atexit shutdown hook happens after the event loop is closed. | ||
# Therefore, it is necessary to patch the loop.close method to ensure | ||
# that pending events are flushed before the interpreter shuts down. | ||
try: | ||
loop = asyncio.get_running_loop() | ||
except RuntimeError: | ||
# No running loop → cannot patch now | ||
return | ||
|
||
if getattr(loop, "_sentry_flush_patched", False): | ||
return | ||
|
||
async def _flush() -> None: | ||
client = sentry_sdk.get_client() | ||
if not client: | ||
return | ||
|
||
try: | ||
if not isinstance(client.transport, AsyncHttpTransport): | ||
return | ||
|
||
task = client.close() # type: ignore | ||
if task is not None: | ||
await task | ||
except Exception: | ||
logger.warning("Sentry flush failed during loop shutdown", exc_info=True) | ||
|
||
orig_close = loop.close | ||
|
||
def _patched_close() -> None: | ||
try: | ||
loop.run_until_complete(_flush()) | ||
finally: | ||
orig_close() | ||
|
||
loop.close = _patched_close # type: ignore | ||
loop._sentry_flush_patched = True # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Loop Patching Fails on InitializationThe
|
||
|
||
|
||
def patch_asyncio() -> None: | ||
orig_task_factory = None | ||
try: | ||
|
@@ -124,3 +166,4 @@ class AsyncioIntegration(Integration): | |
@staticmethod | ||
def setup_once() -> None: | ||
patch_asyncio() | ||
patch_loop_close() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -377,3 +377,52 @@ async def test_span_origin( | |
|
||
assert event["contexts"]["trace"]["origin"] == "manual" | ||
assert event["spans"][0]["origin"] == "auto.function.asyncio" | ||
|
||
|
||
@minimum_python_38 | ||
def test_loop_close_patching(sentry_init): | ||
sentry_init(integrations=[AsyncioIntegration()]) | ||
|
||
loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(loop) | ||
|
||
try: | ||
with patch("asyncio.get_running_loop", return_value=loop): | ||
assert not hasattr(loop, "_sentry_flush_patched") | ||
AsyncioIntegration.setup_once() | ||
assert hasattr(loop, "_sentry_flush_patched") | ||
assert loop._sentry_flush_patched is True | ||
|
||
finally: | ||
if not loop.is_closed(): | ||
loop.close() | ||
|
||
|
||
@minimum_python_38 | ||
def test_loop_close_flushes_async_transport(sentry_init): | ||
from sentry_sdk.transport import AsyncHttpTransport | ||
from unittest.mock import Mock, AsyncMock | ||
|
||
sentry_init(integrations=[AsyncioIntegration()]) | ||
|
||
loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(loop) | ||
|
||
try: | ||
with patch("asyncio.get_running_loop", return_value=loop): | ||
AsyncioIntegration.setup_once() | ||
|
||
mock_client = Mock() | ||
mock_transport = Mock(spec=AsyncHttpTransport) | ||
mock_client.transport = mock_transport | ||
mock_client.close = AsyncMock(return_value=None) | ||
|
||
with patch("sentry_sdk.get_client", return_value=mock_client): | ||
loop.close() | ||
|
||
mock_client.close.assert_called_once() | ||
mock_client.close.assert_awaited_once() | ||
|
||
except Exception: | ||
if not loop.is_closed(): | ||
loop.close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Test Mismatch: Close vs Close_AsyncThe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is no longer used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, missed that one. Is removed now