Skip to content

Commit a644465

Browse files
committed
feat(transport): Add async transport class
Add an implementation of Transport to work with the async background worker and HTTPCore async. GH-4582
1 parent d9f7383 commit a644465

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed

sentry_sdk/transport.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import socket
77
import ssl
88
import time
9+
import asyncio
910
from datetime import datetime, timedelta, timezone
1011
from collections import defaultdict
1112
from urllib.request import getproxies
@@ -571,6 +572,187 @@ def flush(
571572
self._worker.flush(timeout, callback)
572573

573574

575+
class AsyncHttpTransport(HttpTransportCore):
576+
def __init__(self: Self, options: Dict[str, Any]) -> None:
577+
super().__init__(options)
578+
# Requires event loop at init time
579+
self._loop = asyncio.get_running_loop()
580+
self.background_tasks = set()
581+
582+
async def _send_envelope(self: Self, envelope: Envelope) -> None:
583+
_prepared_envelope = self._prepare_envelope(envelope)
584+
if _prepared_envelope is None:
585+
return None
586+
envelope, body, headers = _prepared_envelope
587+
await self._send_request(
588+
body.getvalue(),
589+
headers=headers,
590+
endpoint_type=EndpointType.ENVELOPE,
591+
envelope=envelope,
592+
)
593+
return None
594+
595+
async def _send_request(
596+
self: Self,
597+
body: bytes,
598+
headers: Dict[str, str],
599+
endpoint_type: EndpointType,
600+
envelope: Optional[Envelope],
601+
) -> None:
602+
self._update_headers(headers)
603+
try:
604+
response = await self._request(
605+
"POST",
606+
endpoint_type,
607+
body,
608+
headers,
609+
)
610+
except Exception:
611+
self._handle_request_error(envelope=envelope, loss_reason="network")
612+
raise
613+
try:
614+
self._handle_response(response=response, envelope=envelope)
615+
finally:
616+
response.close()
617+
618+
async def _request(
619+
self: Self,
620+
method: str,
621+
endpoint_type: EndpointType,
622+
body: Any,
623+
headers: Mapping[str, str],
624+
) -> httpcore.Response:
625+
return await self._pool.request(
626+
method,
627+
self._auth.get_api_url(endpoint_type),
628+
content=body,
629+
headers=headers, # type: ignore
630+
)
631+
632+
def _flush_client_reports(self: Self, force: bool = False) -> None:
633+
client_report = self._fetch_pending_client_report(force=force, interval=60)
634+
if client_report is not None:
635+
self.capture_envelope(Envelope(items=[client_report]))
636+
637+
async def _capture_envelope(self: Self, envelope: Envelope) -> None:
638+
async def send_envelope_wrapper() -> None:
639+
with capture_internal_exceptions():
640+
await self._send_envelope(envelope)
641+
self._flush_client_reports()
642+
643+
if not self._worker.submit(send_envelope_wrapper):
644+
self.on_dropped_event("full_queue")
645+
for item in envelope.items:
646+
self.record_lost_event("queue_overflow", item=item)
647+
648+
def capture_envelope(self: Self, envelope: Envelope) -> None:
649+
# Synchronous entry point
650+
if asyncio.get_running_loop() is not None:
651+
# We are on the main thread running the event loop
652+
task = asyncio.create_task(self._capture_envelope(envelope))
653+
self.background_tasks.add(task)
654+
task.add_done_callback(self.background_tasks.discard)
655+
else:
656+
# We are in a background thread, not running an event loop,
657+
# have to launch the task on the loop in a threadsafe way.
658+
asyncio.run_coroutine_threadsafe(
659+
self._capture_envelope(envelope),
660+
self._loop,
661+
)
662+
663+
async def flush_async(
664+
self: Self,
665+
timeout: float,
666+
callback: Optional[Callable[[int, float], None]] = None,
667+
) -> None:
668+
logger.debug("Flushing HTTP transport")
669+
670+
if timeout > 0:
671+
self._worker.submit(lambda: self._flush_client_reports(force=True))
672+
await self._worker.flush_async(timeout, callback) # type: ignore
673+
674+
def _get_pool_options(self: Self) -> Dict[str, Any]:
675+
options: Dict[str, Any] = {
676+
"http2": False, # no HTTP2 for now
677+
"retries": 3,
678+
}
679+
680+
socket_options = (
681+
self.options["socket_options"]
682+
if self.options["socket_options"] is not None
683+
else []
684+
)
685+
686+
used_options = {(o[0], o[1]) for o in socket_options}
687+
for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
688+
if (default_option[0], default_option[1]) not in used_options:
689+
socket_options.append(default_option)
690+
691+
options["socket_options"] = socket_options
692+
693+
ssl_context = ssl.create_default_context()
694+
ssl_context.load_verify_locations(
695+
self.options["ca_certs"] # User-provided bundle from the SDK init
696+
or os.environ.get("SSL_CERT_FILE")
697+
or os.environ.get("REQUESTS_CA_BUNDLE")
698+
or certifi.where()
699+
)
700+
cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
701+
key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
702+
if cert_file is not None:
703+
ssl_context.load_cert_chain(cert_file, key_file)
704+
705+
options["ssl_context"] = ssl_context
706+
707+
return options
708+
709+
def _make_pool(
710+
self: Self,
711+
) -> Union[
712+
httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool
713+
]:
714+
if self.parsed_dsn is None:
715+
raise ValueError("Cannot create HTTP-based transport without valid DSN")
716+
proxy = None
717+
no_proxy = self._in_no_proxy(self.parsed_dsn)
718+
719+
# try HTTPS first
720+
https_proxy = self.options["https_proxy"]
721+
if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
722+
proxy = https_proxy or (not no_proxy and getproxies().get("https"))
723+
724+
# maybe fallback to HTTP proxy
725+
http_proxy = self.options["http_proxy"]
726+
if not proxy and (http_proxy != ""):
727+
proxy = http_proxy or (not no_proxy and getproxies().get("http"))
728+
729+
opts = self._get_pool_options()
730+
731+
if proxy:
732+
proxy_headers = self.options["proxy_headers"]
733+
if proxy_headers:
734+
opts["proxy_headers"] = proxy_headers
735+
736+
if proxy.startswith("socks"):
737+
try:
738+
if "socket_options" in opts:
739+
socket_options = opts.pop("socket_options")
740+
if socket_options:
741+
logger.warning(
742+
"You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
743+
)
744+
return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts)
745+
except RuntimeError:
746+
logger.warning(
747+
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
748+
proxy,
749+
)
750+
else:
751+
return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
752+
753+
return httpcore.AsyncConnectionPool(**opts)
754+
755+
574756
class HttpTransport(BaseHttpTransport):
575757
if TYPE_CHECKING:
576758
_pool: Union[PoolManager, ProxyManager]

0 commit comments

Comments
 (0)