1
- import os
2
1
import io
2
+ import os
3
+ import random
3
4
import re
4
5
import sys
5
6
import threading
6
- import random
7
7
import time
8
8
import zlib
9
+ from contextlib import contextmanager
9
10
from datetime import datetime
10
11
from functools import wraps , partial
11
- from threading import Event , Lock , Thread
12
- from contextlib import contextmanager
13
12
14
13
import sentry_sdk
15
- from sentry_sdk ._compat import text_type , utc_from_timestamp , iteritems
14
+ from sentry_sdk ._compat import PY2 , text_type , utc_from_timestamp , iteritems
16
15
from sentry_sdk .utils import (
16
+ ContextVar ,
17
17
now ,
18
18
nanosecond_time ,
19
19
to_timestamp ,
20
20
serialize_frame ,
21
21
json_dumps ,
22
+ is_gevent ,
22
23
)
23
24
from sentry_sdk .envelope import Envelope , Item
24
25
from sentry_sdk .tracing import (
53
54
from sentry_sdk ._types import MetricValue
54
55
55
56
56
- _thread_local = threading .local ()
57
+ try :
58
+ from gevent .monkey import get_original # type: ignore
59
+ from gevent .threadpool import ThreadPool # type: ignore
60
+ except ImportError :
61
+ import importlib
62
+
63
+ def get_original (module , name ):
64
+ # type: (str, str) -> Any
65
+ return getattr (importlib .import_module (module ), name )
66
+
67
+
68
+ _in_metrics = ContextVar ("in_metrics" )
57
69
_sanitize_key = partial (re .compile (r"[^a-zA-Z0-9_/.-]+" ).sub , "_" )
58
70
_sanitize_value = partial (re .compile (r"[^\w\d_:/@\.{}\[\]$-]+" , re .UNICODE ).sub , "_" )
59
71
_set = set # set is shadowed below
@@ -84,15 +96,12 @@ def get_code_location(stacklevel):
84
96
def recursion_protection ():
85
97
# type: () -> Generator[bool, None, None]
86
98
"""Enters recursion protection and returns the old flag."""
99
+ old_in_metrics = _in_metrics .get (False )
100
+ _in_metrics .set (True )
87
101
try :
88
- in_metrics = _thread_local .in_metrics
89
- except AttributeError :
90
- in_metrics = False
91
- _thread_local .in_metrics = True
92
- try :
93
- yield in_metrics
102
+ yield old_in_metrics
94
103
finally :
95
- _thread_local . in_metrics = in_metrics
104
+ _in_metrics . set ( old_in_metrics )
96
105
97
106
98
107
def metrics_noop (func ):
@@ -411,20 +420,30 @@ def __init__(
411
420
self ._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
412
421
self ._buckets_total_weight = 0
413
422
self ._capture_func = capture_func
414
- self ._lock = Lock ()
415
423
self ._running = True
416
- self ._flush_event = Event ()
424
+ self ._lock = threading .Lock ()
425
+
426
+ if is_gevent () and PY2 :
427
+ # get_original on threading.Event in Python 2 incorrectly returns
428
+ # the gevent-patched class. Luckily, threading.Event is just an alias
429
+ # for threading._Event in Python 2, and get_original on
430
+ # threading._Event correctly gets us the stdlib original.
431
+ event_cls = get_original ("threading" , "_Event" )
432
+ else :
433
+ event_cls = get_original ("threading" , "Event" )
434
+ self ._flush_event = event_cls () # type: threading.Event
435
+
417
436
self ._force_flush = False
418
437
419
- # The aggregator shifts it's flushing by up to an entire rollup window to
438
+ # The aggregator shifts its flushing by up to an entire rollup window to
420
439
# avoid multiple clients trampling on end of a 10 second window as all the
421
440
# buckets are anchored to multiples of ROLLUP seconds. We randomize this
422
441
# number once per aggregator boot to achieve some level of offsetting
423
442
# across a fleet of deployed SDKs. Relay itself will also apply independent
424
443
# jittering.
425
444
self ._flush_shift = random .random () * self .ROLLUP_IN_SECONDS
426
445
427
- self ._flusher = None # type: Optional[Thread]
446
+ self ._flusher = None # type: Optional[Union[threading. Thread, ThreadPool] ]
428
447
self ._flusher_pid = None # type: Optional[int]
429
448
self ._ensure_thread ()
430
449
@@ -435,25 +454,35 @@ def _ensure_thread(self):
435
454
"""
436
455
if not self ._running :
437
456
return False
457
+
438
458
pid = os .getpid ()
439
459
if self ._flusher_pid == pid :
440
460
return True
461
+
441
462
with self ._lock :
442
463
self ._flusher_pid = pid
443
- self ._flusher = Thread (target = self ._flush_loop )
444
- self ._flusher .daemon = True
464
+
465
+ if not is_gevent ():
466
+ self ._flusher = threading .Thread (target = self ._flush_loop )
467
+ self ._flusher .daemon = True
468
+ start_flusher = self ._flusher .start
469
+ else :
470
+ self ._flusher = ThreadPool (1 )
471
+ start_flusher = partial (self ._flusher .spawn , func = self ._flush_loop )
472
+
445
473
try :
446
- self . _flusher . start ()
474
+ start_flusher ()
447
475
except RuntimeError :
448
476
# Unfortunately at this point the interpreter is in a state that no
449
477
# longer allows us to spawn a thread and we have to bail.
450
478
self ._running = False
451
479
return False
480
+
452
481
return True
453
482
454
483
def _flush_loop (self ):
455
484
# type: (...) -> None
456
- _thread_local . in_metrics = True
485
+ _in_metrics . set ( True )
457
486
while self ._running or self ._force_flush :
458
487
self ._flush ()
459
488
if self ._running :
@@ -608,7 +637,6 @@ def kill(self):
608
637
609
638
self ._running = False
610
639
self ._flush_event .set ()
611
- self ._flusher .join ()
612
640
self ._flusher = None
613
641
614
642
@metrics_noop
0 commit comments