Skip to content

Commit 1e2c70d

Browse files
committed
feat(transport): Add async transport class
Add an implementation of Transport to work with the async background worker and HTTPCore async. GH-4582
1 parent 1a129f7 commit 1e2c70d

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed

sentry_sdk/transport.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import socket
77
import ssl
88
import time
9+
import asyncio
910
from datetime import datetime, timedelta, timezone
1011
from collections import defaultdict
1112
from urllib.request import getproxies
@@ -566,6 +567,187 @@ def flush(
566567
self._worker.flush(timeout, callback)
567568

568569

570+
class AsyncHttpTransport(HttpTransportCore):
571+
def __init__(self: Self, options: Dict[str, Any]) -> None:
572+
super().__init__(options)
573+
# Requires event loop at init time
574+
self._loop = asyncio.get_running_loop()
575+
self.background_tasks = set()
576+
577+
async def _send_envelope(self: Self, envelope: Envelope) -> None:
578+
_prepared_envelope = self._prepare_envelope(envelope)
579+
if _prepared_envelope is None:
580+
return None
581+
envelope, body, headers = _prepared_envelope
582+
await self._send_request(
583+
body.getvalue(),
584+
headers=headers,
585+
endpoint_type=EndpointType.ENVELOPE,
586+
envelope=envelope,
587+
)
588+
return None
589+
590+
async def _send_request(
591+
self: Self,
592+
body: bytes,
593+
headers: Dict[str, str],
594+
endpoint_type: EndpointType,
595+
envelope: Optional[Envelope],
596+
) -> None:
597+
self._update_headers(headers)
598+
try:
599+
response = await self._request(
600+
"POST",
601+
endpoint_type,
602+
body,
603+
headers,
604+
)
605+
except Exception:
606+
self._handle_request_error(envelope=envelope, loss_reason="network")
607+
raise
608+
try:
609+
self._handle_response(response=response, envelope=envelope)
610+
finally:
611+
response.close()
612+
613+
async def _request(
614+
self: Self,
615+
method: str,
616+
endpoint_type: EndpointType,
617+
body: Any,
618+
headers: Mapping[str, str],
619+
) -> httpcore.Response:
620+
return await self._pool.request(
621+
method,
622+
self._auth.get_api_url(endpoint_type),
623+
content=body,
624+
headers=headers, # type: ignore
625+
)
626+
627+
def _flush_client_reports(self: Self, force: bool = False) -> None:
628+
client_report = self._fetch_pending_client_report(force=force, interval=60)
629+
if client_report is not None:
630+
self.capture_envelope(Envelope(items=[client_report]))
631+
632+
async def _capture_envelope(self: Self, envelope: Envelope) -> None:
633+
async def send_envelope_wrapper() -> None:
634+
with capture_internal_exceptions():
635+
await self._send_envelope(envelope)
636+
self._flush_client_reports()
637+
638+
if not self._worker.submit(send_envelope_wrapper):
639+
self.on_dropped_event("full_queue")
640+
for item in envelope.items:
641+
self.record_lost_event("queue_overflow", item=item)
642+
643+
def capture_envelope(self: Self, envelope: Envelope) -> None:
644+
# Synchronous entry point
645+
if asyncio.get_running_loop() is not None:
646+
# We are on the main thread running the event loop
647+
task = asyncio.create_task(self._capture_envelope(envelope))
648+
self.background_tasks.add(task)
649+
task.add_done_callback(self.background_tasks.discard)
650+
else:
651+
# We are in a background thread, not running an event loop,
652+
# have to launch the task on the loop in a threadsafe way.
653+
asyncio.run_coroutine_threadsafe(
654+
self._capture_envelope(envelope),
655+
self._loop,
656+
)
657+
658+
async def flush_async(
659+
self: Self,
660+
timeout: float,
661+
callback: Optional[Callable[[int, float], None]] = None,
662+
) -> None:
663+
logger.debug("Flushing HTTP transport")
664+
665+
if timeout > 0:
666+
self._worker.submit(lambda: self._flush_client_reports(force=True))
667+
await self._worker.flush_async(timeout, callback) # type: ignore
668+
669+
def _get_pool_options(self: Self) -> Dict[str, Any]:
670+
options: Dict[str, Any] = {
671+
"http2": False, # no HTTP2 for now
672+
"retries": 3,
673+
}
674+
675+
socket_options = (
676+
self.options["socket_options"]
677+
if self.options["socket_options"] is not None
678+
else []
679+
)
680+
681+
used_options = {(o[0], o[1]) for o in socket_options}
682+
for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
683+
if (default_option[0], default_option[1]) not in used_options:
684+
socket_options.append(default_option)
685+
686+
options["socket_options"] = socket_options
687+
688+
ssl_context = ssl.create_default_context()
689+
ssl_context.load_verify_locations(
690+
self.options["ca_certs"] # User-provided bundle from the SDK init
691+
or os.environ.get("SSL_CERT_FILE")
692+
or os.environ.get("REQUESTS_CA_BUNDLE")
693+
or certifi.where()
694+
)
695+
cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
696+
key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
697+
if cert_file is not None:
698+
ssl_context.load_cert_chain(cert_file, key_file)
699+
700+
options["ssl_context"] = ssl_context
701+
702+
return options
703+
704+
def _make_pool(
705+
self: Self,
706+
) -> Union[
707+
httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool
708+
]:
709+
if self.parsed_dsn is None:
710+
raise ValueError("Cannot create HTTP-based transport without valid DSN")
711+
proxy = None
712+
no_proxy = self._in_no_proxy(self.parsed_dsn)
713+
714+
# try HTTPS first
715+
https_proxy = self.options["https_proxy"]
716+
if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
717+
proxy = https_proxy or (not no_proxy and getproxies().get("https"))
718+
719+
# maybe fallback to HTTP proxy
720+
http_proxy = self.options["http_proxy"]
721+
if not proxy and (http_proxy != ""):
722+
proxy = http_proxy or (not no_proxy and getproxies().get("http"))
723+
724+
opts = self._get_pool_options()
725+
726+
if proxy:
727+
proxy_headers = self.options["proxy_headers"]
728+
if proxy_headers:
729+
opts["proxy_headers"] = proxy_headers
730+
731+
if proxy.startswith("socks"):
732+
try:
733+
if "socket_options" in opts:
734+
socket_options = opts.pop("socket_options")
735+
if socket_options:
736+
logger.warning(
737+
"You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
738+
)
739+
return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts)
740+
except RuntimeError:
741+
logger.warning(
742+
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
743+
proxy,
744+
)
745+
else:
746+
return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
747+
748+
return httpcore.AsyncConnectionPool(**opts)
749+
750+
569751
class HttpTransport(BaseHttpTransport):
570752
if TYPE_CHECKING:
571753
_pool: Union[PoolManager, ProxyManager]

0 commit comments

Comments
 (0)