@@ -238,17 +238,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
238
238
self ._compression_level = 4
239
239
240
240
def _create_worker (self , options : dict [str , Any ]) -> Worker :
241
- async_enabled = options .get ("_experiments" , {}).get ("transport_async" , False )
242
- try :
243
- asyncio .get_running_loop ()
244
- worker_cls = (
245
- AsyncWorker
246
- if async_enabled and ASYNC_TRANSPORT_ENABLED
247
- else BackgroundWorker
248
- )
249
- except RuntimeError :
250
- worker_cls = BackgroundWorker
251
- return worker_cls (queue_size = options ["transport_queue_size" ])
241
+ raise NotImplementedError ()
252
242
253
243
def record_lost_event (
254
244
self : Self ,
@@ -565,6 +555,9 @@ def _send_request(
565
555
finally :
566
556
response .close ()
567
557
558
+ def _create_worker (self : Self , options : dict [str , Any ]) -> Worker :
559
+ return BackgroundWorker (queue_size = options ["transport_queue_size" ])
560
+
568
561
def _flush_client_reports (self : Self , force : bool = False ) -> None :
569
562
client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
570
563
if client_report is not None :
@@ -595,12 +588,7 @@ def flush(
595
588
596
589
if not ASYNC_TRANSPORT_ENABLED :
597
590
# Sorry, no AsyncHttpTransport for you
598
- class AsyncHttpTransport (BaseHttpTransport ):
599
- def __init__ (self : Self , options : Dict [str , Any ]) -> None :
600
- super ().__init__ (options )
601
- logger .warning (
602
- "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
603
- )
591
+ AsyncHttpTransport = BaseHttpTransport
604
592
605
593
else :
606
594
@@ -611,6 +599,9 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
611
599
self .loop = asyncio .get_running_loop ()
612
600
self .background_tasks : set [asyncio .Task [None ]] = set ()
613
601
602
+ def _create_worker (self : Self , options : dict [str , Any ]) -> Worker :
603
+ return AsyncWorker (queue_size = options ["transport_queue_size" ])
604
+
614
605
def _get_header_value (self : Self , response : Any , header : str ) -> Optional [str ]:
615
606
return next (
616
607
(
@@ -683,7 +674,7 @@ async def _flush_client_reports(self: Self, force: bool = False) -> None:
683
674
if client_report is not None :
684
675
self .capture_envelope (Envelope (items = [client_report ]))
685
676
686
- async def _capture_envelope (self : Self , envelope : Envelope ) -> None :
677
+ def _capture_envelope (self : Self , envelope : Envelope ) -> None :
687
678
async def send_envelope_wrapper () -> None :
688
679
with capture_internal_exceptions ():
689
680
await self ._send_envelope (envelope )
@@ -696,26 +687,14 @@ async def send_envelope_wrapper() -> None:
696
687
697
688
def capture_envelope (self : Self , envelope : Envelope ) -> None :
698
689
# Synchronous entry point
699
- try :
700
- asyncio .get_running_loop ()
701
- # We are on the main thread running the event loop
702
- task = asyncio .create_task (self ._capture_envelope (envelope ))
703
- self .background_tasks .add (task )
704
- task .add_done_callback (self .background_tasks .discard )
705
- except RuntimeError :
706
- # We are in a background thread, not running an event loop,
707
- # have to launch the task on the loop in a threadsafe way.
708
- if self .loop and self .loop .is_running ():
709
- asyncio .run_coroutine_threadsafe (
710
- self ._capture_envelope (envelope ),
711
- self .loop ,
712
- )
713
- else :
714
- # The event loop is no longer running
715
- logger .warning ("Async Transport is not running in an event loop." )
716
- self .on_dropped_event ("internal_sdk_error" )
717
- for item in envelope .items :
718
- self .record_lost_event ("internal_sdk_error" , item = item )
690
+ if self .loop and self .loop .is_running ():
691
+ self .loop .call_soon_threadsafe (self ._capture_envelope , envelope )
692
+ else :
693
+ # The event loop is no longer running
694
+ logger .warning ("Async Transport is not running in an event loop." )
695
+ self .on_dropped_event ("internal_sdk_error" )
696
+ for item in envelope .items :
697
+ self .record_lost_event ("internal_sdk_error" , item = item )
719
698
720
699
def flush ( # type: ignore[override]
721
700
self : Self ,
@@ -1074,16 +1053,20 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
1074
1053
use_http2_transport = options .get ("_experiments" , {}).get ("transport_http2" , False )
1075
1054
use_async_transport = options .get ("_experiments" , {}).get ("transport_async" , False )
1076
1055
# By default, we use the http transport class
1077
- if use_async_transport :
1056
+ transport_cls : Type [Transport ] = (
1057
+ Http2Transport if use_http2_transport else HttpTransport
1058
+ )
1059
+ if use_async_transport and ASYNC_TRANSPORT_ENABLED :
1078
1060
try :
1079
1061
asyncio .get_running_loop ()
1080
- transport_cls : Type [ Transport ] = AsyncHttpTransport
1062
+ transport_cls = AsyncHttpTransport
1081
1063
except RuntimeError :
1082
1064
# No event loop running, fall back to sync transport
1083
1065
logger .warning ("No event loop running, falling back to sync transport." )
1084
- transport_cls = Http2Transport if use_http2_transport else HttpTransport
1085
- else :
1086
- transport_cls = Http2Transport if use_http2_transport else HttpTransport
1066
+ elif use_async_transport :
1067
+ logger .warning (
1068
+ "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
1069
+ )
1087
1070
1088
1071
if isinstance (ref_transport , Transport ):
1089
1072
return ref_transport
0 commit comments