Skip to content

Commit a6ce48d

Browse files
authored
initialize event processor thread on demand (#636)
* initialize event processor thread on demand Initialization is protected by a lock, as well as a PID check. * refactor starting of background threads to be centrally owned by the agent instance * add additional PID check when starting the transport thread
1 parent f400afd commit a6ce48d

File tree

11 files changed

+195
-206
lines changed

11 files changed

+195
-206
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ This will be the last minor release to support the following versions:
4343
[float]
4444
===== New Features
4545
46+
* Refactored spawning of background threads {pull}636[#636]
4647
* Added support for aiohttp client and server {pull}659[#659]
4748
* Added support for tornado web framework {pull}661[#661]
4849
* Added support for starlette/fastapi {pull}694[#694]

elasticapm/base.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,20 @@
3737
import os
3838
import platform
3939
import sys
40+
import threading
4041
import time
4142
import warnings
4243
from copy import deepcopy
4344

4445
import elasticapm
45-
from elasticapm.conf import Config, VersionedConfig, constants, update_config
46+
from elasticapm.conf import Config, VersionedConfig, constants
4647
from elasticapm.conf.constants import ERROR
4748
from elasticapm.metrics.base_metrics import MetricsRegistry
4849
from elasticapm.traces import Tracer, execution_context
4950
from elasticapm.utils import cgroup, compat, is_master_process, stacks, varmap
5051
from elasticapm.utils.encoding import enforce_label_format, keyword_field, shorten, transform
5152
from elasticapm.utils.logging import get_logger
5253
from elasticapm.utils.module_import import import_string
53-
from elasticapm.utils.threading import IntervalTimer
5454

5555
__all__ = ("Client",)
5656

@@ -92,6 +92,10 @@ def __init__(self, config=None, **inline):
9292
self.logger = get_logger("%s.%s" % (cls.__module__, cls.__name__))
9393
self.error_logger = get_logger("elasticapm.errors")
9494

95+
self._pid = None
96+
self._thread_starter_lock = threading.Lock()
97+
self._thread_managers = {}
98+
9599
self.tracer = None
96100
self.processors = []
97101
self.filter_exception_types_dict = {}
@@ -141,6 +145,8 @@ def __init__(self, config=None, **inline):
141145
)
142146
transport_class = import_string(self.config.transport_class)
143147
self._transport = transport_class(self._api_endpoint_url, self, **transport_kwargs)
148+
self.config.transport = self._transport
149+
self._thread_managers["transport"] = self._transport
144150

145151
for exc_to_filter in self.config.filter_exception_types or []:
146152
exc_to_filter_type = exc_to_filter.split(".")[-1]
@@ -188,15 +194,25 @@ def __init__(self, config=None, **inline):
188194
self._metrics.register(path)
189195
if self.config.breakdown_metrics:
190196
self._metrics.register("elasticapm.metrics.sets.breakdown.BreakdownMetricSet")
197+
self._thread_managers["metrics"] = self._metrics
191198
compat.atexit_register(self.close)
192199
if self.config.central_config:
193-
self._config_updater = IntervalTimer(
194-
update_config, 1, "eapm conf updater", daemon=True, args=(self,), evaluate_function_interval=True
195-
)
196-
self._config_updater.start()
200+
self._thread_managers["config"] = self.config
197201
else:
198202
self._config_updater = None
199203

204+
self.start_threads()
205+
206+
def start_threads(self):
207+
with self._thread_starter_lock:
208+
current_pid = os.getpid()
209+
if self._pid != current_pid:
210+
self.logger.debug("Detected PID change from %d to %d, starting threads", self._pid, current_pid)
211+
for manager_type, manager in self._thread_managers.items():
212+
self.logger.debug("Starting %s thread", manager_type)
213+
manager.start_thread()
214+
self._pid = current_pid
215+
200216
def get_handler(self, name):
201217
return import_string(name)
202218

@@ -243,6 +259,7 @@ def capture_exception(self, exc_info=None, handled=True, **kwargs):
243259
def queue(self, event_type, data, flush=False):
244260
if self.config.disable_send:
245261
return
262+
self.start_threads()
246263
if flush and is_master_process():
247264
# don't flush in uWSGI master process to avoid ending up in an unpredictable threading state
248265
flush = False
@@ -272,11 +289,9 @@ def end_transaction(self, name=None, result="", duration=None):
272289
return transaction
273290

274291
def close(self):
275-
if self._metrics:
276-
self._metrics._stop_collect_timer()
277-
if self._config_updater:
278-
self._config_updater.cancel()
279-
self._transport.close()
292+
with self._thread_starter_lock:
293+
for _manager_type, manager in self._thread_managers.items():
294+
manager.stop_thread()
280295

281296
def get_service_info(self):
282297
if self._service_info:

elasticapm/conf/__init__.py

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@
3737

3838
from elasticapm.utils import compat, starmatch_to_regex
3939
from elasticapm.utils.logging import get_logger
40+
from elasticapm.utils.threading import IntervalTimer, ThreadManager
4041

4142
__all__ = ("setup_logging", "Config")
4243

44+
4345
logger = get_logger("elasticapm.conf")
4446

4547

@@ -338,22 +340,24 @@ class Config(_ConfigBase):
338340
use_elastic_traceparent_header = _BoolConfigValue("USE_ELASTIC_TRACEPARENT_HEADER", default=True)
339341

340342

341-
class VersionedConfig(object):
343+
class VersionedConfig(ThreadManager):
342344
"""
343345
A thin layer around Config that provides versioning
344346
"""
345347

346-
__slots__ = ("_config", "_version", "_first_config", "_first_version", "_lock")
348+
__slots__ = ("_config", "_version", "_first_config", "_first_version", "_lock", "transport", "_update_thread")
347349

348-
def __init__(self, config_object, version):
350+
def __init__(self, config_object, version, transport=None):
349351
"""
350352
Create a new VersionedConfig with an initial Config object
351353
:param config_object: the initial Config object
352354
:param version: a version identifier for the configuration
353355
"""
354356
self._config = self._first_config = config_object
355357
self._version = self._first_version = version
358+
self.transport = transport
356359
self._lock = threading.Lock()
360+
self._update_thread = None
357361

358362
def update(self, version, **config):
359363
"""
@@ -399,32 +403,44 @@ def __setattr__(self, name, value):
399403
def config_version(self):
400404
return self._version
401405

402-
403-
def update_config(agent):
404-
logger.debug("Checking for new config...")
405-
transport = agent._transport
406-
keys = {"service": {"name": agent.config.service_name}}
407-
if agent.config.environment:
408-
keys["service"]["environment"] = agent.config.environment
409-
new_version, new_config, next_run = transport.get_config(agent.config.config_version, keys)
410-
if new_version and new_config:
411-
errors = agent.config.update(new_version, **new_config)
412-
if errors:
413-
logger.error("Error applying new configuration: %s", repr(errors))
414-
else:
415-
logger.info(
416-
"Applied new configuration: %s",
417-
"; ".join(
418-
"%s=%s" % (compat.text_type(k), compat.text_type(v)) for k, v in compat.iteritems(new_config)
419-
),
420-
)
421-
elif new_version == agent.config.config_version:
422-
logger.debug("Remote config unchanged")
423-
elif not new_config and agent.config.changed:
424-
logger.debug("Remote config disappeared, resetting to original")
425-
agent.config.reset()
426-
427-
return next_run
406+
def update_config(self):
407+
if not self.transport:
408+
logger.warning("No transport set for config updates, skipping")
409+
return
410+
logger.debug("Checking for new config...")
411+
keys = {"service": {"name": self.service_name}}
412+
if self.environment:
413+
keys["service"]["environment"] = self.environment
414+
new_version, new_config, next_run = self.transport.get_config(self.config_version, keys)
415+
if new_version and new_config:
416+
errors = self.update(new_version, **new_config)
417+
if errors:
418+
logger.error("Error applying new configuration: %s", repr(errors))
419+
else:
420+
logger.info(
421+
"Applied new configuration: %s",
422+
"; ".join(
423+
"%s=%s" % (compat.text_type(k), compat.text_type(v)) for k, v in compat.iteritems(new_config)
424+
),
425+
)
426+
elif new_version == self.config_version:
427+
logger.debug("Remote config unchanged")
428+
elif not new_config and self.changed:
429+
logger.debug("Remote config disappeared, resetting to original")
430+
self.reset()
431+
432+
return next_run
433+
434+
def start_thread(self):
435+
self._update_thread = IntervalTimer(
436+
self.update_config, 1, "eapm conf updater", daemon=True, evaluate_function_interval=True
437+
)
438+
self._update_thread.start()
439+
440+
def stop_thread(self):
441+
if self._update_thread:
442+
self._update_thread.cancel()
443+
self._update_thread = None
428444

429445

430446
def setup_logging(handler, exclude=("gunicorn", "south", "elasticapm.errors")):

elasticapm/metrics/base_metrics.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333
from collections import defaultdict
3434

3535
from elasticapm.conf import constants
36-
from elasticapm.utils import compat, is_master_process
36+
from elasticapm.utils import compat
3737
from elasticapm.utils.logging import get_logger
3838
from elasticapm.utils.module_import import import_string
39-
from elasticapm.utils.threading import IntervalTimer
39+
from elasticapm.utils.threading import IntervalTimer, ThreadManager
4040

4141
logger = get_logger("elasticapm.metrics")
4242

4343
DISTINCT_LABEL_LIMIT = 1000
4444

4545

46-
class MetricsRegistry(object):
46+
class MetricsRegistry(ThreadManager):
4747
def __init__(self, collect_interval, queue_func, tags=None, ignore_patterns=None):
4848
"""
4949
Creates a new metric registry
@@ -58,13 +58,6 @@ def __init__(self, collect_interval, queue_func, tags=None, ignore_patterns=None
5858
self._tags = tags or {}
5959
self._collect_timer = None
6060
self._ignore_patterns = ignore_patterns or ()
61-
if self._collect_interval:
62-
# we only start the thread if we are not in a uwsgi master process
63-
if not is_master_process():
64-
self._start_collect_timer()
65-
else:
66-
# If we _are_ in a uwsgi master process, we use the postfork hook to start the thread after the fork
67-
compat.postfork(lambda: self._start_collect_timer())
6861

6962
def register(self, class_path):
7063
"""
@@ -97,16 +90,19 @@ def collect(self):
9790
for data in metricset.collect():
9891
self._queue_func(constants.METRICSET, data)
9992

100-
def _start_collect_timer(self, timeout=None):
101-
timeout = timeout or self._collect_interval
102-
self._collect_timer = IntervalTimer(self.collect, timeout, name="eapm metrics collect timer", daemon=True)
103-
logger.debug("Starting metrics collect timer")
104-
self._collect_timer.start()
105-
106-
def _stop_collect_timer(self):
107-
if self._collect_timer:
93+
def start_thread(self):
94+
if self._collect_interval:
95+
self._collect_timer = IntervalTimer(
96+
self.collect, self._collect_interval, name="eapm metrics collect timer", daemon=True
97+
)
98+
logger.debug("Starting metrics collect timer")
99+
self._collect_timer.start()
100+
101+
def stop_thread(self):
102+
if self._collect_timer and self._collect_timer.is_alive():
108103
logger.debug("Cancelling collect timer")
109104
self._collect_timer.cancel()
105+
self._collect_timer = None
110106

111107

112108
class MetricsSet(object):

elasticapm/transport/base.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,17 @@
3131
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3232

3333
import gzip
34+
import os
3435
import random
3536
import threading
3637
import time
3738
import timeit
3839
from collections import defaultdict
3940

4041
from elasticapm.contrib.async_worker import AsyncWorker
41-
from elasticapm.utils import compat, is_master_process, json_encoder
42+
from elasticapm.utils import compat, json_encoder
4243
from elasticapm.utils.logging import get_logger
44+
from elasticapm.utils.threading import ThreadManager
4345

4446
logger = get_logger("elasticapm.transport")
4547

@@ -51,7 +53,7 @@ def __init__(self, message, data=None, print_trace=True):
5153
self.print_trace = print_trace
5254

5355

54-
class Transport(object):
56+
class Transport(ThreadManager):
5557
"""
5658
All transport implementations need to subclass this class
5759
@@ -93,18 +95,12 @@ def __init__(
9395
self._queued_data = None
9496
self._event_queue = self._init_event_queue(chill_until=queue_chill_count, max_chill_time=queue_chill_time)
9597
self._is_chilled_queue = isinstance(self._event_queue, ChilledQueue)
96-
self._event_process_thread = None
98+
self._thread = None
9799
self._last_flush = timeit.default_timer()
98100
self._counts = defaultdict(int)
99101
self._flushed = threading.Event()
100102
self._closed = False
101103
self._processors = processors if processors is not None else []
102-
# only start the event processing thread if we are not in a uwsgi master process
103-
if not is_master_process():
104-
self._start_event_processor()
105-
else:
106-
# if we _are_ in a uwsgi master process, use the postfork mixup to start the thread after the fork
107-
compat.postfork(lambda: self._start_event_processor())
108104

109105
def queue(self, event_type, data, flush=False):
110106
try:
@@ -231,14 +227,14 @@ def _flush(self, buffer):
231227
except Exception as e:
232228
self.handle_transport_fail(e)
233229

234-
def _start_event_processor(self):
235-
if (not self._event_process_thread or not self._event_process_thread.is_alive()) and not self._closed:
230+
def start_thread(self):
231+
current_pid = os.getpid()
232+
if (not self._thread or current_pid != self._thread.pid) and not self._closed:
236233
try:
237-
self._event_process_thread = threading.Thread(
238-
target=self._process_queue, name="eapm event processor thread"
239-
)
240-
self._event_process_thread.daemon = True
241-
self._event_process_thread.start()
234+
self._thread = threading.Thread(target=self._process_queue, name="eapm event processor thread")
235+
self._thread.daemon = True
236+
self._thread.pid = current_pid
237+
self._thread.start()
242238
except RuntimeError:
243239
pass
244240

@@ -254,13 +250,15 @@ def close(self):
254250
Cleans up resources and closes connection
255251
:return:
256252
"""
257-
if self._closed:
253+
if self._closed or (not self._thread or self._thread.pid != os.getpid()):
258254
return
259255
self._closed = True
260256
self.queue("close", None)
261257
if not self._flushed.wait(timeout=self._max_flush_time):
262258
raise ValueError("close timed out")
263259

260+
stop_thread = close
261+
264262
def flush(self):
265263
"""
266264
Trigger a flush of the queue.

elasticapm/utils/threading.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,11 @@ def run(self):
8585

8686
def cancel(self):
8787
self._interval_done.set()
88+
89+
90+
class ThreadManager(object):
91+
def start_thread(self):
92+
raise NotImplementedError()
93+
94+
def stop_thread(self):
95+
raise NotImplementedError()

0 commit comments

Comments
 (0)