@@ -162,7 +162,7 @@ def _parse_rate_limits(
162162 continue
163163
164164
165- class BaseHttpTransport (Transport ):
165+ class HttpTransportCore (Transport ):
166166 """The base HTTP transport."""
167167
168168 TIMEOUT = 30 # seconds
@@ -286,12 +286,8 @@ def _update_rate_limits(
286286 seconds = retry_after
287287 )
288288
289- def _send_request (
290- self : Self ,
291- body : bytes ,
292- headers : Dict [str , str ],
293- endpoint_type : EndpointType = EndpointType .ENVELOPE ,
294- envelope : Optional [Envelope ] = None ,
289+ def _handle_request_error (
290+ self : Self , envelope : Optional [Envelope ], loss_reason : str = "network"
295291 ) -> None :
296292 def record_loss (reason : str ) -> None :
297293 if envelope is None :
@@ -300,45 +296,45 @@ def record_loss(reason: str) -> None:
300296 for item in envelope .items :
301297 self .record_lost_event (reason , item = item )
302298
299+ self .on_dropped_event (loss_reason )
300+ record_loss ("network_error" )
301+
302+ def _handle_response (
303+ self : Self ,
304+ response : Union [urllib3 .BaseHTTPResponse , httpcore .Response ],
305+ envelope : Optional [Envelope ],
306+ ) -> None :
307+ self ._update_rate_limits (response )
308+
309+ if response .status == 429 :
310+ # if we hit a 429. Something was rate limited but we already
311+ # acted on this in `self._update_rate_limits`. Note that we
312+ # do not want to record event loss here as we will have recorded
313+ # an outcome in relay already.
314+ self .on_dropped_event ("status_429" )
315+ pass
316+
317+ elif response .status >= 300 or response .status < 200 :
318+ logger .error (
319+ "Unexpected status code: %s (body: %s)" ,
320+ response .status ,
321+ getattr (response , "data" , getattr (response , "content" , None )),
322+ )
323+ self ._handle_request_error (
324+ envelope = envelope , loss_reason = "status_{}" .format (response .status )
325+ )
326+
327+ def _update_headers (
328+ self : Self ,
329+ headers : Dict [str , str ],
330+ ) -> None :
331+
303332 headers .update (
304333 {
305334 "User-Agent" : str (self ._auth .client ),
306335 "X-Sentry-Auth" : str (self ._auth .to_header ()),
307336 }
308337 )
309- try :
310- response = self ._request (
311- "POST" ,
312- endpoint_type ,
313- body ,
314- headers ,
315- )
316- except Exception :
317- self .on_dropped_event ("network" )
318- record_loss ("network_error" )
319- raise
320-
321- try :
322- self ._update_rate_limits (response )
323-
324- if response .status == 429 :
325- # if we hit a 429. Something was rate limited but we already
326- # acted on this in `self._update_rate_limits`. Note that we
327- # do not want to record event loss here as we will have recorded
328- # an outcome in relay already.
329- self .on_dropped_event ("status_429" )
330- pass
331-
332- elif response .status >= 300 or response .status < 200 :
333- logger .error (
334- "Unexpected status code: %s (body: %s)" ,
335- response .status ,
336- getattr (response , "data" , getattr (response , "content" , None )),
337- )
338- self .on_dropped_event ("status_{}" .format (response .status ))
339- record_loss ("network_error" )
340- finally :
341- response .close ()
342338
343339 def on_dropped_event (self : Self , _reason : str ) -> None :
344340 return None
@@ -375,11 +371,6 @@ def _fetch_pending_client_report(
375371 type = "client_report" ,
376372 )
377373
378- def _flush_client_reports (self : Self , force : bool = False ) -> None :
379- client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
380- if client_report is not None :
381- self .capture_envelope (Envelope (items = [client_report ]))
382-
383374 def _check_disabled (self : Self , category : EventDataCategory ) -> bool :
384375 def _disabled (bucket : Optional [EventDataCategory ]) -> bool :
385376 ts = self ._disabled_until .get (bucket )
@@ -398,9 +389,9 @@ def _is_worker_full(self: Self) -> bool:
398389 def is_healthy (self : Self ) -> bool :
399390 return not (self ._is_worker_full () or self ._is_rate_limited ())
400391
401- def _send_envelope ( self : Self , envelope : Envelope ) -> None :
402-
403- # remove all items from the envelope which are over quota
392+ def _prepare_envelope (
393+ self : Self , envelope : Envelope
394+ ) -> Optional [ Tuple [ Envelope , io . BytesIO , Dict [ str , str ]]]:
404395 new_items = []
405396 for item in envelope .items :
406397 if self ._check_disabled (item .data_category ):
@@ -442,13 +433,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
442433 if content_encoding :
443434 headers ["Content-Encoding" ] = content_encoding
444435
445- self ._send_request (
446- body .getvalue (),
447- headers = headers ,
448- endpoint_type = EndpointType .ENVELOPE ,
449- envelope = envelope ,
450- )
451- return None
436+ return envelope , body , headers
452437
453438 def _serialize_envelope (
454439 self : Self , envelope : Envelope
@@ -506,6 +491,54 @@ def _request(
506491 ) -> Union [urllib3 .BaseHTTPResponse , httpcore .Response ]:
507492 raise NotImplementedError ()
508493
494+ def kill (self : Self ) -> None :
495+ logger .debug ("Killing HTTP transport" )
496+ self ._worker .kill ()
497+
498+
499+ class BaseSyncHttpTransport (HttpTransportCore ):
500+
501+ def _send_envelope (self : Self , envelope : Envelope ) -> None :
502+ _prepared_envelope = self ._prepare_envelope (envelope )
503+ if _prepared_envelope is None : # TODO: check this behaviour in detail
504+ return None
505+ envelope , body , headers = _prepared_envelope
506+ self ._send_request (
507+ body .getvalue (),
508+ headers = headers ,
509+ endpoint_type = EndpointType .ENVELOPE ,
510+ envelope = envelope ,
511+ )
512+ return None
513+
514+ def _send_request (
515+ self : Self ,
516+ body : bytes ,
517+ headers : Dict [str , str ],
518+ endpoint_type : EndpointType ,
519+ envelope : Optional [Envelope ],
520+ ) -> None :
521+ self ._update_headers (headers )
522+ try :
523+ response = self ._request (
524+ "POST" ,
525+ endpoint_type ,
526+ body ,
527+ headers ,
528+ )
529+ except Exception :
530+ self ._handle_request_error (envelope = envelope , loss_reason = "network" )
531+ raise
532+ try :
533+ self ._handle_response (response = response , envelope = envelope )
534+ finally :
535+ response .close ()
536+
537+ def _flush_client_reports (self : Self , force : bool = False ) -> None :
538+ client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
539+ if client_report is not None :
540+ self .capture_envelope (Envelope (items = [client_report ]))
541+
509542 def capture_envelope (self : Self , envelope : Envelope ) -> None :
510543 def send_envelope_wrapper () -> None :
511544 with capture_internal_exceptions ():
@@ -528,12 +561,8 @@ def flush(
528561 self ._worker .submit (lambda : self ._flush_client_reports (force = True ))
529562 self ._worker .flush (timeout , callback )
530563
531- def kill (self : Self ) -> None :
532- logger .debug ("Killing HTTP transport" )
533- self ._worker .kill ()
534-
535564
536- class HttpTransport (BaseHttpTransport ):
565+ class HttpTransport (BaseSyncHttpTransport ):
537566 if TYPE_CHECKING :
538567 _pool : Union [PoolManager , ProxyManager ]
539568
@@ -650,7 +679,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
650679
651680else :
652681
653- class Http2Transport (BaseHttpTransport ): # type: ignore
682+ class Http2Transport (BaseSyncHttpTransport ): # type: ignore
654683 """The HTTP2 transport based on httpcore."""
655684
656685 TIMEOUT = 15
0 commit comments