@@ -235,17 +235,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
235235 self ._compression_level = 4
236236
237237 def _create_worker (self , options : dict [str , Any ]) -> Worker :
238- async_enabled = options .get ("_experiments" , {}).get ("transport_async" , False )
239- try :
240- asyncio .get_running_loop ()
241- worker_cls = (
242- AsyncWorker
243- if async_enabled and ASYNC_TRANSPORT_ENABLED
244- else BackgroundWorker
245- )
246- except RuntimeError :
247- worker_cls = BackgroundWorker
248- return worker_cls (queue_size = options ["transport_queue_size" ])
238+ raise NotImplementedError ()
249239
250240 def record_lost_event (
251241 self : Self ,
@@ -562,6 +552,9 @@ def _send_request(
562552 finally :
563553 response .close ()
564554
555+ def _create_worker (self : Self , options : dict [str , Any ]) -> Worker :
556+ return BackgroundWorker (queue_size = options ["transport_queue_size" ])
557+
565558 def _flush_client_reports (self : Self , force : bool = False ) -> None :
566559 client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
567560 if client_report is not None :
@@ -592,12 +585,7 @@ def flush(
592585
593586if not ASYNC_TRANSPORT_ENABLED :
594587 # Sorry, no AsyncHttpTransport for you
595- class AsyncHttpTransport (BaseHttpTransport ):
596- def __init__ (self : Self , options : Dict [str , Any ]) -> None :
597- super ().__init__ (options )
598- logger .warning (
599- "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
600- )
588+ AsyncHttpTransport = BaseHttpTransport
601589
602590else :
603591
@@ -608,6 +596,9 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
608596 self .loop = asyncio .get_running_loop ()
609597 self .background_tasks : set [asyncio .Task [None ]] = set ()
610598
599+ def _create_worker (self : Self , options : dict [str , Any ]) -> Worker :
600+ return AsyncWorker (queue_size = options ["transport_queue_size" ])
601+
611602 def _get_header_value (self : Self , response : Any , header : str ) -> Optional [str ]:
612603 return next (
613604 (
@@ -680,7 +671,7 @@ async def _flush_client_reports(self: Self, force: bool = False) -> None:
680671 if client_report is not None :
681672 self .capture_envelope (Envelope (items = [client_report ]))
682673
683- async def _capture_envelope (self : Self , envelope : Envelope ) -> None :
674+ def _capture_envelope (self : Self , envelope : Envelope ) -> None :
684675 async def send_envelope_wrapper () -> None :
685676 with capture_internal_exceptions ():
686677 await self ._send_envelope (envelope )
@@ -693,26 +684,14 @@ async def send_envelope_wrapper() -> None:
693684
694685 def capture_envelope (self : Self , envelope : Envelope ) -> None :
695686 # Synchronous entry point
696- try :
697- asyncio .get_running_loop ()
698- # We are on the main thread running the event loop
699- task = asyncio .create_task (self ._capture_envelope (envelope ))
700- self .background_tasks .add (task )
701- task .add_done_callback (self .background_tasks .discard )
702- except RuntimeError :
703- # We are in a background thread, not running an event loop,
704- # have to launch the task on the loop in a threadsafe way.
705- if self .loop and self .loop .is_running ():
706- asyncio .run_coroutine_threadsafe (
707- self ._capture_envelope (envelope ),
708- self .loop ,
709- )
710- else :
711- # The event loop is no longer running
712- logger .warning ("Async Transport is not running in an event loop." )
713- self .on_dropped_event ("internal_sdk_error" )
714- for item in envelope .items :
715- self .record_lost_event ("internal_sdk_error" , item = item )
687+ if self .loop and self .loop .is_running ():
688+ self .loop .call_soon_threadsafe (self ._capture_envelope , envelope )
689+ else :
690+ # The event loop is no longer running
691+ logger .warning ("Async Transport is not running in an event loop." )
692+ self .on_dropped_event ("internal_sdk_error" )
693+ for item in envelope .items :
694+ self .record_lost_event ("internal_sdk_error" , item = item )
716695
717696 def flush ( # type: ignore[override]
718697 self : Self ,
@@ -1071,16 +1050,20 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
10711050 use_http2_transport = options .get ("_experiments" , {}).get ("transport_http2" , False )
10721051 use_async_transport = options .get ("_experiments" , {}).get ("transport_async" , False )
10731052 # By default, we use the http transport class
1074- if use_async_transport :
1053+ transport_cls : Type [Transport ] = (
1054+ Http2Transport if use_http2_transport else HttpTransport
1055+ )
1056+ if use_async_transport and ASYNC_TRANSPORT_ENABLED :
10751057 try :
10761058 asyncio .get_running_loop ()
1077- transport_cls : Type [ Transport ] = AsyncHttpTransport
1059+ transport_cls = AsyncHttpTransport
10781060 except RuntimeError :
10791061 # No event loop running, fall back to sync transport
10801062 logger .warning ("No event loop running, falling back to sync transport." )
1081- transport_cls = Http2Transport if use_http2_transport else HttpTransport
1082- else :
1083- transport_cls = Http2Transport if use_http2_transport else HttpTransport
1063+ elif use_async_transport :
1064+ logger .warning (
1065+ "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
1066+ )
10841067
10851068 if isinstance (ref_transport , Transport ):
10861069 return ref_transport
0 commit comments