@@ -162,7 +162,7 @@ def _parse_rate_limits(
162
162
continue
163
163
164
164
165
- class BaseHttpTransport (Transport ):
165
+ class HttpTransportCore (Transport ):
166
166
"""The base HTTP transport."""
167
167
168
168
TIMEOUT = 30 # seconds
@@ -286,12 +286,8 @@ def _update_rate_limits(
286
286
seconds = retry_after
287
287
)
288
288
289
- def _send_request (
290
- self : Self ,
291
- body : bytes ,
292
- headers : Dict [str , str ],
293
- endpoint_type : EndpointType = EndpointType .ENVELOPE ,
294
- envelope : Optional [Envelope ] = None ,
289
+ def _handle_request_error (
290
+ self : Self , envelope : Optional [Envelope ], loss_reason : str = "network"
295
291
) -> None :
296
292
def record_loss (reason : str ) -> None :
297
293
if envelope is None :
@@ -300,45 +296,45 @@ def record_loss(reason: str) -> None:
300
296
for item in envelope .items :
301
297
self .record_lost_event (reason , item = item )
302
298
299
+ self .on_dropped_event (loss_reason )
300
+ record_loss ("network_error" )
301
+
302
+ def _handle_response (
303
+ self : Self ,
304
+ response : Union [urllib3 .BaseHTTPResponse , httpcore .Response ],
305
+ envelope : Optional [Envelope ],
306
+ ) -> None :
307
+ self ._update_rate_limits (response )
308
+
309
+ if response .status == 429 :
310
+ # if we hit a 429. Something was rate limited but we already
311
+ # acted on this in `self._update_rate_limits`. Note that we
312
+ # do not want to record event loss here as we will have recorded
313
+ # an outcome in relay already.
314
+ self .on_dropped_event ("status_429" )
315
+ pass
316
+
317
+ elif response .status >= 300 or response .status < 200 :
318
+ logger .error (
319
+ "Unexpected status code: %s (body: %s)" ,
320
+ response .status ,
321
+ getattr (response , "data" , getattr (response , "content" , None )),
322
+ )
323
+ self ._handle_request_error (
324
+ envelope = envelope , loss_reason = "status_{}" .format (response .status )
325
+ )
326
+
327
+ def _update_headers (
328
+ self : Self ,
329
+ headers : Dict [str , str ],
330
+ ) -> None :
331
+
303
332
headers .update (
304
333
{
305
334
"User-Agent" : str (self ._auth .client ),
306
335
"X-Sentry-Auth" : str (self ._auth .to_header ()),
307
336
}
308
337
)
309
- try :
310
- response = self ._request (
311
- "POST" ,
312
- endpoint_type ,
313
- body ,
314
- headers ,
315
- )
316
- except Exception :
317
- self .on_dropped_event ("network" )
318
- record_loss ("network_error" )
319
- raise
320
-
321
- try :
322
- self ._update_rate_limits (response )
323
-
324
- if response .status == 429 :
325
- # if we hit a 429. Something was rate limited but we already
326
- # acted on this in `self._update_rate_limits`. Note that we
327
- # do not want to record event loss here as we will have recorded
328
- # an outcome in relay already.
329
- self .on_dropped_event ("status_429" )
330
- pass
331
-
332
- elif response .status >= 300 or response .status < 200 :
333
- logger .error (
334
- "Unexpected status code: %s (body: %s)" ,
335
- response .status ,
336
- getattr (response , "data" , getattr (response , "content" , None )),
337
- )
338
- self .on_dropped_event ("status_{}" .format (response .status ))
339
- record_loss ("network_error" )
340
- finally :
341
- response .close ()
342
338
343
339
def on_dropped_event (self : Self , _reason : str ) -> None :
344
340
return None
@@ -375,11 +371,6 @@ def _fetch_pending_client_report(
375
371
type = "client_report" ,
376
372
)
377
373
378
- def _flush_client_reports (self : Self , force : bool = False ) -> None :
379
- client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
380
- if client_report is not None :
381
- self .capture_envelope (Envelope (items = [client_report ]))
382
-
383
374
def _check_disabled (self : Self , category : EventDataCategory ) -> bool :
384
375
def _disabled (bucket : Optional [EventDataCategory ]) -> bool :
385
376
ts = self ._disabled_until .get (bucket )
@@ -398,9 +389,9 @@ def _is_worker_full(self: Self) -> bool:
398
389
def is_healthy (self : Self ) -> bool :
399
390
return not (self ._is_worker_full () or self ._is_rate_limited ())
400
391
401
- def _send_envelope ( self : Self , envelope : Envelope ) -> None :
402
-
403
- # remove all items from the envelope which are over quota
392
+ def _prepare_envelope (
393
+ self : Self , envelope : Envelope
394
+ ) -> Optional [ Tuple [ Envelope , io . BytesIO , Dict [ str , str ]]]:
404
395
new_items = []
405
396
for item in envelope .items :
406
397
if self ._check_disabled (item .data_category ):
@@ -442,13 +433,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
442
433
if content_encoding :
443
434
headers ["Content-Encoding" ] = content_encoding
444
435
445
- self ._send_request (
446
- body .getvalue (),
447
- headers = headers ,
448
- endpoint_type = EndpointType .ENVELOPE ,
449
- envelope = envelope ,
450
- )
451
- return None
436
+ return envelope , body , headers
452
437
453
438
def _serialize_envelope (
454
439
self : Self , envelope : Envelope
@@ -506,6 +491,54 @@ def _request(
506
491
) -> Union [urllib3 .BaseHTTPResponse , httpcore .Response ]:
507
492
raise NotImplementedError ()
508
493
494
+ def kill (self : Self ) -> None :
495
+ logger .debug ("Killing HTTP transport" )
496
+ self ._worker .kill ()
497
+
498
+
499
+ class BaseSyncHttpTransport (HttpTransportCore ):
500
+
501
+ def _send_envelope (self : Self , envelope : Envelope ) -> None :
502
+ _prepared_envelope = self ._prepare_envelope (envelope )
503
+ if _prepared_envelope is None : # TODO: check this behaviour in detail
504
+ return None
505
+ envelope , body , headers = _prepared_envelope
506
+ self ._send_request (
507
+ body .getvalue (),
508
+ headers = headers ,
509
+ endpoint_type = EndpointType .ENVELOPE ,
510
+ envelope = envelope ,
511
+ )
512
+ return None
513
+
514
+ def _send_request (
515
+ self : Self ,
516
+ body : bytes ,
517
+ headers : Dict [str , str ],
518
+ endpoint_type : EndpointType ,
519
+ envelope : Optional [Envelope ],
520
+ ) -> None :
521
+ self ._update_headers (headers )
522
+ try :
523
+ response = self ._request (
524
+ "POST" ,
525
+ endpoint_type ,
526
+ body ,
527
+ headers ,
528
+ )
529
+ except Exception :
530
+ self ._handle_request_error (envelope = envelope , loss_reason = "network" )
531
+ raise
532
+ try :
533
+ self ._handle_response (response = response , envelope = envelope )
534
+ finally :
535
+ response .close ()
536
+
537
+ def _flush_client_reports (self : Self , force : bool = False ) -> None :
538
+ client_report = self ._fetch_pending_client_report (force = force , interval = 60 )
539
+ if client_report is not None :
540
+ self .capture_envelope (Envelope (items = [client_report ]))
541
+
509
542
def capture_envelope (self : Self , envelope : Envelope ) -> None :
510
543
def send_envelope_wrapper () -> None :
511
544
with capture_internal_exceptions ():
@@ -528,12 +561,8 @@ def flush(
528
561
self ._worker .submit (lambda : self ._flush_client_reports (force = True ))
529
562
self ._worker .flush (timeout , callback )
530
563
531
- def kill (self : Self ) -> None :
532
- logger .debug ("Killing HTTP transport" )
533
- self ._worker .kill ()
534
-
535
564
536
- class HttpTransport (BaseHttpTransport ):
565
+ class HttpTransport (BaseSyncHttpTransport ):
537
566
if TYPE_CHECKING :
538
567
_pool : Union [PoolManager , ProxyManager ]
539
568
@@ -650,7 +679,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
650
679
651
680
else :
652
681
653
- class Http2Transport (BaseHttpTransport ): # type: ignore
682
+ class Http2Transport (BaseSyncHttpTransport ): # type: ignore
654
683
"""The HTTP2 transport based on httpcore."""
655
684
656
685
TIMEOUT = 15
0 commit comments