-
Notifications
You must be signed in to change notification settings - Fork 557
Add async transport #4614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async transport #4614
Changes from all commits
9f24136
001f36c
401b1bc
3f43d8f
15fa295
ef780f3
f63e46f
1804271
11da869
779a0d6
0895d23
bbf426b
744dc8a
fcc8040
9a43d9b
b5eda0e
9e380b8
ee44621
d9f7383
d2e647b
859a0e2
4a58ce7
cbecde7
c8bb55a
05a7de7
38246d0
8b226cb
823215e
4eed4fd
afd494d
fcc7ac3
f659514
8c542ce
30dde67
3392e0e
6c85500
ae5a864
9c537e6
f7554b2
6cb72ad
9171c5d
111861b
4744817
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import socket | ||
import ssl | ||
import time | ||
import asyncio | ||
from datetime import datetime, timedelta, timezone | ||
from collections import defaultdict | ||
from urllib.request import getproxies | ||
|
@@ -17,18 +18,27 @@ | |
|
||
try: | ||
import httpcore | ||
except ImportError: | ||
httpcore = None # type: ignore | ||
|
||
try: | ||
import h2 # noqa: F401 | ||
|
||
HTTP2_ENABLED = True | ||
HTTP2_ENABLED = httpcore is not None | ||
except ImportError: | ||
HTTP2_ENABLED = False | ||
|
||
try: | ||
ASYNC_TRANSPORT_ENABLED = httpcore is not None | ||
except ImportError: | ||
sl0thentr0py marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ASYNC_TRANSPORT_ENABLED = False | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Unnecessary ImportError HandlingThe Locations (1)There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The import throwing the error happens in the next PR, #4615, apologies for the slightly unclean separation here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Redundant ImportError HandlingThe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Redundant Import Handling in Async TransportThe |
||
|
||
import urllib3 | ||
import certifi | ||
|
||
from sentry_sdk.consts import EndpointType | ||
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions | ||
from sentry_sdk.worker import BackgroundWorker, Worker | ||
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker | ||
from sentry_sdk.envelope import Envelope, Item, PayloadRef | ||
|
||
from typing import TYPE_CHECKING | ||
|
@@ -224,9 +234,8 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: | |
elif self._compression_algo == "br": | ||
self._compression_level = 4 | ||
|
||
def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: | ||
# For now, we only support the threaded sync background worker. | ||
return BackgroundWorker(queue_size=options["transport_queue_size"]) | ||
def _create_worker(self, options: dict[str, Any]) -> Worker: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
raise NotImplementedError() | ||
|
||
def record_lost_event( | ||
self: Self, | ||
|
@@ -543,6 +552,9 @@ def _send_request( | |
finally: | ||
response.close() | ||
|
||
def _create_worker(self: Self, options: dict[str, Any]) -> Worker: | ||
return BackgroundWorker(queue_size=options["transport_queue_size"]) | ||
|
||
def _flush_client_reports(self: Self, force: bool = False) -> None: | ||
client_report = self._fetch_pending_client_report(force=force, interval=60) | ||
if client_report is not None: | ||
|
@@ -571,6 +583,222 @@ def flush( | |
self._worker.flush(timeout, callback) | ||
|
||
|
||
if not ASYNC_TRANSPORT_ENABLED: | ||
# Sorry, no AsyncHttpTransport for you | ||
AsyncHttpTransport = BaseHttpTransport | ||
|
||
else: | ||
|
||
class AsyncHttpTransport(HttpTransportCore): # type: ignore | ||
def __init__(self: Self, options: Dict[str, Any]) -> None: | ||
super().__init__(options) | ||
# Requires event loop at init time | ||
self.loop = asyncio.get_running_loop() | ||
|
||
def _create_worker(self: Self, options: dict[str, Any]) -> Worker: | ||
return AsyncWorker(queue_size=options["transport_queue_size"]) | ||
|
||
def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: | ||
return next( | ||
( | ||
val.decode("ascii") | ||
for key, val in response.headers | ||
if key.decode("ascii").lower() == header | ||
), | ||
None, | ||
) | ||
|
||
async def _send_envelope(self: Self, envelope: Envelope) -> None: | ||
_prepared_envelope = self._prepare_envelope(envelope) | ||
if _prepared_envelope is not None: | ||
envelope, body, headers = _prepared_envelope | ||
await self._send_request( | ||
body.getvalue(), | ||
headers=headers, | ||
endpoint_type=EndpointType.ENVELOPE, | ||
envelope=envelope, | ||
) | ||
return None | ||
|
||
async def _send_request( | ||
self: Self, | ||
body: bytes, | ||
headers: Dict[str, str], | ||
endpoint_type: EndpointType, | ||
envelope: Optional[Envelope], | ||
) -> None: | ||
self._update_headers(headers) | ||
try: | ||
response = await self._request( | ||
"POST", | ||
endpoint_type, | ||
body, | ||
headers, | ||
) | ||
except Exception: | ||
self._handle_request_error(envelope=envelope, loss_reason="network") | ||
raise | ||
try: | ||
self._handle_response(response=response, envelope=envelope) | ||
finally: | ||
await response.aclose() | ||
|
||
async def _request( # type: ignore[override] | ||
self: Self, | ||
method: str, | ||
endpoint_type: EndpointType, | ||
body: Any, | ||
headers: Mapping[str, str], | ||
) -> httpcore.Response: | ||
return await self._pool.request( | ||
method, | ||
self._auth.get_api_url(endpoint_type), | ||
content=body, | ||
headers=headers, # type: ignore | ||
extensions={ | ||
"timeout": { | ||
"pool": self.TIMEOUT, | ||
"connect": self.TIMEOUT, | ||
"write": self.TIMEOUT, | ||
"read": self.TIMEOUT, | ||
} | ||
}, | ||
) | ||
|
||
async def _flush_client_reports(self: Self, force: bool = False) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think this does not need to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. merging it in, we will take care of this when we do the mixin There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is async because otherwise the worker process_callback needs the case distinction between coroutines and sync functions. See: |
||
client_report = self._fetch_pending_client_report(force=force, interval=60) | ||
if client_report is not None: | ||
self.capture_envelope(Envelope(items=[client_report])) | ||
|
||
def _capture_envelope(self: Self, envelope: Envelope) -> None: | ||
async def send_envelope_wrapper() -> None: | ||
with capture_internal_exceptions(): | ||
await self._send_envelope(envelope) | ||
await self._flush_client_reports() | ||
|
||
if not self._worker.submit(send_envelope_wrapper): | ||
self.on_dropped_event("full_queue") | ||
for item in envelope.items: | ||
self.record_lost_event("queue_overflow", item=item) | ||
|
||
def capture_envelope(self: Self, envelope: Envelope) -> None: | ||
# Synchronous entry point | ||
if self.loop and self.loop.is_running(): | ||
self.loop.call_soon_threadsafe(self._capture_envelope, envelope) | ||
else: | ||
# The event loop is no longer running | ||
logger.warning("Async Transport is not running in an event loop.") | ||
self.on_dropped_event("internal_sdk_error") | ||
for item in envelope.items: | ||
self.record_lost_event("internal_sdk_error", item=item) | ||
|
||
def flush( # type: ignore[override] | ||
self: Self, | ||
timeout: float, | ||
callback: Optional[Callable[[int, float], None]] = None, | ||
) -> Optional[asyncio.Task[None]]: | ||
logger.debug("Flushing HTTP transport") | ||
|
||
if timeout > 0: | ||
self._worker.submit(lambda: self._flush_client_reports(force=True)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Method Not Awaited in Synchronous ContextThe Locations (1)There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Method Not Awaited Causes Flush FailureThe |
||
return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] | ||
return None | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def _get_pool_options(self: Self) -> Dict[str, Any]: | ||
options: Dict[str, Any] = { | ||
"http2": False, # no HTTP2 for now | ||
"retries": 3, | ||
} | ||
|
||
socket_options = ( | ||
self.options["socket_options"] | ||
if self.options["socket_options"] is not None | ||
else [] | ||
) | ||
|
||
used_options = {(o[0], o[1]) for o in socket_options} | ||
for default_option in KEEP_ALIVE_SOCKET_OPTIONS: | ||
if (default_option[0], default_option[1]) not in used_options: | ||
socket_options.append(default_option) | ||
|
||
options["socket_options"] = socket_options | ||
|
||
ssl_context = ssl.create_default_context() | ||
ssl_context.load_verify_locations( | ||
self.options["ca_certs"] # User-provided bundle from the SDK init | ||
or os.environ.get("SSL_CERT_FILE") | ||
or os.environ.get("REQUESTS_CA_BUNDLE") | ||
or certifi.where() | ||
) | ||
cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") | ||
key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") | ||
if cert_file is not None: | ||
ssl_context.load_cert_chain(cert_file, key_file) | ||
|
||
options["ssl_context"] = ssl_context | ||
|
||
return options | ||
|
||
def _make_pool( | ||
self: Self, | ||
) -> Union[ | ||
httpcore.AsyncSOCKSProxy, | ||
httpcore.AsyncHTTPProxy, | ||
httpcore.AsyncConnectionPool, | ||
]: | ||
if self.parsed_dsn is None: | ||
raise ValueError("Cannot create HTTP-based transport without valid DSN") | ||
proxy = None | ||
no_proxy = self._in_no_proxy(self.parsed_dsn) | ||
|
||
# try HTTPS first | ||
https_proxy = self.options["https_proxy"] | ||
if self.parsed_dsn.scheme == "https" and (https_proxy != ""): | ||
proxy = https_proxy or (not no_proxy and getproxies().get("https")) | ||
|
||
# maybe fallback to HTTP proxy | ||
http_proxy = self.options["http_proxy"] | ||
if not proxy and (http_proxy != ""): | ||
proxy = http_proxy or (not no_proxy and getproxies().get("http")) | ||
|
||
opts = self._get_pool_options() | ||
|
||
if proxy: | ||
proxy_headers = self.options["proxy_headers"] | ||
if proxy_headers: | ||
opts["proxy_headers"] = proxy_headers | ||
|
||
if proxy.startswith("socks"): | ||
try: | ||
if "socket_options" in opts: | ||
socket_options = opts.pop("socket_options") | ||
if socket_options: | ||
logger.warning( | ||
"You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." | ||
) | ||
return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) | ||
except RuntimeError: | ||
logger.warning( | ||
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", | ||
proxy, | ||
) | ||
else: | ||
return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) | ||
|
||
return httpcore.AsyncConnectionPool(**opts) | ||
|
||
def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore | ||
|
||
logger.debug("Killing HTTP transport") | ||
self._worker.kill() | ||
try: | ||
# Return the pool cleanup task so caller can await it if needed | ||
return self.loop.create_task(self._pool.aclose()) # type: ignore | ||
except RuntimeError: | ||
logger.warning("Event loop not running, aborting kill.") | ||
return None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Transport Flush IssuesThe Furthermore, the Lastly, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: AsyncHttpTransport Kill Method Fails Resource CleanupThe
Additional Locations (1)There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will remove the background task set, it should not be necessary anymore I think |
||
|
||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
class HttpTransport(BaseHttpTransport): | ||
if TYPE_CHECKING: | ||
_pool: Union[PoolManager, ProxyManager] | ||
|
@@ -816,11 +1044,22 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: | |
ref_transport = options["transport"] | ||
|
||
use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) | ||
|
||
use_async_transport = options.get("_experiments", {}).get("transport_async", False) | ||
# By default, we use the http transport class | ||
transport_cls: Type[Transport] = ( | ||
Http2Transport if use_http2_transport else HttpTransport | ||
) | ||
if use_async_transport and ASYNC_TRANSPORT_ENABLED: | ||
try: | ||
asyncio.get_running_loop() | ||
transport_cls = AsyncHttpTransport | ||
except RuntimeError: | ||
# No event loop running, fall back to sync transport | ||
logger.warning("No event loop running, falling back to sync transport.") | ||
elif use_async_transport: | ||
logger.warning( | ||
"You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." | ||
) | ||
|
||
if isinstance(ref_transport, Transport): | ||
return ref_transport | ||
|
Uh oh!
There was an error while loading. Please reload this page.