1
1
import logging
2
- from threading import Thread
2
+ import signal
3
+ from threading import Event , Thread
3
4
from time import sleep
4
5
5
6
import schedule
6
7
from requests .exceptions import RequestException
7
8
from requests_cache import CachedSession
8
9
9
10
from sentry_dynamic_sampling_lib .shared import Config , Metric , MetricType
11
+ from sentry_dynamic_sampling_lib .utils import Singleton
12
+
13
+ try :
14
+ from celery .signals import worker_shutdown
15
+ except ModuleNotFoundError :
16
+ worker_shutdown = None
10
17
11
18
LOGGER = logging .getLogger ("SentryWrapper" )
12
19
13
20
21
+ def on_exit (* args , ** kwargs ):
22
+ ts = TraceSampler ()
23
+ ts .kill ()
24
+ raise KeyboardInterrupt
25
+
26
+
14
27
class ControllerClient (Thread ):
15
- def __init__ (self , config , metric , * args , ** kwargs ) -> None :
28
+ def __init__ (self , stop , config , metric , * args , ** kwargs ) -> None :
16
29
self .poll_interval = kwargs .pop ("poll_interval" )
17
30
self .metric_interval = kwargs .pop ("metric_interval" )
18
31
self .controller_endpoint = kwargs .pop ("controller_endpoint" )
19
32
self .metric_endpoint = kwargs .pop ("metric_endpoint" )
20
33
self .app_key = kwargs .pop ("app_key" )
34
+ self .stop : Event = stop
21
35
self .config : Config = config
22
36
self .metrics : Metric = metric
23
37
self .session = CachedSession (backend = "memory" , cache_control = True )
24
- super ().__init__ (* args , name = "SentryControllerClient" , daemon = True , ** kwargs )
38
+ super ().__init__ (* args , name = "SentryControllerClient" , ** kwargs )
25
39
26
40
def run (self ):
27
41
# HACK: Django change the timezone mid startup
@@ -30,7 +44,7 @@ def run(self):
30
44
sleep (5 )
31
45
schedule .every (self .poll_interval ).seconds .do (self .update_config )
32
46
schedule .every (self .metric_interval ).seconds .do (self .update_metrics )
33
- while True :
47
+ while not self . stop . is_set () :
34
48
schedule .run_pending ()
35
49
sleep (1 )
36
50
@@ -55,6 +69,8 @@ def update_config(self):
55
69
def update_metrics (self ):
56
70
for metric_type in MetricType :
57
71
counter = self .metrics .get_and_reset (metric_type )
72
+ if len (counter ) == 0 :
73
+ return
58
74
data = {
59
75
"app" : self .app_key ,
60
76
"type" : metric_type .value ,
@@ -70,13 +86,30 @@ def update_metrics(self):
70
86
return
71
87
72
88
73
- class TraceSampler :
89
+ class TraceSampler ( metaclass = Singleton ) :
74
90
def __init__ (self , * args , ** kwargs ) -> None :
91
+ self .stop = Event ()
75
92
self .config = Config ()
76
93
self .metrics = Metric ()
77
- self .controller = ControllerClient (* args , self .config , self .metrics , ** kwargs )
94
+ self .controller = ControllerClient (
95
+ * args , self .stop , self .config , self .metrics , ** kwargs
96
+ )
78
97
self .controller .start ()
79
98
99
+ signal .signal (signal .SIGINT , on_exit )
100
+
101
+ # HACK: Celery has a built in signal mechanism
102
+ # so we use it
103
+ if worker_shutdown :
104
+ worker_shutdown .connect (on_exit )
105
+
106
+ def __del__ (self ):
107
+ on_exit (self .stop , self .controller )
108
+
109
+ def kill (self ):
110
+ self .stop .set ()
111
+ self .controller .join ()
112
+
80
113
def __call__ (self , sampling_context ):
81
114
if sampling_context :
82
115
if "wsgi_environ" in sampling_context :
0 commit comments