@@ -168,7 +168,7 @@ def _parse_rate_limits(
168
168
continue
169
169
170
170
171
- class BaseHttpTransport (Transport ):
171
+ class HttpTransportCore (Transport ):
172
172
"""The base HTTP transport."""
173
173
174
174
TIMEOUT = 30 # seconds
@@ -292,12 +292,8 @@ def _update_rate_limits(
292
292
seconds = retry_after
293
293
)
294
294
295
- def _send_request (
296
- self : Self ,
297
- body : bytes ,
298
- headers : Dict [str , str ],
299
- endpoint_type : EndpointType = EndpointType .ENVELOPE ,
300
- envelope : Optional [Envelope ] = None ,
295
+ def _handle_request_error (
296
+ self : Self , envelope : Optional [Envelope ], loss_reason : str = "network"
301
297
) -> None :
302
298
def record_loss (reason : str ) -> None :
303
299
if envelope is None :
@@ -306,45 +302,45 @@ def record_loss(reason: str) -> None:
306
302
for item in envelope .items :
307
303
self .record_lost_event (reason , item = item )
308
304
305
+ self .on_dropped_event (loss_reason )
306
+ record_loss ("network_error" )
307
+
308
+ def _handle_response (
309
+ self : Self ,
310
+ response : Union [urllib3 .BaseHTTPResponse , httpcore .Response ],
311
+ envelope : Optional [Envelope ],
312
+ ) -> None :
313
+ self ._update_rate_limits (response )
314
+
315
+ if response .status == 429 :
316
+ # if we hit a 429. Something was rate limited but we already
317
+ # acted on this in `self._update_rate_limits`. Note that we
318
+ # do not want to record event loss here as we will have recorded
319
+ # an outcome in relay already.
320
+ self .on_dropped_event ("status_429" )
321
+ pass
322
+
323
+ elif response .status >= 300 or response .status < 200 :
324
+ logger .error (
325
+ "Unexpected status code: %s (body: %s)" ,
326
+ response .status ,
327
+ getattr (response , "data" , getattr (response , "content" , None )),
328
+ )
329
+ self ._handle_request_error (
330
+ envelope = envelope , loss_reason = "status_{}" .format (response .status )
331
+ )
332
+
333
+ def _update_headers (
334
+ self : Self ,
335
+ headers : Dict [str , str ],
336
+ ) -> None :
337
+
309
338
headers .update (
310
339
{
311
340
"User-Agent" : str (self ._auth .client ),
312
341
"X-Sentry-Auth" : str (self ._auth .to_header ()),
313
342
}
314
343
)
315
- try :
316
- response = self ._request (
317
- "POST" ,
318
- endpoint_type ,
319
- body ,
320
- headers ,
321
- )
322
- except Exception :
323
- self .on_dropped_event ("network" )
324
- record_loss ("network_error" )
325
- raise
326
-
327
- try :
328
- self ._update_rate_limits (response )
329
-
330
- if response .status == 429 :
331
- # if we hit a 429. Something was rate limited but we already
332
- # acted on this in `self._update_rate_limits`. Note that we
333
- # do not want to record event loss here as we will have recorded
334
- # an outcome in relay already.
335
- self .on_dropped_event ("status_429" )
336
- pass
337
-
338
- elif response .status >= 300 or response .status < 200 :
339
- logger .error (
340
- "Unexpected status code: %s (body: %s)" ,
341
- response .status ,
342
- getattr (response , "data" , getattr (response , "content" , None )),
343
- )
344
- self .on_dropped_event ("status_{}" .format (response .status ))
345
- record_loss ("network_error" )
346
- finally :
347
- response .close ()
348
344
349
345
def on_dropped_event (self : Self , _reason : str ) -> None :
350
346
return None
@@ -381,11 +377,6 @@ def _fetch_pending_client_report(
381
377
type = "client_report" ,
382
378
)
383
379
384
- def _flush_client_reports (self : Self , force : bool = False ) -> None :
385
- client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
386
- if client_report is not None :
387
- self .capture_envelope (Envelope (items = [client_report ]))
388
-
389
380
def _check_disabled (self : Self , category : EventDataCategory ) -> bool :
390
381
def _disabled (bucket : Optional [EventDataCategory ]) -> bool :
391
382
ts = self ._disabled_until .get (bucket )
@@ -404,9 +395,9 @@ def _is_worker_full(self: Self) -> bool:
404
395
def is_healthy (self : Self ) -> bool :
405
396
return not (self ._is_worker_full () or self ._is_rate_limited ())
406
397
407
- def _send_envelope ( self : Self , envelope : Envelope ) -> None :
408
-
409
- # remove all items from the envelope which are over quota
398
+ def _prepare_envelope (
399
+ self : Self , envelope : Envelope
400
+ ) -> Optional [ Tuple [ Envelope , io . BytesIO , Dict [ str , str ]]]:
410
401
new_items = []
411
402
for item in envelope .items :
412
403
if self ._check_disabled (item .data_category ):
@@ -448,13 +439,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
448
439
if content_encoding :
449
440
headers ["Content-Encoding" ] = content_encoding
450
441
451
- self ._send_request (
452
- body .getvalue (),
453
- headers = headers ,
454
- endpoint_type = EndpointType .ENVELOPE ,
455
- envelope = envelope ,
456
- )
457
- return None
442
+ return envelope , body , headers
458
443
459
444
def _serialize_envelope (
460
445
self : Self , envelope : Envelope
@@ -512,6 +497,54 @@ def _request(
512
497
) -> Union [urllib3 .BaseHTTPResponse , httpcore .Response ]:
513
498
raise NotImplementedError ()
514
499
500
+ def kill (self : Self ) -> None :
501
+ logger .debug ("Killing HTTP transport" )
502
+ self ._worker .kill ()
503
+
504
+
505
+ class BaseSyncHttpTransport (HttpTransportCore ):
506
+
507
+ def _send_envelope (self : Self , envelope : Envelope ) -> None :
508
+ _prepared_envelope = self ._prepare_envelope (envelope )
509
+ if _prepared_envelope is None : # TODO: check this behaviour in detail
510
+ return None
511
+ envelope , body , headers = _prepared_envelope
512
+ self ._send_request (
513
+ body .getvalue (),
514
+ headers = headers ,
515
+ endpoint_type = EndpointType .ENVELOPE ,
516
+ envelope = envelope ,
517
+ )
518
+ return None
519
+
520
+ def _send_request (
521
+ self : Self ,
522
+ body : bytes ,
523
+ headers : Dict [str , str ],
524
+ endpoint_type : EndpointType ,
525
+ envelope : Optional [Envelope ],
526
+ ) -> None :
527
+ self ._update_headers (headers )
528
+ try :
529
+ response = self ._request (
530
+ "POST" ,
531
+ endpoint_type ,
532
+ body ,
533
+ headers ,
534
+ )
535
+ except Exception :
536
+ self ._handle_request_error (envelope = envelope , loss_reason = "network" )
537
+ raise
538
+ try :
539
+ self ._handle_response (response = response , envelope = envelope )
540
+ finally :
541
+ response .close ()
542
+
543
+ def _flush_client_reports (self : Self , force : bool = False ) -> None :
544
+ client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
545
+ if client_report is not None :
546
+ self .capture_envelope (Envelope (items = [client_report ]))
547
+
515
548
def capture_envelope (self : Self , envelope : Envelope ) -> None :
516
549
def send_envelope_wrapper () -> None :
517
550
with capture_internal_exceptions ():
@@ -534,12 +567,8 @@ def flush(
534
567
self ._worker .submit (lambda : self ._flush_client_reports (force = True ))
535
568
self ._worker .flush (timeout , callback )
536
569
537
- def kill (self : Self ) -> None :
538
- logger .debug ("Killing HTTP transport" )
539
- self ._worker .kill ()
540
-
541
570
542
- class HttpTransport (BaseHttpTransport ):
571
+ class HttpTransport (BaseSyncHttpTransport ):
543
572
if TYPE_CHECKING :
544
573
_pool : Union [PoolManager , ProxyManager ]
545
574
@@ -656,7 +685,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
656
685
657
686
else :
658
687
659
- class Http2Transport (BaseHttpTransport ): # type: ignore
688
+ class Http2Transport (BaseSyncHttpTransport ): # type: ignore
660
689
"""The HTTP2 transport based on httpcore."""
661
690
662
691
TIMEOUT = 15
0 commit comments