Skip to content

Add experimental async transport #4572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: potel-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-testing.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ asttokens
responses
pysocks
socksio
httpcore[http2]
httpcore[http2,asyncio]
setuptools
freezegun
Brotli
Expand Down
3 changes: 2 additions & 1 deletion scripts/populate_tox/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"pytest-asyncio",
"python-multipart",
"requests",
"anyio<4",
"anyio>=3,<5",
],
# There's an incompatibility between FastAPI's TestClient, which is
# actually Starlette's TestClient, which is actually httpx's Client.
Expand All @@ -106,6 +106,7 @@
# FastAPI versions we use older httpx which still supports the
# deprecated argument.
"<0.110.1": ["httpx<0.28.0"],
"<0.80": ["anyio<4"],
"py3.6": ["aiocontextvars"],
},
},
Expand Down
2 changes: 1 addition & 1 deletion scripts/populate_tox/tox.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ deps =
httpx-v0.25: pytest-httpx==0.25.0
httpx: pytest-httpx
# anyio is a dep of httpx
httpx: anyio<4.0.0
httpx: anyio>=3,<5
httpx-v0.16: httpx~=0.16.0
httpx-v0.18: httpx~=0.18.0
httpx-v0.20: httpx~=0.20.0
Expand Down
8 changes: 8 additions & 0 deletions sentry_sdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ def flush(
return get_client().flush(timeout=timeout, callback=callback)


@clientmethod
async def flush_async(
timeout: Optional[float] = None,
callback: Optional[Callable[[int, float], None]] = None,
) -> None:
return await get_client().flush_async(timeout=timeout, callback=callback)


def start_span(**kwargs: Any) -> Span:
"""
Start and return a span.
Expand Down
105 changes: 90 additions & 15 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport
from sentry_sdk.consts import (
SPANDATA,
DEFAULT_MAX_VALUE_LENGTH,
Expand Down Expand Up @@ -214,6 +214,12 @@ def close(self, *args: Any, **kwargs: Any) -> None:
def flush(self, *args: Any, **kwargs: Any) -> None:
return None

async def close_async(self, *args: Any, **kwargs: Any) -> None:
return None

async def flush_async(self, *args: Any, **kwargs: Any) -> None:
return None

def __enter__(self) -> BaseClient:
return self

Expand Down Expand Up @@ -406,7 +412,7 @@ def _capture_envelope(envelope: Envelope) -> None:
self.monitor
or self.log_batcher
or has_profiling_enabled(self.options)
or isinstance(self.transport, BaseHttpTransport)
or isinstance(self.transport, HttpTransportCore)
):
# If we have anything on that could spawn a background thread, we
# need to check if it's safe to use them.
Expand Down Expand Up @@ -917,6 +923,14 @@ def get_integration(

return self.integrations.get(integration_name)

def _close_components(self) -> None:
"""Kill all client components in the correct order."""
self.session_flusher.kill()
if self.log_batcher is not None:
self.log_batcher.kill()
if self.monitor:
self.monitor.kill()

def close(
self,
timeout: Optional[float] = None,
Expand All @@ -927,19 +941,43 @@ def close(
semantics as :py:meth:`Client.flush`.
"""
if self.transport is not None:
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
self.transport, "loop"
):
logger.debug(
"close() used with AsyncHttpTransport, aborting. Please use close_async() instead."
)
return
self.flush(timeout=timeout, callback=callback)

self.session_flusher.kill()

if self.log_batcher is not None:
self.log_batcher.kill()

if self.monitor:
self.monitor.kill()

self._close_components()
self.transport.kill()
self.transport = None

async def close_async(
self,
timeout: Optional[float] = None,
callback: Optional[Callable[[int, float], None]] = None,
) -> None:
"""
Asynchronously close the client and shut down the transport. Arguments have the same
semantics as :py:meth:`Client.flush_async`.
"""
if self.transport is not None:
if not (
isinstance(self.transport, AsyncHttpTransport)
and hasattr(self.transport, "loop")
):
logger.debug(
"close_async() used with non-async transport, aborting. Please use close() instead."
)
return
await self.flush_async(timeout=timeout, callback=callback)
self._close_components()
kill_task = self.transport.kill() # type: ignore
if kill_task is not None:
await kill_task
self.transport = None

def flush(
self,
timeout: Optional[float] = None,
Expand All @@ -953,15 +991,52 @@ def flush(
:param callback: Is invoked with the number of pending events and the configured timeout.
"""
if self.transport is not None:
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
self.transport, "loop"
):
logger.debug(
"flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead."
)
return
if timeout is None:
timeout = self.options["shutdown_timeout"]
self.session_flusher.flush()

if self.log_batcher is not None:
self.log_batcher.flush()
self._flush_components()

self.transport.flush(timeout=timeout, callback=callback)

async def flush_async(
self,
timeout: Optional[float] = None,
callback: Optional[Callable[[int, float], None]] = None,
) -> None:
"""
Asynchronously wait for the current events to be sent.

:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.

:param callback: Is invoked with the number of pending events and the configured timeout.
"""
if self.transport is not None:
if not (
isinstance(self.transport, AsyncHttpTransport)
and hasattr(self.transport, "loop")
):
logger.debug(
"flush_async() used with non-async transport, aborting. Please use flush() instead."
)
return
if timeout is None:
timeout = self.options["shutdown_timeout"]
self._flush_components()
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
if flush_task is not None:
await flush_task

def _flush_components(self) -> None:
self.session_flusher.flush()
if self.log_batcher is not None:
self.log_batcher.flush()

def __enter__(self) -> _Client:
return self

Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class CompressionAlgo(Enum):
"transport_compression_algo": Optional[CompressionAlgo],
"transport_num_pools": Optional[int],
"transport_http2": Optional[bool],
"transport_async": Optional[bool],
"enable_logs": Optional[bool],
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
},
Expand Down
105 changes: 85 additions & 20 deletions sentry_sdk/integrations/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.utils import event_from_exception, logger, reraise
from sentry_sdk.utils import (
event_from_exception,
logger,
reraise,
is_internal_task,
)
from sentry_sdk.transport import AsyncHttpTransport

try:
import asyncio
Expand All @@ -29,6 +35,72 @@ def get_name(coro: Any) -> str:
)


def patch_loop_close() -> None:
"""Patch loop.close to flush pending events before shutdown."""
# Atexit shutdown hook happens after the event loop is closed.
# Therefore, it is necessary to patch the loop.close method to ensure
# that pending events are flushed before the interpreter shuts down.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop → cannot patch now
return

if getattr(loop, "_sentry_flush_patched", False):
return

async def _flush() -> None:
client = sentry_sdk.get_client()
if not client:
return

try:
if not isinstance(client.transport, AsyncHttpTransport):
return

await client.close_async()
except Exception:
logger.warning("Sentry flush failed during loop shutdown", exc_info=True)

orig_close = loop.close

def _patched_close() -> None:
try:
loop.run_until_complete(_flush())
finally:
orig_close()

loop.close = _patched_close # type: ignore
loop._sentry_flush_patched = True # type: ignore


def _create_task_with_factory(
orig_task_factory: Any,
loop: asyncio.AbstractEventLoop,
coro: Coroutine[Any, Any, Any],
**kwargs: Any,
) -> asyncio.Task[Any]:
task = None

# Trying to use user set task factory (if there is one)
if orig_task_factory:
task = orig_task_factory(loop, coro, **kwargs)

if task is None:
# The default task factory in `asyncio` does not have its own function
# but is just a couple of lines in `asyncio.base_events.create_task()`
# Those lines are copied here.

# WARNING:
# If the default behavior of the task creation in asyncio changes,
# this will break!
task = Task(coro, loop=loop, **kwargs)
if task._source_traceback: # type: ignore
del task._source_traceback[-1] # type: ignore

return task


def patch_asyncio() -> None:
orig_task_factory = None
try:
Expand All @@ -41,6 +113,14 @@ def _sentry_task_factory(
**kwargs: Any,
) -> asyncio.Future[Any]:

# Check if this is an internal Sentry task
is_internal = is_internal_task()

if is_internal:
return _create_task_with_factory(
orig_task_factory, loop, coro, **kwargs
)

async def _task_with_sentry_span_creation() -> Any:
result = None

Expand All @@ -58,25 +138,9 @@ async def _task_with_sentry_span_creation() -> Any:

return result

task = None

# Trying to use user set task factory (if there is one)
if orig_task_factory:
task = orig_task_factory(
loop, _task_with_sentry_span_creation(), **kwargs
)

if task is None:
# The default task factory in `asyncio` does not have its own function
# but is just a couple of lines in `asyncio.base_events.create_task()`
# Those lines are copied here.

# WARNING:
# If the default behavior of the task creation in asyncio changes,
# this will break!
task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs)
if task._source_traceback: # type: ignore
del task._source_traceback[-1] # type: ignore
task = _create_task_with_factory(
orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs
)

# Set the task name to include the original coroutine's name
try:
Expand Down Expand Up @@ -124,3 +188,4 @@ class AsyncioIntegration(Integration):
@staticmethod
def setup_once() -> None:
patch_asyncio()
patch_loop_close()
Loading