diff --git a/requirements-testing.txt b/requirements-testing.txt index 8e7bc47be0..e9a972680c 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -11,7 +11,7 @@ asttokens responses pysocks socksio -httpcore[http2] +httpcore[http2,asyncio] setuptools freezegun Brotli diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py index 78bed91475..06eac2aa83 100644 --- a/scripts/populate_tox/config.py +++ b/scripts/populate_tox/config.py @@ -96,7 +96,7 @@ "pytest-asyncio", "python-multipart", "requests", - "anyio<4", + "anyio>=3,<5", ], # There's an incompatibility between FastAPI's TestClient, which is # actually Starlette's TestClient, which is actually httpx's Client. @@ -106,6 +106,7 @@ # FastAPI versions we use older httpx which still supports the # deprecated argument. "<0.110.1": ["httpx<0.28.0"], + "<0.80": ["anyio<4"], "py3.6": ["aiocontextvars"], }, }, diff --git a/scripts/populate_tox/tox.jinja b/scripts/populate_tox/tox.jinja index 66b1d7885a..514566ea46 100644 --- a/scripts/populate_tox/tox.jinja +++ b/scripts/populate_tox/tox.jinja @@ -207,7 +207,7 @@ deps = httpx-v0.25: pytest-httpx==0.25.0 httpx: pytest-httpx # anyio is a dep of httpx - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16: httpx~=0.16.0 httpx-v0.18: httpx~=0.18.0 httpx-v0.20: httpx~=0.20.0 diff --git a/sentry_sdk/api.py b/sentry_sdk/api.py index 3aefc57f69..3252622746 100644 --- a/sentry_sdk/api.py +++ b/sentry_sdk/api.py @@ -226,6 +226,14 @@ def flush( return get_client().flush(timeout=timeout, callback=callback) +@clientmethod +async def flush_async( + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, +) -> None: + return await get_client().flush_async(timeout=timeout, callback=callback) + + def start_span(**kwargs: Any) -> Span: """ Start and return a span. diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 98553d8993..b9d07e1402 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -25,7 +25,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace -from sentry_sdk.transport import BaseHttpTransport, make_transport +from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport from sentry_sdk.consts import ( SPANDATA, DEFAULT_MAX_VALUE_LENGTH, @@ -214,6 +214,12 @@ def close(self, *args: Any, **kwargs: Any) -> None: def flush(self, *args: Any, **kwargs: Any) -> None: return None + async def close_async(self, *args: Any, **kwargs: Any) -> None: + return None + + async def flush_async(self, *args: Any, **kwargs: Any) -> None: + return None + def __enter__(self) -> BaseClient: return self @@ -406,7 +412,7 @@ def _capture_envelope(envelope: Envelope) -> None: self.monitor or self.log_batcher or has_profiling_enabled(self.options) - or isinstance(self.transport, BaseHttpTransport) + or isinstance(self.transport, HttpTransportCore) ): # If we have anything on that could spawn a background thread, we # need to check if it's safe to use them. @@ -917,6 +923,14 @@ def get_integration( return self.integrations.get(integration_name) + def _close_components(self) -> None: + """Kill all client components in the correct order.""" + self.session_flusher.kill() + if self.log_batcher is not None: + self.log_batcher.kill() + if self.monitor: + self.monitor.kill() + def close( self, timeout: Optional[float] = None, @@ -927,19 +941,43 @@ def close( semantics as :py:meth:`Client.flush`. 
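+
+        If the client was configured with the experimental ``AsyncHttpTransport``
+        (created inside a running event loop), this method only logs a debug
+        message and returns; use :py:meth:`Client.close_async` instead.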
""" if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.debug( + "close() used with AsyncHttpTransport, aborting. Please use close_async() instead." + ) + return self.flush(timeout=timeout, callback=callback) - - self.session_flusher.kill() - - if self.log_batcher is not None: - self.log_batcher.kill() - - if self.monitor: - self.monitor.kill() - + self._close_components() self.transport.kill() self.transport = None + async def close_async( + self, + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + """ + Asynchronously close the client and shut down the transport. Arguments have the same + semantics as :py:meth:`Client.flush_async`. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "close_async() used with non-async transport, aborting. Please use close() instead." + ) + return + await self.flush_async(timeout=timeout, callback=callback) + self._close_components() + kill_task = self.transport.kill() # type: ignore + if kill_task is not None: + await kill_task + self.transport = None + def flush( self, timeout: Optional[float] = None, @@ -953,15 +991,52 @@ def flush( :param callback: Is invoked with the number of pending events and the configured timeout. """ if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.debug( + "flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead." + ) + return if timeout is None: timeout = self.options["shutdown_timeout"] - self.session_flusher.flush() - - if self.log_batcher is not None: - self.log_batcher.flush() + self._flush_components() self.transport.flush(timeout=timeout, callback=callback) + async def flush_async( + self, + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + """ + Asynchronously wait for the current events to be sent. + + :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. + + :param callback: Is invoked with the number of pending events and the configured timeout. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "flush_async() used with non-async transport, aborting. Please use flush() instead." 
+ ) + return + if timeout is None: + timeout = self.options["shutdown_timeout"] + self._flush_components() + flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore + if flush_task is not None: + await flush_task + + def _flush_components(self) -> None: + self.session_flusher.flush() + if self.log_batcher is not None: + self.log_batcher.flush() + def __enter__(self) -> _Client: return self diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 2b81fc4a2b..643cbb871a 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -75,6 +75,7 @@ class CompressionAlgo(Enum): "transport_compression_algo": Optional[CompressionAlgo], "transport_num_pools": Optional[int], "transport_http2": Optional[bool], + "transport_async": Optional[bool], "enable_logs": Optional[bool], "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], }, diff --git a/sentry_sdk/integrations/asyncio.py b/sentry_sdk/integrations/asyncio.py index 719cbba1a8..f6f1d57c8b 100644 --- a/sentry_sdk/integrations/asyncio.py +++ b/sentry_sdk/integrations/asyncio.py @@ -4,7 +4,13 @@ import sentry_sdk from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration, DidNotEnable -from sentry_sdk.utils import event_from_exception, logger, reraise +from sentry_sdk.utils import ( + event_from_exception, + logger, + reraise, + is_internal_task, +) +from sentry_sdk.transport import AsyncHttpTransport try: import asyncio @@ -29,6 +35,72 @@ def get_name(coro: Any) -> str: ) +def patch_loop_close() -> None: + """Patch loop.close to flush pending events before shutdown.""" + # Atexit shutdown hook happens after the event loop is closed. + # Therefore, it is necessary to patch the loop.close method to ensure + # that pending events are flushed before the interpreter shuts down. + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop → cannot patch now + return + + if getattr(loop, "_sentry_flush_patched", False): + return + + async def _flush() -> None: + client = sentry_sdk.get_client() + if not client: + return + + try: + if not isinstance(client.transport, AsyncHttpTransport): + return + + await client.close_async() + except Exception: + logger.warning("Sentry flush failed during loop shutdown", exc_info=True) + + orig_close = loop.close + + def _patched_close() -> None: + try: + loop.run_until_complete(_flush()) + finally: + orig_close() + + loop.close = _patched_close # type: ignore + loop._sentry_flush_patched = True # type: ignore + + +def _create_task_with_factory( + orig_task_factory: Any, + loop: asyncio.AbstractEventLoop, + coro: Coroutine[Any, Any, Any], + **kwargs: Any, +) -> asyncio.Task[Any]: + task = None + + # Trying to use user set task factory (if there is one) + if orig_task_factory: + task = orig_task_factory(loop, coro, **kwargs) + + if task is None: + # The default task factory in `asyncio` does not have its own function + # but is just a couple of lines in `asyncio.base_events.create_task()` + # Those lines are copied here. + + # WARNING: + # If the default behavior of the task creation in asyncio changes, + # this will break! 
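+        # Instantiating Task directly (rather than going through
+        # loop.create_task()) also avoids re-entering the patched task factory.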
+ task = Task(coro, loop=loop, **kwargs) + if task._source_traceback: # type: ignore + del task._source_traceback[-1] # type: ignore + + return task + + def patch_asyncio() -> None: orig_task_factory = None try: @@ -41,6 +113,14 @@ def _sentry_task_factory( **kwargs: Any, ) -> asyncio.Future[Any]: + # Check if this is an internal Sentry task + is_internal = is_internal_task() + + if is_internal: + return _create_task_with_factory( + orig_task_factory, loop, coro, **kwargs + ) + async def _task_with_sentry_span_creation() -> Any: result = None @@ -58,25 +138,9 @@ async def _task_with_sentry_span_creation() -> Any: return result - task = None - - # Trying to use user set task factory (if there is one) - if orig_task_factory: - task = orig_task_factory( - loop, _task_with_sentry_span_creation(), **kwargs - ) - - if task is None: - # The default task factory in `asyncio` does not have its own function - # but is just a couple of lines in `asyncio.base_events.create_task()` - # Those lines are copied here. - - # WARNING: - # If the default behavior of the task creation in asyncio changes, - # this will break! - task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs) - if task._source_traceback: # type: ignore - del task._source_traceback[-1] # type: ignore + task = _create_task_with_factory( + orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs + ) # Set the task name to include the original coroutine's name try: @@ -124,3 +188,4 @@ class AsyncioIntegration(Integration): @staticmethod def setup_once() -> None: patch_asyncio() + patch_loop_close() diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index ac7a8c3522..6d7e4c4f84 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -6,6 +6,7 @@ import socket import ssl import time +import asyncio from datetime import datetime, timedelta, timezone from collections import defaultdict from urllib.request import getproxies @@ -17,18 +18,34 @@ try: import httpcore +except ImportError: + httpcore = None # type: ignore + +try: import h2 # noqa: F401 - HTTP2_ENABLED = True + HTTP2_ENABLED = httpcore is not None except ImportError: HTTP2_ENABLED = False +try: + import anyio # noqa: F401 + + ASYNC_TRANSPORT_ENABLED = httpcore is not None +except ImportError: + ASYNC_TRANSPORT_ENABLED = False + import urllib3 import certifi from sentry_sdk.consts import EndpointType -from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.utils import ( + Dsn, + logger, + capture_internal_exceptions, + mark_sentry_task_internal, +) +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -162,8 +179,8 @@ def _parse_rate_limits( continue -class BaseHttpTransport(Transport): - """The base HTTP transport.""" +class HttpTransportCore(Transport): + """Shared base class for sync and async transports.""" TIMEOUT = 30 # seconds @@ -173,7 +190,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: Transport.__init__(self, options) assert self.parsed_dsn is not None self.options: Dict[str, Any] = options - self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) + self._worker = self._create_worker(options) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until: Dict[Optional[str], datetime] = {} # We only use this Retry() class for the `get_retry_after` method it exposes @@ -224,6 
+241,9 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 + def _create_worker(self, options: dict[str, Any]) -> Worker: + raise NotImplementedError() + def record_lost_event( self: Self, reason: str, @@ -286,12 +306,8 @@ def _update_rate_limits( seconds=retry_after ) - def _send_request( - self: Self, - body: bytes, - headers: Dict[str, str], - endpoint_type: EndpointType = EndpointType.ENVELOPE, - envelope: Optional[Envelope] = None, + def _handle_request_error( + self: Self, envelope: Optional[Envelope], loss_reason: str = "network" ) -> None: def record_loss(reason: str) -> None: if envelope is None: @@ -300,45 +316,45 @@ def record_loss(reason: str) -> None: for item in envelope.items: self.record_lost_event(reason, item=item) + self.on_dropped_event(loss_reason) + record_loss("network_error") + + def _handle_response( + self: Self, + response: Union[urllib3.BaseHTTPResponse, httpcore.Response], + envelope: Optional[Envelope], + ) -> None: + self._update_rate_limits(response) + + if response.status == 429: + # if we hit a 429. Something was rate limited but we already + # acted on this in `self._update_rate_limits`. Note that we + # do not want to record event loss here as we will have recorded + # an outcome in relay already. + self.on_dropped_event("status_429") + pass + + elif response.status >= 300 or response.status < 200: + logger.error( + "Unexpected status code: %s (body: %s)", + response.status, + getattr(response, "data", getattr(response, "content", None)), + ) + self._handle_request_error( + envelope=envelope, loss_reason="status_{}".format(response.status) + ) + + def _update_headers( + self: Self, + headers: Dict[str, str], + ) -> None: + headers.update( { "User-Agent": str(self._auth.client), "X-Sentry-Auth": str(self._auth.to_header()), } ) - try: - response = self._request( - "POST", - endpoint_type, - body, - headers, - ) - except Exception: - self.on_dropped_event("network") - record_loss("network_error") - raise - - try: - self._update_rate_limits(response) - - if response.status == 429: - # if we hit a 429. Something was rate limited but we already - # acted on this in `self._update_rate_limits`. Note that we - # do not want to record event loss here as we will have recorded - # an outcome in relay already. 
- self.on_dropped_event("status_429") - pass - - elif response.status >= 300 or response.status < 200: - logger.error( - "Unexpected status code: %s (body: %s)", - response.status, - getattr(response, "data", getattr(response, "content", None)), - ) - self.on_dropped_event("status_{}".format(response.status)) - record_loss("network_error") - finally: - response.close() def on_dropped_event(self: Self, _reason: str) -> None: return None @@ -375,11 +391,6 @@ def _fetch_pending_client_report( type="client_report", ) - def _flush_client_reports(self: Self, force: bool = False) -> None: - client_report = self._fetch_pending_client_report(force=force, interval=60) - if client_report is not None: - self.capture_envelope(Envelope(items=[client_report])) - def _check_disabled(self: Self, category: EventDataCategory) -> bool: def _disabled(bucket: Optional[EventDataCategory]) -> bool: ts = self._disabled_until.get(bucket) @@ -398,7 +409,9 @@ def _is_worker_full(self: Self) -> bool: def is_healthy(self: Self) -> bool: return not (self._is_worker_full() or self._is_rate_limited()) - def _send_envelope(self: Self, envelope: Envelope) -> None: + def _prepare_envelope( + self: Self, envelope: Envelope + ) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]: # remove all items from the envelope which are over quota new_items = [] @@ -442,13 +455,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None: if content_encoding: headers["Content-Encoding"] = content_encoding - self._send_request( - body.getvalue(), - headers=headers, - endpoint_type=EndpointType.ENVELOPE, - envelope=envelope, - ) - return None + return envelope, body, headers def _serialize_envelope( self: Self, envelope: Envelope @@ -494,6 +501,9 @@ def _make_pool( httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool, + httpcore.AsyncSOCKSProxy, + httpcore.AsyncHTTPProxy, + httpcore.AsyncConnectionPool, ]: raise NotImplementedError() @@ -506,6 +516,57 @@ def _request( ) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]: raise NotImplementedError() + def kill(self: Self) -> None: + logger.debug("Killing HTTP transport") + self._worker.kill() + + +class BaseHttpTransport(HttpTransportCore): + """The base HTTP transport.""" + + def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + response.close() + + def _create_worker(self: Self, options: dict[str, Any]) -> Worker: + return BackgroundWorker(queue_size=options["transport_queue_size"]) + + def _flush_client_reports(self: Self, force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + def capture_envelope(self: Self, envelope: Envelope) -> None: def send_envelope_wrapper() -> None: with 
capture_internal_exceptions(): @@ -528,10 +589,6 @@ def flush( self._worker.submit(lambda: self._flush_client_reports(force=True)) self._worker.flush(timeout, callback) - def kill(self: Self) -> None: - logger.debug("Killing HTTP transport") - self._worker.kill() - class HttpTransport(BaseHttpTransport): if TYPE_CHECKING: @@ -639,6 +696,223 @@ def _request( ) +if not ASYNC_TRANSPORT_ENABLED: + # Sorry, no AsyncHttpTransport for you + AsyncHttpTransport = HttpTransport + +else: + + class AsyncHttpTransport(HttpTransportCore): # type: ignore + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + # Requires event loop at init time + self.loop = asyncio.get_running_loop() + + def _create_worker(self: Self, options: dict[str, Any]) -> Worker: + return AsyncWorker(queue_size=options["transport_queue_size"]) + + def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: + return next( + ( + val.decode("ascii") + for key, val in response.headers + if key.decode("ascii").lower() == header + ), + None, + ) + + async def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + await self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + async def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = await self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + await response.aclose() + + async def _request( # type: ignore[override] + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> httpcore.Response: + return await self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + content=body, + headers=headers, # type: ignore + extensions={ + "timeout": { + "pool": self.TIMEOUT, + "connect": self.TIMEOUT, + "write": self.TIMEOUT, + "read": self.TIMEOUT, + } + }, + ) + + async def _flush_client_reports(self: Self, force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + + def _capture_envelope(self: Self, envelope: Envelope) -> None: + async def send_envelope_wrapper() -> None: + with capture_internal_exceptions(): + await self._send_envelope(envelope) + await self._flush_client_reports() + + if not self._worker.submit(send_envelope_wrapper): + self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) + + def capture_envelope(self: Self, envelope: Envelope) -> None: + # Synchronous entry point + if self.loop and self.loop.is_running(): + self.loop.call_soon_threadsafe(self._capture_envelope, envelope) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("internal_sdk_error") + for item in envelope.items: + self.record_lost_event("internal_sdk_error", item=item) + + def flush( # type: ignore[override] + self: Self, + timeout: float, + callback: 
Optional[Callable[[int, float], None]] = None, + ) -> Optional[asyncio.Task[None]]: + logger.debug("Flushing HTTP transport") + + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) + return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] + return None + + def _get_pool_options(self: Self) -> Dict[str, Any]: + options: Dict[str, Any] = { + "http2": False, # no HTTP2 for now + "retries": 3, + } + + socket_options = ( + self.options["socket_options"] + if self.options["socket_options"] is not None + else [] + ) + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + options["socket_options"] = socket_options + + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") + key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") + if cert_file is not None: + ssl_context.load_cert_chain(cert_file, key_file) + + options["ssl_context"] = ssl_context + + return options + + def _make_pool( + self: Self, + ) -> Union[ + httpcore.AsyncSOCKSProxy, + httpcore.AsyncHTTPProxy, + httpcore.AsyncConnectionPool, + ]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + try: + if "socket_options" in opts: + socket_options = opts.pop("socket_options") + if socket_options: + logger.warning( + "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." + ) + return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) + except RuntimeError: + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. 
Disabling proxy support.", + proxy, + ) + else: + return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + + return httpcore.AsyncConnectionPool(**opts) + + def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore + + logger.debug("Killing HTTP transport") + self._worker.kill() + try: + # Return the pool cleanup task so caller can await it if needed + with mark_sentry_task_internal(): + return self.loop.create_task(self._pool.aclose()) # type: ignore + except RuntimeError: + logger.warning("Event loop not running, aborting kill.") + return None + + if not HTTP2_ENABLED: # Sorry, no Http2Transport for you class Http2Transport(HttpTransport): @@ -778,11 +1052,32 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: ref_transport = options["transport"] use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) + use_async_transport = options.get("_experiments", {}).get("transport_async", False) + async_integration = any( + integration.__class__.__name__ == "AsyncioIntegration" + for integration in options.get("integrations") or [] + ) # By default, we use the http transport class transport_cls: Type[Transport] = ( Http2Transport if use_http2_transport else HttpTransport ) + if use_async_transport and ASYNC_TRANSPORT_ENABLED: + try: + asyncio.get_running_loop() + if async_integration: + transport_cls = AsyncHttpTransport + else: + logger.warning( + "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport." + ) + except RuntimeError: + # No event loop running, fall back to sync transport + logger.warning("No event loop running, falling back to sync transport.") + elif use_async_transport: + logger.warning( + "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." 
+ ) if isinstance(ref_transport, Transport): return ref_transport diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index 0842749baf..ee2cbfb5f1 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations import base64 +import contextvars import json import linecache import logging @@ -12,6 +13,7 @@ import threading import time from collections import namedtuple +from contextlib import contextmanager from datetime import datetime, timezone from decimal import Decimal from functools import partial, partialmethod, wraps @@ -72,6 +74,25 @@ _installed_modules = None +_is_sentry_internal_task = contextvars.ContextVar( + "is_sentry_internal_task", default=False +) + + +def is_internal_task(): + return _is_sentry_internal_task.get() + + +@contextmanager +def mark_sentry_task_internal(): + """Context manager to mark a task as Sentry internal.""" + token = _is_sentry_internal_task.set(True) + try: + yield + finally: + _is_sentry_internal_task.reset(token) + + BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$") FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0")) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d911e15623..8a23fa3ee1 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,10 +1,12 @@ from __future__ import annotations +from abc import ABC, abstractmethod import os import threading +import asyncio from time import sleep, time from sentry_sdk._queue import Queue, FullError -from sentry_sdk.utils import logger +from sentry_sdk.utils import logger, mark_sentry_task_internal from sentry_sdk.consts import DEFAULT_QUEUE_SIZE from typing import TYPE_CHECKING @@ -16,7 +18,65 @@ _TERMINATOR = object() -class BackgroundWorker: +class Worker(ABC): + """ + Base class for all workers. + + A worker is used to process events in the background and send them to Sentry. + """ + + @property + @abstractmethod + def is_alive(self) -> bool: + """ + Checks whether the worker is alive and running. + + Returns True if the worker is alive, False otherwise. + """ + pass + + @abstractmethod + def kill(self) -> None: + """ + Kills the worker. + + This method is used to kill the worker. The queue will be drained up to the point where the worker is killed. + The worker will not be able to process any more events. + """ + pass + + def flush( + self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None + ) -> None: + """ + Flush the worker. + + This method blocks until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. + """ + return None + + @abstractmethod + def full(self) -> bool: + """ + Checks whether the worker's queue is full. + + Returns True if the queue is full, False otherwise. + """ + pass + + @abstractmethod + def submit(self, callback: Callable[[], Any]) -> bool: + """ + Schedule a callback to be executed by the worker. + + Returns True if the callback was scheduled, False if the queue is full. 
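+
+        Implementations may start the worker lazily on first submit (see
+        ``_ensure_thread`` on BackgroundWorker and ``_ensure_task`` on
+        AsyncWorker).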
+ """ + pass + + +class BackgroundWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._queue: Queue = Queue(queue_size) self._lock = threading.Lock() @@ -106,7 +166,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None: pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_thread() try: self._queue.put_nowait(callback) @@ -127,3 +187,137 @@ def _target(self) -> None: finally: self._queue.task_done() sleep(0) + + +class AsyncWorker(Worker): + def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: + self._queue: Optional[asyncio.Queue[Any]] = None + self._queue_size = queue_size + self._task: Optional[asyncio.Task[None]] = None + # Event loop needs to remain in the same process + self._task_for_pid: Optional[int] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + # Track active callback tasks so they have a strong reference and can be cancelled on kill + self._active_tasks: set[asyncio.Task[None]] = set() + + @property + def is_alive(self) -> bool: + if self._task_for_pid != os.getpid(): + return False + if not self._task or not self._loop: + return False + return self._loop.is_running() and not self._task.done() + + def kill(self) -> None: + if self._task: + if self._queue is not None: + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") + # Also cancel any active callback tasks + # Avoid modifying the set while cancelling tasks + tasks_to_cancel = set(self._active_tasks) + for task in tasks_to_cancel: + task.cancel() + self._active_tasks.clear() + self._loop = None + self._task = None + self._task_for_pid = None + + def start(self) -> None: + if not self.is_alive: + try: + self._loop = asyncio.get_running_loop() + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._queue_size) + with mark_sentry_task_internal(): + self._task = self._loop.create_task(self._target()) + self._task_for_pid = os.getpid() + except RuntimeError: + # There is no event loop running + logger.warning("No event loop running, async worker not started") + self._loop = None + self._task = None + self._task_for_pid = None + + def full(self) -> bool: + if self._queue is None: + return True + return self._queue.full() + + def _ensure_task(self) -> None: + if not self.is_alive: + self.start() + + async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: + if not self._loop or not self._loop.is_running() or self._queue is None: + return + + initial_timeout = min(0.1, timeout) + + # Timeout on the join + try: + await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + len(self._active_tasks) + logger.debug("%d event(s) pending on flush", pending) + if callback is not None: + callback(pending, timeout) + + try: + remaining_timeout = timeout - initial_timeout + await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + len(self._active_tasks) + logger.error("flush timed out, dropped %s events", pending) + + def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override] + if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running(): + with 
mark_sentry_task_internal(): + return self._loop.create_task(self._wait_flush(timeout, callback)) + return None + + def submit(self, callback: Callable[[], Any]) -> bool: + self._ensure_task() + if self._queue is None: + return False + try: + self._queue.put_nowait(callback) + return True + except asyncio.QueueFull: + return False + + async def _target(self) -> None: + if self._queue is None: + return + while True: + callback = await self._queue.get() + if callback is _TERMINATOR: + self._queue.task_done() + break + # Firing tasks instead of awaiting them allows for concurrent requests + with mark_sentry_task_internal(): + task = asyncio.create_task(self._process_callback(callback)) + # Create a strong reference to the task so it can be cancelled on kill + # and does not get garbage collected while running + self._active_tasks.add(task) + task.add_done_callback(self._on_task_complete) + # Yield to let the event loop run other tasks + await asyncio.sleep(0) + + async def _process_callback(self, callback: Callable[[], Any]) -> None: + # Callback is an async coroutine, need to await it + await callback() + + def _on_task_complete(self, task: asyncio.Task[None]) -> None: + try: + task.result() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + # Mark the task as done and remove it from the active tasks set + # This happens only after the task has completed + if self._queue is not None: + self._queue.task_done() + self._active_tasks.discard(task) diff --git a/setup.py b/setup.py index e4a29d858a..d2d0bf7bac 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,7 @@ def get_file_text(file_name): "flask": ["flask>=0.11", "blinker>=1.1", "markupsafe"], "grpcio": ["grpcio>=1.21.1", "protobuf>=3.8.0"], "http2": ["httpcore[http2]==1.*"], + "asyncio": ["httpcore[asyncio]==1.*"], "httpx": ["httpx>=0.16.0"], "huey": ["huey>=2"], "huggingface_hub": ["huggingface_hub>=0.22"], diff --git a/tests/integrations/asyncio/test_asyncio.py b/tests/integrations/asyncio/test_asyncio.py index 2ae71f8f43..42d8626ff3 100644 --- a/tests/integrations/asyncio/test_asyncio.py +++ b/tests/integrations/asyncio/test_asyncio.py @@ -8,6 +8,8 @@ import sentry_sdk from sentry_sdk.consts import OP from sentry_sdk.integrations.asyncio import AsyncioIntegration, patch_asyncio +from sentry_sdk.utils import mark_sentry_task_internal + try: from contextvars import Context, ContextVar @@ -377,3 +379,109 @@ async def test_span_origin( assert event["contexts"]["trace"]["origin"] == "manual" assert event["spans"][0]["origin"] == "auto.function.asyncio" + + +@minimum_python_38 +@pytest.mark.asyncio(loop_scope="module") +async def test_internal_tasks_not_wrapped(sentry_init, capture_events): + + sentry_init(integrations=[AsyncioIntegration()], traces_sample_rate=1.0) + events = capture_events() + + # Create a user task that should be wrapped + async def user_task(): + await asyncio.sleep(0.01) + return "user_result" + + # Create an internal task that should NOT be wrapped + async def internal_task(): + await asyncio.sleep(0.01) + return "internal_result" + + with sentry_sdk.start_transaction(name="test_transaction"): + user_task_obj = asyncio.create_task(user_task()) + + with mark_sentry_task_internal(): + internal_task_obj = asyncio.create_task(internal_task()) + + user_result = await user_task_obj + internal_result = await internal_task_obj + + assert user_result == "user_result" + assert internal_result == "internal_result" + + assert len(events) == 1 + transaction = events[0] + + user_spans = [] + 
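+    # Spans from Sentry-internal tasks; the integration must not wrap these,
+    # so this list is expected to stay empty.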
internal_spans = [] + + for span in transaction.get("spans", []): + if "user_task" in span.get("description", ""): + user_spans.append(span) + elif "internal_task" in span.get("description", ""): + internal_spans.append(span) + + assert ( + len(user_spans) > 0 + ), f"User task should have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}" + assert ( + len(internal_spans) == 0 + ), f"Internal task should NOT have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}" + + +@minimum_python_38 +def test_loop_close_patching(sentry_init): + sentry_init(integrations=[AsyncioIntegration()]) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + with patch("asyncio.get_running_loop", return_value=loop): + assert not hasattr(loop, "_sentry_flush_patched") + AsyncioIntegration.setup_once() + assert hasattr(loop, "_sentry_flush_patched") + assert loop._sentry_flush_patched is True + + finally: + if not loop.is_closed(): + loop.close() + + +@minimum_python_38 +def test_loop_close_flushes_async_transport(sentry_init): + from sentry_sdk.transport import AsyncHttpTransport + from unittest.mock import Mock, AsyncMock + + sentry_init(integrations=[AsyncioIntegration()]) + + # Save the current event loop to restore it later + try: + original_loop = asyncio.get_event_loop() + except RuntimeError: + original_loop = None + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + with patch("asyncio.get_running_loop", return_value=loop): + AsyncioIntegration.setup_once() + + mock_client = Mock() + mock_transport = Mock(spec=AsyncHttpTransport) + mock_client.transport = mock_transport + mock_client.close_async = AsyncMock(return_value=None) + + with patch("sentry_sdk.get_client", return_value=mock_client): + loop.close() + + mock_client.close_async.assert_called_once() + mock_client.close_async.assert_awaited_once() + + finally: + if not loop.is_closed(): + loop.close() + if original_loop: + asyncio.set_event_loop(original_loop) diff --git a/tests/test_client.py b/tests/test_client.py index 8290c8e575..25a3a8ab00 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -23,9 +23,12 @@ from sentry_sdk.spotlight import DEFAULT_SPOTLIGHT_URL from sentry_sdk.utils import capture_internal_exception from sentry_sdk.integrations.executing import ExecutingIntegration -from sentry_sdk.transport import Transport +from sentry_sdk.integrations.asyncio import AsyncioIntegration + +from sentry_sdk.transport import Transport, AsyncHttpTransport from sentry_sdk.serializer import MAX_DATABAG_BREADTH from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, DEFAULT_MAX_VALUE_LENGTH +from sentry_sdk._compat import PY38 from typing import TYPE_CHECKING @@ -1498,3 +1501,323 @@ def test_keep_alive(env_value, arg_value, expected_value): ) assert transport_cls.options["keep_alive"] is expected_value + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "https://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": 
"https://localhost/123", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + # NO_PROXY testcases + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "env_no_proxy": "example.com,sentry.io", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + 
"expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + "arg_proxy_headers": {"Test-Header": "foo-bar"}, + }, + ], +) +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_proxy(monkeypatch, testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + if testcase["env_http_proxy"] is not None: + monkeypatch.setenv("HTTP_PROXY", testcase["env_http_proxy"]) + if testcase["env_https_proxy"] is not None: + monkeypatch.setenv("HTTPS_PROXY", testcase["env_https_proxy"]) + if testcase.get("env_no_proxy") is not None: + monkeypatch.setenv("NO_PROXY", testcase["env_no_proxy"]) + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + if testcase.get("arg_proxy_headers") is not None: + kwargs["proxy_headers"] = testcase["arg_proxy_headers"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + proxy = getattr( + client.transport._pool, + "proxy", + getattr(client.transport._pool, "_proxy_url", None), + ) + if testcase["expected_proxy_scheme"] is None: + assert proxy is None + else: + scheme = ( + proxy.scheme.decode("ascii") + if isinstance(proxy.scheme, bytes) + else proxy.scheme + ) + assert scheme == testcase["expected_proxy_scheme"] + + if testcase.get("arg_proxy_headers") is not None: + proxy_headers = dict( + (k.decode("ascii"), v.decode("ascii")) + for k, v in client.transport._pool._proxy_headers + ) + assert proxy_headers == testcase["arg_proxy_headers"] + + await client.close_async() + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": False, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4a://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5h://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4a://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": 
None, + "arg_https_proxy": "socks5h://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5://localhost/123", + "should_be_socks_proxy": True, + }, + ], +) +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_socks_proxy(testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + assert ("socks" in str(type(client.transport._pool)).lower()) == testcase[ + "should_be_socks_proxy" + ], ( + f"Expected {kwargs} to result in SOCKS == {testcase['should_be_socks_proxy']}" + f"but got {str(type(client.transport._pool))}" + ) + + await client.close_async() diff --git a/tests/test_transport.py b/tests/test_transport.py index 7e0cc6383c..53426795c6 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -3,6 +3,8 @@ import os import socket import sys +import asyncio +import threading from collections import defaultdict from datetime import datetime, timedelta, timezone from unittest import mock @@ -28,8 +30,10 @@ from sentry_sdk.transport import ( KEEP_ALIVE_SOCKET_OPTIONS, _parse_rate_limits, + AsyncHttpTransport, ) from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger +from sentry_sdk.integrations.asyncio import AsyncioIntegration server = None @@ -146,6 +150,89 @@ def test_transport_works( assert any("Sending envelope" in record.msg for record in caplog.records) == debug +@pytest.mark.asyncio +@pytest.mark.parametrize("debug", (True, False)) +@pytest.mark.parametrize("client_flush_method", ["close", "flush"]) +@pytest.mark.parametrize("use_pickle", (True, False)) +@pytest.mark.parametrize("compression_level", (0, 9, None)) +@pytest.mark.parametrize("compression_algo", ("gzip", "br", "", None)) +@pytest.mark.skipif(not PY38, reason="Async transport only supported in Python 3.8+") +async def test_transport_works_async( + capturing_server, + request, + capsys, + caplog, + debug, + make_client, + client_flush_method, + use_pickle, + compression_level, + compression_algo, +): + caplog.set_level(logging.DEBUG) + + experiments = {} + if compression_level is not None: + experiments["transport_compression_level"] = compression_level + + if compression_algo is not None: + experiments["transport_compression_algo"] = compression_algo + + # Enable async transport + experiments["transport_async"] = True + + client = make_client( + debug=debug, + _experiments=experiments, + integrations=[AsyncioIntegration()], + ) + + if use_pickle: + client = pickle.loads(pickle.dumps(client)) + + # Verify we're using async transport + assert isinstance( + client.transport, AsyncHttpTransport + ), "Expected AsyncHttpTransport" + + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + + add_breadcrumb( + level="info", message="i like bread", timestamp=datetime.now(timezone.utc) + ) + capture_message("löl") + + if client_flush_method == "close": 
+ await client.close_async(timeout=2.0) + if client_flush_method == "flush": + await client.flush_async(timeout=2.0) + + out, err = capsys.readouterr() + assert not err and not out + assert capturing_server.captured + should_compress = ( + # default is to compress with brotli if available, gzip otherwise + (compression_level is None) + or ( + # setting compression level to 0 means don't compress + compression_level + > 0 + ) + ) and ( + # if we couldn't resolve to a known algo, we don't compress + compression_algo + != "" + ) + + assert capturing_server.captured[0].compressed == should_compress + # After flush, the worker task is still running, but the end of the test will shut down the event loop + # Therefore, we need to explicitly close the client to clean up the worker task + assert any("Sending envelope" in record.msg for record in caplog.records) == debug + if client_flush_method == "flush": + await client.close_async(timeout=2.0) + + @pytest.mark.parametrize( "num_pools,expected_num_pools", ( @@ -641,3 +728,214 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_ "reason": "test", "quantity": span_count + 1, } in discarded_events + + +def test_handle_unexpected_status_invokes_handle_request_error( + make_client, monkeypatch +): + client = make_client() + transport = client.transport + + monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True) + + def stub_request(method, endpoint, body=None, headers=None): + class MockResponse: + def __init__(self): + self.status = 500 # Integer + self.data = b"server error" + self.headers = {} + + def close(self): + pass + + return MockResponse() + + monkeypatch.setattr(transport, "_request", stub_request) + + seen = [] + monkeypatch.setattr( + transport, + "_handle_request_error", + lambda envelope, loss_reason: seen.append(loss_reason), + ) + + client.capture_event({"message": "test"}) + client.flush() + + assert seen == ["status_500"] + + +def test_handle_request_error_basic_coverage(make_client, monkeypatch): + client = make_client() + transport = client.transport + + monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True) + + # Track method calls + calls = [] + + def mock_on_dropped_event(reason): + calls.append(("on_dropped_event", reason)) + + def mock_record_lost_event(reason, data_category=None, item=None): + calls.append(("record_lost_event", reason, data_category, item)) + + monkeypatch.setattr(transport, "on_dropped_event", mock_on_dropped_event) + monkeypatch.setattr(transport, "record_lost_event", mock_record_lost_event) + + # Test case 1: envelope is None + transport._handle_request_error(envelope=None, loss_reason="test_reason") + + assert len(calls) == 2 + assert calls[0] == ("on_dropped_event", "test_reason") + assert calls[1] == ("record_lost_event", "network_error", "error", None) + + # Reset + calls.clear() + + # Test case 2: envelope with items + envelope = Envelope() + envelope.add_item(mock.MagicMock()) # Simple mock item + envelope.add_item(mock.MagicMock()) # Another mock item + + transport._handle_request_error(envelope=envelope, loss_reason="connection_error") + + assert len(calls) == 3 + assert calls[0] == ("on_dropped_event", "connection_error") + assert calls[1][0:2] == ("record_lost_event", "network_error") + assert calls[2][0:2] == ("record_lost_event", "network_error") + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_background_thread_capture( + capturing_server, 
make_client, caplog
+):
+    """Test that capture_envelope from background threads uses call_soon_threadsafe"""
+    caplog.set_level(logging.DEBUG)
+    experiments = {"transport_async": True}
+    client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
+    assert isinstance(client.transport, AsyncHttpTransport)
+    sentry_sdk.get_global_scope().set_client(client)
+    captured_from_thread = []
+    exception_from_thread = []
+
+    def background_thread_work():
+        try:
+            # This should use the call_soon_threadsafe path
+            capture_message("from background thread")
+            captured_from_thread.append(True)
+        except Exception as e:
+            exception_from_thread.append(e)
+
+    thread = threading.Thread(target=background_thread_work)
+    thread.start()
+    thread.join()
+    assert not exception_from_thread
+    assert captured_from_thread
+    await client.close_async(timeout=2.0)
+    assert capturing_server.captured
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_transport_event_loop_closed_scenario(
+    capturing_server, make_client, caplog
+):
+    """Test behavior when trying to capture after the event loop context ends"""
+    caplog.set_level(logging.DEBUG)
+    experiments = {"transport_async": True}
+    client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
+    sentry_sdk.get_global_scope().set_client(client)
+    original_loop = client.transport.loop
+
+    with mock.patch("asyncio.get_running_loop", side_effect=RuntimeError("no loop")):
+        with mock.patch.object(client.transport.loop, "is_running", return_value=False):
+            with mock.patch("sentry_sdk.transport.logger") as mock_logger:
+                # This should trigger the "event loop not running" fallback path
+                capture_message("after loop closed")
+
+                mock_logger.warning.assert_called_with(
+                    "Async Transport is not running in an event loop."
+ ) + + client.transport.loop = original_loop + await client.close_async(timeout=2.0) + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_concurrent_requests( + capturing_server, make_client, caplog +): + """Test multiple simultaneous envelope submissions""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + + num_messages = 15 + + async def send_message(i): + capture_message(f"concurrent message {i}") + + tasks = [send_message(i) for i in range(num_messages)] + await asyncio.gather(*tasks) + await client.close_async(timeout=2.0) + assert len(capturing_server.captured) == num_messages + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_rate_limiting_with_concurrency( + capturing_server, make_client, request +): + """Test async transport rate limiting with concurrent requests""" + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + capturing_server.respond_with( + code=429, headers={"X-Sentry-Rate-Limits": "60:error:organization"} + ) + + # Send one request first to trigger rate limiting + capture_message("initial message") + await asyncio.sleep(0.1) # Wait for request to execute + assert client.transport._check_disabled("error") is True + capturing_server.clear_captured() + + async def send_message(i): + capture_message(f"message {i}") + await asyncio.sleep(0.01) + + await asyncio.gather(*[send_message(i) for i in range(5)]) + await asyncio.sleep(0.1) + # New request should be dropped due to rate limiting + assert len(capturing_server.captured) == 0 + await client.close_async(timeout=2.0) + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_two_way_ssl_authentication(): + current_dir = os.path.dirname(__file__) + cert_file = f"{current_dir}/test.pem" + key_file = f"{current_dir}/test.key" + + client = Client( + "https://foo@sentry.io/123", + cert_file=cert_file, + key_file=key_file, + _experiments={"transport_async": True}, + integrations=[AsyncioIntegration()], + ) + assert isinstance(client.transport, AsyncHttpTransport) + + options = client.transport._get_pool_options() + assert options["ssl_context"] is not None + + await client.close_async() diff --git a/tox.ini b/tox.ini index fd52035fac..d6f5e173eb 100644 --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,7 @@ # The file (and all resulting CI YAMLs) then need to be regenerated via # "scripts/generate-test-files.sh". 
# -# Last generated: 2025-07-23T07:24:30.467173+00:00 +# Last generated: 2025-07-30T13:59:12.959550+00:00 [tox] requires = @@ -125,16 +125,16 @@ envlist = # ~~~ Common ~~~ {py3.7,py3.8,py3.9}-common-v1.4.1 - {py3.7,py3.8,py3.9,py3.10,py3.11}-common-v1.14.0 - {py3.8,py3.9,py3.10,py3.11}-common-v1.24.0 - {py3.9,py3.10,py3.11,py3.12,py3.13}-common-v1.35.0 + {py3.7,py3.8,py3.9,py3.10,py3.11}-common-v1.15.0 + {py3.8,py3.9,py3.10,py3.11,py3.12}-common-v1.26.0 + {py3.9,py3.10,py3.11,py3.12,py3.13}-common-v1.36.0 # ~~~ AI ~~~ {py3.8,py3.11,py3.12}-anthropic-v0.16.0 - {py3.8,py3.11,py3.12}-anthropic-v0.30.1 - {py3.8,py3.11,py3.12}-anthropic-v0.44.0 - {py3.8,py3.12,py3.13}-anthropic-v0.58.2 + {py3.8,py3.11,py3.12}-anthropic-v0.31.2 + {py3.8,py3.11,py3.12}-anthropic-v0.46.0 + {py3.8,py3.12,py3.13}-anthropic-v0.60.0 {py3.9,py3.10,py3.11}-cohere-v5.4.0 {py3.9,py3.11,py3.12}-cohere-v5.9.4 @@ -143,12 +143,13 @@ envlist = {py3.10,py3.11,py3.12}-openai_agents-v0.0.19 {py3.10,py3.12,py3.13}-openai_agents-v0.1.0 - {py3.10,py3.12,py3.13}-openai_agents-v0.2.3 + {py3.10,py3.12,py3.13}-openai_agents-v0.2.4 {py3.8,py3.10,py3.11}-huggingface_hub-v0.22.2 {py3.8,py3.11,py3.12}-huggingface_hub-v0.26.5 {py3.8,py3.12,py3.13}-huggingface_hub-v0.30.2 - {py3.8,py3.12,py3.13}-huggingface_hub-v0.33.4 + {py3.8,py3.12,py3.13}-huggingface_hub-v0.34.3 + {py3.8,py3.12,py3.13}-huggingface_hub-v0.35.0rc0 # ~~~ DBs ~~~ @@ -164,7 +165,7 @@ envlist = {py3.7,py3.8,py3.9}-sqlalchemy-v1.3.24 {py3.7,py3.11,py3.12}-sqlalchemy-v1.4.54 - {py3.7,py3.12,py3.13}-sqlalchemy-v2.0.41 + {py3.7,py3.12,py3.13}-sqlalchemy-v2.0.42 # ~~~ Flags ~~~ @@ -179,7 +180,7 @@ envlist = {py3.7,py3.12,py3.13}-statsig-v0.55.3 {py3.7,py3.12,py3.13}-statsig-v0.57.3 {py3.7,py3.12,py3.13}-statsig-v0.59.1 - {py3.7,py3.12,py3.13}-statsig-v0.60.0 + {py3.7,py3.12,py3.13}-statsig-v0.61.0 {py3.8,py3.12,py3.13}-unleash-v6.0.1 {py3.8,py3.12,py3.13}-unleash-v6.1.0 @@ -210,8 +211,7 @@ envlist = {py3.7,py3.8}-grpc-v1.32.0 {py3.7,py3.9,py3.10}-grpc-v1.46.5 {py3.7,py3.11,py3.12}-grpc-v1.60.2 - {py3.9,py3.12,py3.13}-grpc-v1.73.1 - {py3.9,py3.12,py3.13}-grpc-v1.74.0rc1 + {py3.9,py3.12,py3.13}-grpc-v1.74.0 # ~~~ Tasks ~~~ @@ -262,7 +262,7 @@ envlist = {py3.7}-aiohttp-v3.4.4 {py3.7,py3.8,py3.9}-aiohttp-v3.7.4 {py3.8,py3.12,py3.13}-aiohttp-v3.10.11 - {py3.9,py3.12,py3.13}-aiohttp-v3.12.14 + {py3.9,py3.12,py3.13}-aiohttp-v3.12.15 {py3.7}-bottle-v0.12.25 {py3.8,py3.12,py3.13}-bottle-v0.13.4 @@ -378,7 +378,7 @@ deps = httpx-v0.25: pytest-httpx==0.25.0 httpx: pytest-httpx # anyio is a dep of httpx - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16: httpx~=0.16.0 httpx-v0.18: httpx~=0.18.0 httpx-v0.20: httpx~=0.20.0 @@ -484,9 +484,9 @@ deps = # ~~~ Common ~~~ common-v1.4.1: opentelemetry-sdk==1.4.1 - common-v1.14.0: opentelemetry-sdk==1.14.0 - common-v1.24.0: opentelemetry-sdk==1.24.0 - common-v1.35.0: opentelemetry-sdk==1.35.0 + common-v1.15.0: opentelemetry-sdk==1.15.0 + common-v1.26.0: opentelemetry-sdk==1.26.0 + common-v1.36.0: opentelemetry-sdk==1.36.0 common: pytest common: pytest-asyncio py3.7-common: pytest<7.0.0 @@ -495,13 +495,13 @@ deps = # ~~~ AI ~~~ anthropic-v0.16.0: anthropic==0.16.0 - anthropic-v0.30.1: anthropic==0.30.1 - anthropic-v0.44.0: anthropic==0.44.0 - anthropic-v0.58.2: anthropic==0.58.2 + anthropic-v0.31.2: anthropic==0.31.2 + anthropic-v0.46.0: anthropic==0.46.0 + anthropic-v0.60.0: anthropic==0.60.0 anthropic: pytest-asyncio anthropic-v0.16.0: httpx<0.28.0 - anthropic-v0.30.1: httpx<0.28.0 - anthropic-v0.44.0: httpx<0.28.0 + anthropic-v0.31.2: httpx<0.28.0 + 
anthropic-v0.46.0: httpx<0.28.0 cohere-v5.4.0: cohere==5.4.0 cohere-v5.9.4: cohere==5.9.4 @@ -510,13 +510,14 @@ deps = openai_agents-v0.0.19: openai-agents==0.0.19 openai_agents-v0.1.0: openai-agents==0.1.0 - openai_agents-v0.2.3: openai-agents==0.2.3 + openai_agents-v0.2.4: openai-agents==0.2.4 openai_agents: pytest-asyncio huggingface_hub-v0.22.2: huggingface_hub==0.22.2 huggingface_hub-v0.26.5: huggingface_hub==0.26.5 huggingface_hub-v0.30.2: huggingface_hub==0.30.2 - huggingface_hub-v0.33.4: huggingface_hub==0.33.4 + huggingface_hub-v0.34.3: huggingface_hub==0.34.3 + huggingface_hub-v0.35.0rc0: huggingface_hub==0.35.0rc0 # ~~~ DBs ~~~ @@ -533,7 +534,7 @@ deps = sqlalchemy-v1.3.24: sqlalchemy==1.3.24 sqlalchemy-v1.4.54: sqlalchemy==1.4.54 - sqlalchemy-v2.0.41: sqlalchemy==2.0.41 + sqlalchemy-v2.0.42: sqlalchemy==2.0.42 # ~~~ Flags ~~~ @@ -548,7 +549,7 @@ deps = statsig-v0.55.3: statsig==0.55.3 statsig-v0.57.3: statsig==0.57.3 statsig-v0.59.1: statsig==0.59.1 - statsig-v0.60.0: statsig==0.60.0 + statsig-v0.61.0: statsig==0.61.0 statsig: typing_extensions unleash-v6.0.1: UnleashClient==6.0.1 @@ -592,8 +593,7 @@ deps = grpc-v1.32.0: grpcio==1.32.0 grpc-v1.46.5: grpcio==1.46.5 grpc-v1.60.2: grpcio==1.60.2 - grpc-v1.73.1: grpcio==1.73.1 - grpc-v1.74.0rc1: grpcio==1.74.0rc1 + grpc-v1.74.0: grpcio==1.74.0 grpc: protobuf grpc: mypy-protobuf grpc: types-protobuf @@ -686,10 +686,11 @@ deps = fastapi: pytest-asyncio fastapi: python-multipart fastapi: requests - fastapi: anyio<4 + fastapi: anyio>=3,<5 fastapi-v0.79.1: httpx<0.28.0 fastapi-v0.91.0: httpx<0.28.0 fastapi-v0.103.2: httpx<0.28.0 + fastapi-v0.79.1: anyio<4 py3.6-fastapi: aiocontextvars @@ -697,10 +698,10 @@ deps = aiohttp-v3.4.4: aiohttp==3.4.4 aiohttp-v3.7.4: aiohttp==3.7.4 aiohttp-v3.10.11: aiohttp==3.10.11 - aiohttp-v3.12.14: aiohttp==3.12.14 + aiohttp-v3.12.15: aiohttp==3.12.15 aiohttp: pytest-aiohttp aiohttp-v3.10.11: pytest-asyncio - aiohttp-v3.12.14: pytest-asyncio + aiohttp-v3.12.15: pytest-asyncio bottle-v0.12.25: bottle==0.12.25 bottle-v0.13.4: bottle==0.13.4
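
Reviewer note, not part of the patch: the dispatch behavior pinned down by the background-thread and loop-closed tests above roughly follows the pattern below. This is a minimal sketch under stated assumptions: AsyncTransportSketch and _send are illustrative placeholders, not the SDK's actual internals; only the warning string and the run_coroutine_threadsafe / "no_async_context" branches are taken from what the tests assert.

import asyncio
import logging

logger = logging.getLogger(__name__)


class AsyncTransportSketch:
    """Illustrative stand-in for the async transport's envelope dispatch."""

    def __init__(self, loop):
        # The transport remembers the event loop it was created on.
        self.loop = loop

    async def _send(self, envelope):
        """Placeholder for the coroutine that actually POSTs the envelope."""

    def capture_envelope(self, envelope):
        try:
            # Caller is already inside a running event loop: schedule a task.
            asyncio.get_running_loop()
            self.loop.create_task(self._send(envelope))
        except RuntimeError:
            if self.loop.is_running():
                # Caller is a plain thread while the transport's loop runs
                # elsewhere: hand the coroutine over thread-safely. This is
                # the path the background-thread test asserts.
                asyncio.run_coroutine_threadsafe(self._send(envelope), self.loop)
            else:
                # No usable loop ("no_async_context"): the loop-closed test
                # asserts exactly this warning.
                logger.warning("Async Transport is not running in an event loop.")

run_coroutine_threadsafe is the only safe way to submit a coroutine to a loop from a foreign thread, which is why the first test drives capture_message through threading.Thread rather than an asyncio task.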