|
29 | 29 |
|
30 | 30 | from sentry_sdk.consts import EndpointType
|
31 | 31 | from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
|
32 |
| -from sentry_sdk.worker import BackgroundWorker, Worker |
| 32 | +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker |
33 | 33 | from sentry_sdk.envelope import Envelope, Item, PayloadRef
|
34 | 34 |
|
35 | 35 | from typing import TYPE_CHECKING
|
@@ -225,9 +225,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
|
225 | 225 | elif self._compression_algo == "br":
|
226 | 226 | self._compression_level = 4
|
227 | 227 |
|
228 |
| - def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: |
229 |
| - # For now, we only support the threaded sync background worker. |
230 |
| - return BackgroundWorker(queue_size=options["transport_queue_size"]) |
| 228 | + def _create_worker(self, options: dict[str, Any]) -> Worker: |
| 229 | + async_enabled = options.get("_experiments", {}).get("transport_async", False) |
| 230 | + worker_cls = AsyncWorker if async_enabled else BackgroundWorker |
| 231 | + return worker_cls(queue_size=options["transport_queue_size"]) |
231 | 232 |
|
232 | 233 | def record_lost_event(
|
233 | 234 | self: Self,
|
@@ -647,18 +648,26 @@ async def send_envelope_wrapper() -> None:
|
647 | 648 |
|
648 | 649 | def capture_envelope(self: Self, envelope: Envelope) -> None:
|
649 | 650 | # Synchronous entry point
|
650 |
| - if asyncio.get_running_loop() is not None: |
| 651 | + try: |
| 652 | + asyncio.get_running_loop() |
651 | 653 | # We are on the main thread running the event loop
|
652 | 654 | task = asyncio.create_task(self._capture_envelope(envelope))
|
653 | 655 | self.background_tasks.add(task)
|
654 | 656 | task.add_done_callback(self.background_tasks.discard)
|
655 |
| - else: |
| 657 | + except RuntimeError: |
656 | 658 | # We are in a background thread, not running an event loop,
|
657 | 659 | # have to launch the task on the loop in a threadsafe way.
|
658 |
| - asyncio.run_coroutine_threadsafe( |
659 |
| - self._capture_envelope(envelope), |
660 |
| - self._loop, |
661 |
| - ) |
| 660 | + if self._loop and self._loop.is_running(): |
| 661 | + asyncio.run_coroutine_threadsafe( |
| 662 | + self._capture_envelope(envelope), |
| 663 | + self._loop, |
| 664 | + ) |
| 665 | + else: |
| 666 | + # The event loop is no longer running |
| 667 | + logger.warning("Async Transport is not running in an event loop.") |
| 668 | + self.on_dropped_event("no_async_context") |
| 669 | + for item in envelope.items: |
| 670 | + self.record_lost_event("no_async_context", item=item) |
662 | 671 |
|
663 | 672 | async def flush_async(
|
664 | 673 | self: Self,
|
@@ -998,11 +1007,13 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
|
998 | 1007 | ref_transport = options["transport"]
|
999 | 1008 |
|
1000 | 1009 | use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
|
1001 |
| - |
| 1010 | + use_async_transport = options.get("_experiments", {}).get("transport_async", False) |
1002 | 1011 | # By default, we use the http transport class
|
1003 |
| - transport_cls: Type[Transport] = ( |
1004 |
| - Http2Transport if use_http2_transport else HttpTransport |
1005 |
| - ) |
| 1012 | + if use_async_transport and asyncio.get_running_loop() is not None: |
| 1013 | + transport_cls: Type[Transport] = AsyncHttpTransport |
| 1014 | + else: |
| 1015 | + use_http2 = use_http2_transport |
| 1016 | + transport_cls = Http2Transport if use_http2 else HttpTransport |
1006 | 1017 |
|
1007 | 1018 | if isinstance(ref_transport, Transport):
|
1008 | 1019 | return ref_transport
|
|
0 commit comments