Skip to content

Commit 35d7078

Browse files
srothhsl0thentr0py
andauthored
Add async transport (#4614)
Add async implementation of the abstract Transport class. This transport utilizes the async task worker as well as the httpcore async functionality. Thread Safety: As capture_envelope is registered by the client as a callback for several background threads in the sdk, which are not running the event loop, capture_envelope in the transport is made to be thread safe and allow for execution on the event loop from other threads. The same is currently not the case for flush, as there does not seem to be a usage from background threads, however if necessary, it can also be added. HTTP2 support: Currently not activated, but from the looks of the [httpcore docs](https://www.encode.io/httpcore/http2/) it should be as simple as setting the http2 in the init of the pool to true. This likely makes sense to support, as HTTP2 shows great performance improvements with concurrent requests. Kill: The kill method is sync, but the pool needs to be closed asynchronously. Currently, this is done by launching a task. However, the task cannot be awaited in sync code without deadlocking, therefore kill followed by an immediate loop shutdown could technically lead to resource leakage. Therefore, I decided to make kill optionally return the async task, so it can be awaited if called from an async context. Note also that parts of the code are very similar to the HTTP2 integration, as they both use the httpcore library. Maybe in a later PR there could be a shared superclass to avoid code duplication? GH-4582 --------- Co-authored-by: Neel Shah <[email protected]>
1 parent 7889074 commit 35d7078

File tree

2 files changed

+246
-6
lines changed

2 files changed

+246
-6
lines changed

sentry_sdk/transport.py

Lines changed: 245 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import socket
77
import ssl
88
import time
9+
import asyncio
910
from datetime import datetime, timedelta, timezone
1011
from collections import defaultdict
1112
from urllib.request import getproxies
@@ -17,18 +18,27 @@
1718

1819
try:
1920
import httpcore
21+
except ImportError:
22+
httpcore = None # type: ignore
23+
24+
try:
2025
import h2 # noqa: F401
2126

22-
HTTP2_ENABLED = True
27+
HTTP2_ENABLED = httpcore is not None
2328
except ImportError:
2429
HTTP2_ENABLED = False
2530

31+
try:
32+
ASYNC_TRANSPORT_ENABLED = httpcore is not None
33+
except ImportError:
34+
ASYNC_TRANSPORT_ENABLED = False
35+
2636
import urllib3
2737
import certifi
2838

2939
from sentry_sdk.consts import EndpointType
3040
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
31-
from sentry_sdk.worker import BackgroundWorker, Worker
41+
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
3242
from sentry_sdk.envelope import Envelope, Item, PayloadRef
3343

3444
from typing import TYPE_CHECKING
@@ -224,9 +234,8 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
224234
elif self._compression_algo == "br":
225235
self._compression_level = 4
226236

227-
def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
228-
# For now, we only support the threaded sync background worker.
229-
return BackgroundWorker(queue_size=options["transport_queue_size"])
237+
def _create_worker(self, options: dict[str, Any]) -> Worker:
238+
raise NotImplementedError()
230239

231240
def record_lost_event(
232241
self: Self,
@@ -543,6 +552,9 @@ def _send_request(
543552
finally:
544553
response.close()
545554

555+
def _create_worker(self: Self, options: dict[str, Any]) -> Worker:
556+
return BackgroundWorker(queue_size=options["transport_queue_size"])
557+
546558
def _flush_client_reports(self: Self, force: bool = False) -> None:
547559
client_report = self._fetch_pending_client_report(force=force, interval=60)
548560
if client_report is not None:
@@ -571,6 +583,222 @@ def flush(
571583
self._worker.flush(timeout, callback)
572584

573585

586+
if not ASYNC_TRANSPORT_ENABLED:
587+
# Sorry, no AsyncHttpTransport for you
588+
AsyncHttpTransport = BaseHttpTransport
589+
590+
else:
591+
592+
class AsyncHttpTransport(HttpTransportCore): # type: ignore
593+
def __init__(self: Self, options: Dict[str, Any]) -> None:
594+
super().__init__(options)
595+
# Requires event loop at init time
596+
self.loop = asyncio.get_running_loop()
597+
598+
def _create_worker(self: Self, options: dict[str, Any]) -> Worker:
599+
return AsyncWorker(queue_size=options["transport_queue_size"])
600+
601+
def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]:
602+
return next(
603+
(
604+
val.decode("ascii")
605+
for key, val in response.headers
606+
if key.decode("ascii").lower() == header
607+
),
608+
None,
609+
)
610+
611+
async def _send_envelope(self: Self, envelope: Envelope) -> None:
612+
_prepared_envelope = self._prepare_envelope(envelope)
613+
if _prepared_envelope is not None:
614+
envelope, body, headers = _prepared_envelope
615+
await self._send_request(
616+
body.getvalue(),
617+
headers=headers,
618+
endpoint_type=EndpointType.ENVELOPE,
619+
envelope=envelope,
620+
)
621+
return None
622+
623+
async def _send_request(
624+
self: Self,
625+
body: bytes,
626+
headers: Dict[str, str],
627+
endpoint_type: EndpointType,
628+
envelope: Optional[Envelope],
629+
) -> None:
630+
self._update_headers(headers)
631+
try:
632+
response = await self._request(
633+
"POST",
634+
endpoint_type,
635+
body,
636+
headers,
637+
)
638+
except Exception:
639+
self._handle_request_error(envelope=envelope, loss_reason="network")
640+
raise
641+
try:
642+
self._handle_response(response=response, envelope=envelope)
643+
finally:
644+
await response.aclose()
645+
646+
async def _request( # type: ignore[override]
647+
self: Self,
648+
method: str,
649+
endpoint_type: EndpointType,
650+
body: Any,
651+
headers: Mapping[str, str],
652+
) -> httpcore.Response:
653+
return await self._pool.request(
654+
method,
655+
self._auth.get_api_url(endpoint_type),
656+
content=body,
657+
headers=headers, # type: ignore
658+
extensions={
659+
"timeout": {
660+
"pool": self.TIMEOUT,
661+
"connect": self.TIMEOUT,
662+
"write": self.TIMEOUT,
663+
"read": self.TIMEOUT,
664+
}
665+
},
666+
)
667+
668+
async def _flush_client_reports(self: Self, force: bool = False) -> None:
669+
client_report = self._fetch_pending_client_report(force=force, interval=60)
670+
if client_report is not None:
671+
self.capture_envelope(Envelope(items=[client_report]))
672+
673+
def _capture_envelope(self: Self, envelope: Envelope) -> None:
674+
async def send_envelope_wrapper() -> None:
675+
with capture_internal_exceptions():
676+
await self._send_envelope(envelope)
677+
await self._flush_client_reports()
678+
679+
if not self._worker.submit(send_envelope_wrapper):
680+
self.on_dropped_event("full_queue")
681+
for item in envelope.items:
682+
self.record_lost_event("queue_overflow", item=item)
683+
684+
def capture_envelope(self: Self, envelope: Envelope) -> None:
685+
# Synchronous entry point
686+
if self.loop and self.loop.is_running():
687+
self.loop.call_soon_threadsafe(self._capture_envelope, envelope)
688+
else:
689+
# The event loop is no longer running
690+
logger.warning("Async Transport is not running in an event loop.")
691+
self.on_dropped_event("internal_sdk_error")
692+
for item in envelope.items:
693+
self.record_lost_event("internal_sdk_error", item=item)
694+
695+
def flush( # type: ignore[override]
696+
self: Self,
697+
timeout: float,
698+
callback: Optional[Callable[[int, float], None]] = None,
699+
) -> Optional[asyncio.Task[None]]:
700+
logger.debug("Flushing HTTP transport")
701+
702+
if timeout > 0:
703+
self._worker.submit(lambda: self._flush_client_reports(force=True))
704+
return self._worker.flush(timeout, callback) # type: ignore[func-returns-value]
705+
return None
706+
707+
def _get_pool_options(self: Self) -> Dict[str, Any]:
708+
options: Dict[str, Any] = {
709+
"http2": False, # no HTTP2 for now
710+
"retries": 3,
711+
}
712+
713+
socket_options = (
714+
self.options["socket_options"]
715+
if self.options["socket_options"] is not None
716+
else []
717+
)
718+
719+
used_options = {(o[0], o[1]) for o in socket_options}
720+
for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
721+
if (default_option[0], default_option[1]) not in used_options:
722+
socket_options.append(default_option)
723+
724+
options["socket_options"] = socket_options
725+
726+
ssl_context = ssl.create_default_context()
727+
ssl_context.load_verify_locations(
728+
self.options["ca_certs"] # User-provided bundle from the SDK init
729+
or os.environ.get("SSL_CERT_FILE")
730+
or os.environ.get("REQUESTS_CA_BUNDLE")
731+
or certifi.where()
732+
)
733+
cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
734+
key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
735+
if cert_file is not None:
736+
ssl_context.load_cert_chain(cert_file, key_file)
737+
738+
options["ssl_context"] = ssl_context
739+
740+
return options
741+
742+
def _make_pool(
743+
self: Self,
744+
) -> Union[
745+
httpcore.AsyncSOCKSProxy,
746+
httpcore.AsyncHTTPProxy,
747+
httpcore.AsyncConnectionPool,
748+
]:
749+
if self.parsed_dsn is None:
750+
raise ValueError("Cannot create HTTP-based transport without valid DSN")
751+
proxy = None
752+
no_proxy = self._in_no_proxy(self.parsed_dsn)
753+
754+
# try HTTPS first
755+
https_proxy = self.options["https_proxy"]
756+
if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
757+
proxy = https_proxy or (not no_proxy and getproxies().get("https"))
758+
759+
# maybe fallback to HTTP proxy
760+
http_proxy = self.options["http_proxy"]
761+
if not proxy and (http_proxy != ""):
762+
proxy = http_proxy or (not no_proxy and getproxies().get("http"))
763+
764+
opts = self._get_pool_options()
765+
766+
if proxy:
767+
proxy_headers = self.options["proxy_headers"]
768+
if proxy_headers:
769+
opts["proxy_headers"] = proxy_headers
770+
771+
if proxy.startswith("socks"):
772+
try:
773+
if "socket_options" in opts:
774+
socket_options = opts.pop("socket_options")
775+
if socket_options:
776+
logger.warning(
777+
"You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
778+
)
779+
return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts)
780+
except RuntimeError:
781+
logger.warning(
782+
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
783+
proxy,
784+
)
785+
else:
786+
return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
787+
788+
return httpcore.AsyncConnectionPool(**opts)
789+
790+
def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore
791+
792+
logger.debug("Killing HTTP transport")
793+
self._worker.kill()
794+
try:
795+
# Return the pool cleanup task so caller can await it if needed
796+
return self.loop.create_task(self._pool.aclose()) # type: ignore
797+
except RuntimeError:
798+
logger.warning("Event loop not running, aborting kill.")
799+
return None
800+
801+
574802
class HttpTransport(BaseHttpTransport):
575803
if TYPE_CHECKING:
576804
_pool: Union[PoolManager, ProxyManager]
@@ -816,11 +1044,22 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
8161044
ref_transport = options["transport"]
8171045

8181046
use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
819-
1047+
use_async_transport = options.get("_experiments", {}).get("transport_async", False)
8201048
# By default, we use the http transport class
8211049
transport_cls: Type[Transport] = (
8221050
Http2Transport if use_http2_transport else HttpTransport
8231051
)
1052+
if use_async_transport and ASYNC_TRANSPORT_ENABLED:
1053+
try:
1054+
asyncio.get_running_loop()
1055+
transport_cls = AsyncHttpTransport
1056+
except RuntimeError:
1057+
# No event loop running, fall back to sync transport
1058+
logger.warning("No event loop running, falling back to sync transport.")
1059+
elif use_async_transport:
1060+
logger.warning(
1061+
"You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
1062+
)
8241063

8251064
if isinstance(ref_transport, Transport):
8261065
return ref_transport

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def get_file_text(file_name):
6060
"flask": ["flask>=0.11", "blinker>=1.1", "markupsafe"],
6161
"grpcio": ["grpcio>=1.21.1", "protobuf>=3.8.0"],
6262
"http2": ["httpcore[http2]==1.*"],
63+
"asyncio": ["httpcore[asyncio]==1.*"],
6364
"httpx": ["httpx>=0.16.0"],
6465
"huey": ["huey>=2"],
6566
"huggingface_hub": ["huggingface_hub>=0.22"],

0 commit comments

Comments
 (0)