1
1
import logging
2
+ import os
2
3
import signal
3
4
from threading import Event , Thread
4
5
from time import sleep
6
+ from typing import Optional
5
7
6
8
import schedule
7
9
from requests .exceptions import RequestException
@@ -25,16 +27,28 @@ def on_exit(*args, **kwargs):
25
27
26
28
27
29
class ControllerClient (Thread ):
28
- def __init__ (self , stop , config , metric , * args , ** kwargs ) -> None :
29
- self .poll_interval = kwargs .pop ("poll_interval" )
30
- self .metric_interval = kwargs .pop ("metric_interval" )
31
- self .controller_endpoint = kwargs .pop ("controller_endpoint" )
32
- self .metric_endpoint = kwargs .pop ("metric_endpoint" )
33
- self .app_key = kwargs .pop ("app_key" )
34
- self .stop : Event = stop
35
- self .config : Config = config
36
- self .metrics : Metric = metric
30
+ def __init__ (
31
+ self ,
32
+ * args ,
33
+ poll_interval = None ,
34
+ metric_interval = None ,
35
+ controller_endpoint = None ,
36
+ metric_endpoint = None ,
37
+ app_key = None ,
38
+ ** kwargs
39
+ ) -> None :
40
+ self .poll_interval = poll_interval
41
+ self .metric_interval = metric_interval
42
+ self .controller_endpoint = controller_endpoint
43
+ self .metric_endpoint = metric_endpoint
44
+ self .app_key = app_key
45
+
46
+ self ._stop = Event ()
47
+ self .config = Config ()
48
+ self .metrics = Metric ()
37
49
self .session = CachedSession (backend = "memory" , cache_control = True )
50
+
51
+ LOGGER .debug ("ControllerClient Initialized" )
38
52
super ().__init__ (* args , name = "SentryControllerClient" , ** kwargs )
39
53
40
54
def run (self ):
@@ -44,10 +58,16 @@ def run(self):
44
58
sleep (5 )
45
59
schedule .every (self .poll_interval ).seconds .do (self .update_config )
46
60
schedule .every (self .metric_interval ).seconds .do (self .update_metrics )
47
- while not self .stop .is_set ():
61
+ LOGGER .debug ("ControllerClient Started" )
62
+ while not self ._stop .is_set ():
48
63
schedule .run_pending ()
49
64
sleep (1 )
50
65
66
+ def kill (self ):
67
+ self ._stop .set ()
68
+ if self .is_alive ():
69
+ self .join ()
70
+
51
71
def update_config (self ):
52
72
try :
53
73
resp = self .session .get (
@@ -59,8 +79,10 @@ def update_config(self):
59
79
return
60
80
61
81
if resp .from_cache :
82
+ LOGGER .debug ("Config Polled from cache" )
62
83
return
63
84
85
+ LOGGER .debug ("Config Polled" )
64
86
data = resp .json ()
65
87
self .config .update (data )
66
88
self .metrics .set_mode (MetricType .CELERY , data ["celery_collect_metrics" ])
@@ -71,6 +93,7 @@ def update_metrics(self):
71
93
# check if metric is enable
72
94
mode = self .metrics .get_mode (metric_type )
73
95
if not mode :
96
+ LOGGER .debug ("Metric %s disabled" , metric_type .value )
74
97
continue
75
98
76
99
counter = self .metrics .get_and_reset (metric_type )
@@ -86,36 +109,59 @@ def update_metrics(self):
86
109
self .metric_endpoint .format (self .app_key , metric_type .value ),
87
110
json = data ,
88
111
)
112
+ LOGGER .debug ("Metric %s pushed" , metric_type .value )
89
113
except RequestException as err :
90
114
LOGGER .warning ("Metric Request Failed: %s" , err )
91
115
return
92
116
93
117
94
118
class TraceSampler (metaclass = Singleton ):
95
119
def __init__ (self , * args , ** kwargs ) -> None :
96
- self .stop = Event ()
97
- self .config = Config ()
98
- self .metrics = Metric ()
99
- self .controller = ControllerClient (
100
- * args , self .stop , self .config , self .metrics , ** kwargs
101
- )
102
- self .controller .start ()
120
+ self .params = (args , kwargs )
121
+ self ._controller : Optional [ControllerClient ] = None
122
+ self ._tread_for_pid : Optional [int ] = None
103
123
104
124
signal .signal (signal .SIGINT , on_exit )
105
-
106
125
# HACK: Celery has a built in signal mechanism
107
126
# so we use it
108
127
if worker_shutdown :
109
128
worker_shutdown .connect (on_exit )
110
129
111
- def __del__ (self ):
112
- on_exit (self .stop , self .controller )
130
+ @property
131
+ def has_running_controller (self ):
132
+ if self ._tread_for_pid != os .getpid ():
133
+ return False
134
+ if not self ._controller :
135
+ return None
136
+ return self ._controller .is_alive ()
137
+
138
+ @property
139
+ def config (self ) -> Config :
140
+ return self ._controller .config
141
+
142
+ @property
143
+ def metrics (self ) -> Metric :
144
+ return self ._controller .metrics
113
145
114
146
def kill (self ):
115
- self .stop .set ()
116
- self .controller .join ()
147
+ if self ._controller :
148
+ self ._controller .kill ()
149
+
150
+ def _start_controller (self ):
151
+ args , kwargs = self .params
152
+ self ._controller = ControllerClient (* args , ** kwargs )
153
+ self ._controller .start ()
154
+ self ._tread_for_pid = os .getpid ()
155
+
156
+ def _ensure_controller (self ):
157
+ if not self .has_running_controller :
158
+ self ._start_controller ()
159
+
160
+ def __del__ (self ):
161
+ self .kill ()
117
162
118
163
def __call__ (self , sampling_context ):
164
+ self ._ensure_controller ()
119
165
if sampling_context :
120
166
if "wsgi_environ" in sampling_context :
121
167
path = sampling_context ["wsgi_environ" ].get ("PATH_INFO" , "" )
0 commit comments