Skip to content

Commit 7fc4325

Browse files
authored
fix config refresh interval (#818)
* set config refresh interval if no config change * refactor filter argument for as_dict to `_unserializable_fields` class attribute * add documentation for schedule_config_refresh * update changelog --------- Co-authored-by: Jörg Zimmermann <joerg.zimmermann@bwi.de>
1 parent 458c125 commit 7fc4325

File tree

10 files changed

+528
-470
lines changed

10 files changed

+528
-470
lines changed

CHANGELOG.md

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,26 @@
1414
### Improvements
1515

1616
* ensured that "_test.json" files are not loaded as rules
17+
* introduce new logger `Config`
18+
* refactor config refresh behavior from `logprep.runner` to `logprep.util.configuration`
19+
* refactor config related metrics from `logprep.runner` to `logprep.util.configuration`
20+
* added a log message for recovering config refresh mechanic from failing source
21+
22+
### Bugfix
23+
24+
* Fixed logging error in _revoke_callback() by adding error handling
25+
* Fixed endless loading in logprep test config
26+
* prevent the auto rule tester from loading rules directly defined inside the config, since they break the auto rule tester and can't have tests anyways
27+
* Fixed typo and broken link in documentation
28+
* Fixed assign_callback error in confluentkafka input
29+
* Fixed error logging in ` _get_configuration`, which caused the github checks to fail
30+
* Resolved `mypy` errors in `BaseProcessorTestCase.` by ensuring `self.object` and `self.patchers` are not `None` before accessing attributes.
31+
* Fix domain resolver errors for invalid domains
32+
* Fixed deprecation warnings caused by datetime when using Python >= 3.12
33+
* Fixed timestamp and timezone mismatch issue
34+
* Fixed a bug where config refresh interval was not reset to original interval after recovering from source related failures (i.e. http timeouts)
35+
* Fixed inconsistent generator statistics report during multithreading by making it thread safe
1736

18-
### Bugfix
19-
20-
- Fixed logging error in _revoke_callback() by adding error handling
21-
- Fixed endless loading in logprep test config
22-
- prevent the auto rule tester from loading rules directly defined inside the config, since they break the auto rule tester and can't have tests anyways
23-
- Fixed typo and broken link in documentation
24-
- Fixed assign_callback error in confluentkafka input
25-
- Fixed error logging in ` _get_configuration`, which caused the github checks to fail
26-
- Resolved `mypy` errors in `BaseProcessorTestCase.` by ensuring `self.object` and `self.patchers` are not `None` before accessing attributes.
27-
- Fix domain resolver errors for invalid domains
28-
- Fixed deprecation warnings caused by datetime when using Python >= 3.12
29-
- Fixed timestamp and timezone mismatch issue
30-
- Fixed inconsistent generator statistics report during multithreading by making it thread safe
3137

3238
## 16.1.0
3339
### Deprecations

examples/exampledata/config/pipeline.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version: 1
1+
version: 2
22
process_count: 2
33
timeout: 0.1
44
restart_count: 2

logprep/runner.py

Lines changed: 13 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,11 @@
99
from typing import Generator
1010

1111
from attrs import define, field
12-
from schedule import Scheduler
1312

1413
from logprep.abc.component import Component
1514
from logprep.framework.pipeline_manager import PipelineManager
16-
from logprep.metrics.metrics import CounterMetric, GaugeMetric
17-
from logprep.util.configuration import (
18-
ConfigGetterException,
19-
Configuration,
20-
ConfigVersionDidNotChangeError,
21-
InvalidConfigurationError,
22-
)
15+
from logprep.metrics.metrics import GaugeMetric
16+
from logprep.util.configuration import Configuration
2317
from logprep.util.defaults import EXITCODES
2418

2519

@@ -52,8 +46,6 @@ class Runner:
5246

5347
_exit_received: bool = False
5448

55-
scheduler: Scheduler
56-
5749
@define(kw_only=True)
5850
class Metrics(Component.Metrics):
5951
"""Metrics for the Logprep Runner."""
@@ -66,44 +58,6 @@ class Metrics(Component.Metrics):
6658
)
6759
"""Current size of the error queue."""
6860

69-
version_info: GaugeMetric = field(
70-
factory=lambda: GaugeMetric(
71-
description="Logprep version information",
72-
name="version_info",
73-
labels={"logprep": "unset", "config": "unset"},
74-
inject_label_values=False,
75-
)
76-
)
77-
"""Logprep version info."""
78-
config_refresh_interval: GaugeMetric = field(
79-
factory=lambda: GaugeMetric(
80-
description="Logprep config refresh interval",
81-
name="config_refresh_interval",
82-
labels={"from": "unset", "config": "unset"},
83-
)
84-
)
85-
"""Indicates the configuration refresh interval in seconds."""
86-
number_of_config_refreshes: CounterMetric = field(
87-
factory=lambda: CounterMetric(
88-
description="Indicates how often the logprep configuration was updated.",
89-
name="number_of_config_refreshes",
90-
labels={"from": "unset", "config": "unset"},
91-
)
92-
)
93-
"""Indicates how often the logprep configuration was updated."""
94-
number_of_config_refresh_failures: CounterMetric = field(
95-
factory=lambda: CounterMetric(
96-
description=(
97-
"Indicates how often the logprep configuration "
98-
"could not be updated due to failures during the update."
99-
),
100-
name="number_of_config_refreshes",
101-
labels={"from": "unset", "config": "unset"},
102-
)
103-
)
104-
"""Indicates how often the logprep configuration could not be updated
105-
due to failures during the update."""
106-
10761
@property
10862
def _metric_labels(self) -> dict[str, str]:
10963
labels = {
@@ -112,21 +66,6 @@ def _metric_labels(self) -> dict[str, str]:
11266
}
11367
return labels
11468

115-
@property
116-
def _config_refresh_interval(self) -> int:
117-
"""Indicates the configuration refresh interval in seconds."""
118-
return self._configuration.config_refresh_interval
119-
120-
@_config_refresh_interval.setter
121-
def _config_refresh_interval(self, value: int | None) -> None:
122-
"""Set the configuration refresh interval in seconds."""
123-
if value is None:
124-
self._configuration.config_refresh_interval = None
125-
elif value <= 5:
126-
self._configuration.config_refresh_interval = 5
127-
else:
128-
self._configuration.config_refresh_interval = value
129-
13069
# Use this method to obtain a runner singleton for production
13170
@staticmethod
13271
def get_runner(configuration: Configuration) -> "Runner":
@@ -137,89 +76,53 @@ def get_runner(configuration: Configuration) -> "Runner":
13776

13877
# For production, use the get_runner method to create/get access to a singleton!
13978
def __init__(self, configuration: Configuration) -> None:
140-
self._manager: PipelineManager | None = None
14179
atexit.register(self.stop_and_exit)
14280
self.exit_code = EXITCODES.SUCCESS
14381
self._configuration = configuration
82+
self._config_version = self._configuration.version # to trigger reloads
14483
self.metrics = self.Metrics(labels={"logprep": "unset", "config": "unset"})
14584
self._logger = logging.getLogger("Runner")
14685
self._manager = PipelineManager(configuration)
147-
self.scheduler = Scheduler()
14886

149-
def start(self):
87+
def start(self) -> None:
15088
"""Start processing.
15189
15290
This runs until an SIGTERM, SIGINT or KeyboardInterrupt signal is received, or an unhandled
15391
error occurs.
15492
"""
155-
self._set_version_info_metric()
156-
self._schedule_config_refresh_job()
93+
self._configuration.schedule_config_refresh()
15794
self._manager.start()
15895
self._logger.info("Startup complete")
15996
self._logger.debug("Runner iterating")
16097
self._iterate()
16198

162-
def stop_and_exit(self):
99+
def stop_and_exit(self) -> None:
163100
"""Stop the runner and exit the process."""
164101
self._logger.info("Shutting down")
165102
if self._manager:
166103
self._manager.stop()
167104

168-
def _iterate(self):
105+
def _iterate(self) -> None:
169106
for _ in self._keep_iterating():
170107
if self._exit_received:
171108
break
172-
self.scheduler.run_pending()
109+
self._configuration.refresh()
110+
if self._configuration.version != self._config_version:
111+
self._manager.reload()
112+
self._config_version = self._configuration.version
173113
if self._manager.should_exit():
174-
self.exit_code = EXITCODES.PIPELINE_ERROR.value
114+
self.exit_code = EXITCODES.PIPELINE_ERROR
175115
self._logger.error("Restart count exceeded. Exiting.")
176116
sys.exit(self.exit_code)
177117
if self._manager.error_queue is not None:
178118
self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
179119
self._manager.restart_failed_pipeline()
180120

181-
def reload_configuration(self):
182-
"""Reloads the configuration"""
183-
try:
184-
self._configuration.reload()
185-
self._logger.info("Successfully reloaded configuration")
186-
self.metrics.number_of_config_refreshes += 1
187-
self._manager.reload()
188-
self._schedule_config_refresh_job()
189-
self._logger.info(f"Configuration version: {self._configuration.version}")
190-
self._set_version_info_metric()
191-
except ConfigGetterException as error:
192-
self._logger.warning(f"Failed to load configuration: {error}")
193-
self.metrics.number_of_config_refresh_failures += 1
194-
self._config_refresh_interval = int(self._config_refresh_interval / 4)
195-
self._schedule_config_refresh_job()
196-
except ConfigVersionDidNotChangeError as error:
197-
self._logger.info(str(error))
198-
except InvalidConfigurationError as error:
199-
self._logger.error(str(error))
200-
self.metrics.number_of_config_refresh_failures += 1
201-
202-
def _set_version_info_metric(self):
203-
self.metrics.version_info.add_with_labels(
204-
1,
205-
{"logprep": f"{version('logprep')}", "config": self._configuration.version},
206-
)
207-
208-
def stop(self):
121+
def stop(self) -> None:
209122
"""Stop the logprep runner. Is called by the signal handler
210123
in run_logprep.py."""
211124
self._exit_received = True
212125

213-
def _schedule_config_refresh_job(self):
214-
refresh_interval = self._config_refresh_interval
215-
scheduler = self.scheduler
216-
if scheduler.jobs:
217-
scheduler.cancel_job(scheduler.jobs[0])
218-
if isinstance(refresh_interval, (float, int)):
219-
self.metrics.config_refresh_interval += refresh_interval
220-
scheduler.every(refresh_interval).seconds.do(self.reload_configuration)
221-
self._logger.info(f"Config refresh interval is set to: {refresh_interval} seconds")
222-
223126
def _keep_iterating(self) -> Generator:
224127
"""Indicates whether the runner should keep iterating."""
225128

0 commit comments

Comments
 (0)