Skip to content

Commit 4341625

Browse files
committed
Integrate AsyncHttpTransport as a new experimental option
Add a new experimental option that uses AsyncHttpTransport and integrates it with synchronous client interfaces. GH-4601
1 parent 23b8ea2 commit 4341625

File tree

3 files changed

+53
-22
lines changed

3 files changed

+53
-22
lines changed

sentry_sdk/client.py

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import uuid
44
import random
55
import socket
6+
import asyncio
67
from collections.abc import Mapping
78
from datetime import datetime, timezone
89
from importlib import import_module
@@ -25,7 +26,7 @@
2526
)
2627
from sentry_sdk.serializer import serialize
2728
from sentry_sdk.tracing import trace
28-
from sentry_sdk.transport import HttpTransportCore, make_transport
29+
from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport
2930
from sentry_sdk.consts import (
3031
SPANDATA,
3132
DEFAULT_MAX_VALUE_LENGTH,
@@ -914,36 +915,50 @@ def get_integration(
914915

915916
return self.integrations.get(integration_name)
916917

918+
def _close_components(self) -> None:
919+
"""Kill all client components in the correct order."""
920+
self.session_flusher.kill()
921+
if self.log_batcher is not None:
922+
self.log_batcher.kill()
923+
if self.monitor:
924+
self.monitor.kill()
925+
if self.transport is not None:
926+
self.transport.kill()
927+
self.transport = None
928+
917929
def close(
918930
self,
919931
timeout: Optional[float] = None,
920932
callback: Optional[Callable[[int, float], None]] = None,
921-
) -> None:
933+
) -> Optional[asyncio.Task[None]]:
922934
"""
923935
Close the client and shut down the transport. Arguments have the same
924-
semantics as :py:meth:`Client.flush`.
936+
semantics as :py:meth:`Client.flush`. When using the async transport, close needs to be awaited to block.
925937
"""
926938
if self.transport is not None:
927-
self.flush(timeout=timeout, callback=callback)
939+
if isinstance(self.transport, AsyncHttpTransport):
928940

929-
self.session_flusher.kill()
941+
def _on_flush_done(_: asyncio.Task[None]) -> None:
942+
self._close_components()
930943

931-
if self.log_batcher is not None:
932-
self.log_batcher.kill()
933-
934-
if self.monitor:
935-
self.monitor.kill()
936-
937-
self.transport.kill()
938-
self.transport = None
944+
flush_task = self.transport.loop.create_task(
945+
self._flush_async(timeout, callback)
946+
)
947+
# Enforce flush before shutdown
948+
flush_task.add_done_callback(_on_flush_done)
949+
return flush_task
950+
else:
951+
self.flush(timeout=timeout, callback=callback)
952+
self._close_components()
953+
return None
939954

940955
def flush(
941956
self,
942957
timeout: Optional[float] = None,
943958
callback: Optional[Callable[[int, float], None]] = None,
944-
) -> None:
959+
) -> Optional[asyncio.Task[None]]:
945960
"""
946-
Wait for the current events to be sent.
961+
Wait for the current events to be sent. When using the async transport, flush needs to be awaited to block.
947962
948963
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
949964
@@ -952,12 +967,28 @@ def flush(
952967
if self.transport is not None:
953968
if timeout is None:
954969
timeout = self.options["shutdown_timeout"]
970+
955971
self.session_flusher.flush()
956972

957973
if self.log_batcher is not None:
958974
self.log_batcher.flush()
959975

960-
self.transport.flush(timeout=timeout, callback=callback)
976+
if isinstance(self.transport, AsyncHttpTransport):
977+
return self.transport.loop.create_task(
978+
self._flush_async(timeout, callback)
979+
)
980+
else:
981+
self.transport.flush(timeout=timeout, callback=callback)
982+
983+
return None
984+
985+
async def _flush_async(
986+
self, timeout: float, callback: Optional[Callable[[int, float], None]]
987+
) -> None:
988+
self.session_flusher.flush()
989+
if self.log_batcher is not None:
990+
self.log_batcher.flush()
991+
await self.transport.flush_async(timeout=timeout, callback=callback) # type: ignore
961992

962993
def __enter__(self) -> _Client:
963994
return self

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class CompressionAlgo(Enum):
7575
"transport_compression_algo": Optional[CompressionAlgo],
7676
"transport_num_pools": Optional[int],
7777
"transport_http2": Optional[bool],
78+
"transport_async": Optional[bool],
7879
"enable_logs": Optional[bool],
7980
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
8081
},

sentry_sdk/transport.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -647,18 +647,18 @@ async def send_envelope_wrapper() -> None:
647647
def capture_envelope(self: Self, envelope: Envelope) -> None:
648648
# Synchronous entry point
649649
try:
650-
asyncio.get_running_loop()
651650
# We are on the main thread running the event loop
651+
asyncio.get_running_loop()
652652
task = asyncio.create_task(self._capture_envelope(envelope))
653653
self.background_tasks.add(task)
654654
task.add_done_callback(self.background_tasks.discard)
655655
except RuntimeError:
656656
# We are in a background thread, not running an event loop,
657657
# have to launch the task on the loop in a threadsafe way.
658-
if self._loop and self._loop.is_running():
658+
if self.loop and self.loop.is_running():
659659
asyncio.run_coroutine_threadsafe(
660660
self._capture_envelope(envelope),
661-
self._loop,
661+
self.loop,
662662
)
663663
else:
664664
# The event loop is no longer running
@@ -680,7 +680,7 @@ async def flush_async(
680680

681681
def _get_pool_options(self: Self) -> Dict[str, Any]:
682682
options: Dict[str, Any] = {
683-
"http2": False, # no HTTP2 for now
683+
"http2": False, # no HTTP2 for now, should probably just work with this setting
684684
"retries": 3,
685685
}
686686

@@ -1022,8 +1022,7 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
10221022
if use_async_transport and asyncio.get_running_loop() is not None:
10231023
transport_cls: Type[Transport] = AsyncHttpTransport
10241024
else:
1025-
use_http2 = use_http2_transport
1026-
transport_cls = Http2Transport if use_http2 else HttpTransport
1025+
transport_cls = Http2Transport if use_http2_transport else HttpTransport
10271026

10281027
if isinstance(ref_transport, Transport):
10291028
return ref_transport

0 commit comments

Comments
 (0)