Skip to content

Commit 8809b08

Browse files
committed
feat(transport): Add async transport class
Add an implementation of Transport to work with the async background worker and HTTPCore async. GH-4582
1 parent 97c5e3d commit 8809b08

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed

sentry_sdk/transport.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import socket
77
import ssl
88
import time
9+
import asyncio
910
from datetime import datetime, timedelta, timezone
1011
from collections import defaultdict
1112
from urllib.request import getproxies
@@ -569,6 +570,187 @@ def flush(
569570
self._worker.flush(timeout, callback)
570571

571572

573+
class AsyncHttpTransport(HttpTransportCore):
574+
def __init__(self: Self, options: Dict[str, Any]) -> None:
575+
super().__init__(options)
576+
# Requires event loop at init time
577+
self._loop = asyncio.get_running_loop()
578+
self.background_tasks = set()
579+
580+
async def _send_envelope(self: Self, envelope: Envelope) -> None:
581+
_prepared_envelope = self._prepare_envelope(envelope)
582+
if _prepared_envelope is None:
583+
return None
584+
envelope, body, headers = _prepared_envelope
585+
await self._send_request(
586+
body.getvalue(),
587+
headers=headers,
588+
endpoint_type=EndpointType.ENVELOPE,
589+
envelope=envelope,
590+
)
591+
return None
592+
593+
async def _send_request(
594+
self: Self,
595+
body: bytes,
596+
headers: Dict[str, str],
597+
endpoint_type: EndpointType,
598+
envelope: Optional[Envelope],
599+
) -> None:
600+
self._update_headers(headers)
601+
try:
602+
response = await self._request(
603+
"POST",
604+
endpoint_type,
605+
body,
606+
headers,
607+
)
608+
except Exception:
609+
self._handle_request_error(envelope=envelope, loss_reason="network")
610+
raise
611+
try:
612+
self._handle_response(response=response, envelope=envelope)
613+
finally:
614+
response.close()
615+
616+
async def _request(
617+
self: Self,
618+
method: str,
619+
endpoint_type: EndpointType,
620+
body: Any,
621+
headers: Mapping[str, str],
622+
) -> httpcore.Response:
623+
return await self._pool.request(
624+
method,
625+
self._auth.get_api_url(endpoint_type),
626+
content=body,
627+
headers=headers, # type: ignore
628+
)
629+
630+
def _flush_client_reports(self: Self, force: bool = False) -> None:
631+
client_report = self._fetch_pending_client_report(force=force, interval=60)
632+
if client_report is not None:
633+
self.capture_envelope(Envelope(items=[client_report]))
634+
635+
async def _capture_envelope(self: Self, envelope: Envelope) -> None:
636+
async def send_envelope_wrapper() -> None:
637+
with capture_internal_exceptions():
638+
await self._send_envelope(envelope)
639+
self._flush_client_reports()
640+
641+
if not self._worker.submit(send_envelope_wrapper):
642+
self.on_dropped_event("full_queue")
643+
for item in envelope.items:
644+
self.record_lost_event("queue_overflow", item=item)
645+
646+
def capture_envelope(self: Self, envelope: Envelope) -> None:
647+
# Synchronous entry point
648+
if asyncio.get_running_loop() is not None:
649+
# We are on the main thread running the event loop
650+
task = asyncio.create_task(self._capture_envelope(envelope))
651+
self.background_tasks.add(task)
652+
task.add_done_callback(self.background_tasks.discard)
653+
else:
654+
# We are in a background thread, not running an event loop,
655+
# have to launch the task on the loop in a threadsafe way.
656+
asyncio.run_coroutine_threadsafe(
657+
self._capture_envelope(envelope),
658+
self._loop,
659+
)
660+
661+
async def flush_async(
662+
self: Self,
663+
timeout: float,
664+
callback: Optional[Callable[[int, float], None]] = None,
665+
) -> None:
666+
logger.debug("Flushing HTTP transport")
667+
668+
if timeout > 0:
669+
self._worker.submit(lambda: self._flush_client_reports(force=True))
670+
await self._worker.flush_async(timeout, callback) # type: ignore
671+
672+
def _get_pool_options(self: Self) -> Dict[str, Any]:
673+
options: Dict[str, Any] = {
674+
"http2": False, # no HTTP2 for now
675+
"retries": 3,
676+
}
677+
678+
socket_options = (
679+
self.options["socket_options"]
680+
if self.options["socket_options"] is not None
681+
else []
682+
)
683+
684+
used_options = {(o[0], o[1]) for o in socket_options}
685+
for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
686+
if (default_option[0], default_option[1]) not in used_options:
687+
socket_options.append(default_option)
688+
689+
options["socket_options"] = socket_options
690+
691+
ssl_context = ssl.create_default_context()
692+
ssl_context.load_verify_locations(
693+
self.options["ca_certs"] # User-provided bundle from the SDK init
694+
or os.environ.get("SSL_CERT_FILE")
695+
or os.environ.get("REQUESTS_CA_BUNDLE")
696+
or certifi.where()
697+
)
698+
cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
699+
key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
700+
if cert_file is not None:
701+
ssl_context.load_cert_chain(cert_file, key_file)
702+
703+
options["ssl_context"] = ssl_context
704+
705+
return options
706+
707+
def _make_pool(
708+
self: Self,
709+
) -> Union[
710+
httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool
711+
]:
712+
if self.parsed_dsn is None:
713+
raise ValueError("Cannot create HTTP-based transport without valid DSN")
714+
proxy = None
715+
no_proxy = self._in_no_proxy(self.parsed_dsn)
716+
717+
# try HTTPS first
718+
https_proxy = self.options["https_proxy"]
719+
if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
720+
proxy = https_proxy or (not no_proxy and getproxies().get("https"))
721+
722+
# maybe fallback to HTTP proxy
723+
http_proxy = self.options["http_proxy"]
724+
if not proxy and (http_proxy != ""):
725+
proxy = http_proxy or (not no_proxy and getproxies().get("http"))
726+
727+
opts = self._get_pool_options()
728+
729+
if proxy:
730+
proxy_headers = self.options["proxy_headers"]
731+
if proxy_headers:
732+
opts["proxy_headers"] = proxy_headers
733+
734+
if proxy.startswith("socks"):
735+
try:
736+
if "socket_options" in opts:
737+
socket_options = opts.pop("socket_options")
738+
if socket_options:
739+
logger.warning(
740+
"You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
741+
)
742+
return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts)
743+
except RuntimeError:
744+
logger.warning(
745+
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
746+
proxy,
747+
)
748+
else:
749+
return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
750+
751+
return httpcore.AsyncConnectionPool(**opts)
752+
753+
572754
class HttpTransport(BaseHttpTransport):
573755
if TYPE_CHECKING:
574756
_pool: Union[PoolManager, ProxyManager]

0 commit comments

Comments
 (0)