Skip to content

Commit 738d7dc

Browse files
authored
fix: properly handle falsy resolved values in generic_resolver (#933)
* add tests for verifying the bug * fix handling of falsy values and improve static typing * fix mypy issues * attempt to circumvent false mypy issue * restrict usage of None * support None values properly * fix typing issues and adapt ng * attempt to fix mypy issue * fix typo in docs * add CHANGELOG entries * add tests for resolve_file and add more complex tests * fix tests * use MISSING to avoid double dict query * update CHANGELOG to reflect the bugfix for resolve_from_file * fix review findings * replace isinstance with a more readable alternative
1 parent 751bbe8 commit 738d7dc

File tree

7 files changed

+371
-72
lines changed

7 files changed

+371
-72
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
### Features
55
* add uv as dependency management, including uv.lock
66
* allow configuration (and auto-creation) of service accounts in helm chart
7+
* generic_resolver now handles all FieldValue types (including None)
78

89
### Improvements
910
* simplify Dockerfile and remove docker build support for `LOGPREP_VERSION`
1011

1112
### Bugfix
1213
* generic_resolver now follows yaml standard and accepts a list instead of relying on the ordering of a dict
1314
* decoder errors are handled properly as warnings instead of causing pipeline failures
15+
* generic_resolver now properly handles falsy values in resolve_list and resolve_from_file
1416

1517
## 18.0.1
1618
### Breaking

logprep/ng/processor/generic_resolver/processor.py

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,27 @@
2323
.. automodule:: logprep.processor.generic_resolver.rule
2424
"""
2525

26+
import typing
27+
from copy import deepcopy
2628
from functools import cached_property, lru_cache
27-
from typing import Callable, Optional
29+
from typing import Callable
2830

2931
from attrs import define, field, validators
3032

3133
from logprep.metrics.metrics import GaugeMetric
3234
from logprep.ng.abc.processor import Processor
3335
from logprep.ng.processor.field_manager.processor import FieldManager
3436
from logprep.processor.base.exceptions import FieldExistsWarning
37+
from logprep.processor.base.rule import Rule
3538
from logprep.processor.generic_resolver.rule import GenericResolverRule
36-
from logprep.util.helper import add_fields_to, get_dotted_field_value
39+
from logprep.util.helper import (
40+
MISSING,
41+
FieldValue,
42+
Missing,
43+
add_fields_to,
44+
get_dotted_field_value,
45+
)
46+
from logprep.util.typing import is_lru_cached
3747

3848

3949
class GenericResolver(FieldManager):
@@ -43,9 +53,7 @@ class GenericResolver(FieldManager):
4353
class Config(Processor.Config):
4454
"""GenericResolver config"""
4555

46-
max_cache_entries: Optional[int] = field(
47-
validator=validators.optional(validators.instance_of(int)), default=0
48-
)
56+
max_cache_entries: int = field(validator=validators.instance_of(int), default=0)
4957
"""(Optional) Size of cache for results when resolving from a list.
5058
The cache can be disabled by setting this option to :code:`0`.
5159
@@ -56,9 +64,7 @@ class Config(Processor.Config):
5664
and OOM situations caused by the generic resolver cache.
5765
5866
"""
59-
cache_metrics_interval: Optional[int] = field(
60-
validator=validators.optional(validators.instance_of(int)), default=1
61-
)
67+
cache_metrics_interval: int = field(validator=validators.instance_of(int), default=1)
6268
"""(Optional) Cache metrics won't be updated immediately.
6369
Instead updating is skipped for a number of events before it's next update.
6470
:code:`cache_metrics_interval` sets the number of events between updates (default: 1)."""
@@ -103,25 +109,33 @@ class Metrics(FieldManager.Metrics):
103109

104110
rule_class = GenericResolverRule
105111

112+
@property
113+
def config(self) -> Config:
114+
"""Returns the typed GenericResolver.Config"""
115+
return typing.cast(GenericResolver.Config, self._config)
116+
106117
@property
107118
def max_cache_entries(self) -> int:
108119
"""Returns the configured number of max_cache_entries"""
109-
return self._config.max_cache_entries
120+
return self.config.max_cache_entries
110121

111122
@property
112123
def cache_metrics_interval(self) -> int:
113124
"""Returns the configured cache_metrics_interval"""
114-
return self._config.cache_metrics_interval
125+
return self.config.cache_metrics_interval
115126

116127
@cached_property
117-
def _get_lru_cached_value_from_list(self) -> Callable[[GenericResolverRule, str], str | None]:
118-
"""Returns lru cashed method to retrieve values from list if configured"""
128+
def _get_lru_cached_value_from_list(
129+
self,
130+
) -> Callable[[GenericResolverRule, str], FieldValue | Missing]:
131+
"""Returns lru cached method to retrieve values from list if configured"""
119132
if self.max_cache_entries <= 0:
120133
return self._resolve_value_from_list
121134
return lru_cache(maxsize=self.max_cache_entries)(self._resolve_value_from_list)
122135

123-
def _apply_rules(self, event: dict, rule: GenericResolverRule) -> None:
136+
def _apply_rules(self, event: dict, rule: Rule) -> None:
124137
"""Apply the given rule to the current event"""
138+
rule = typing.cast(GenericResolverRule, rule)
125139
source_field_values = [
126140
get_dotted_field_value(event, source_field)
127141
for source_field in rule.field_mapping.keys()
@@ -130,25 +144,30 @@ def _apply_rules(self, event: dict, rule: GenericResolverRule) -> None:
130144
conflicting_fields = []
131145
for source_field, target_field in rule.field_mapping.items():
132146
source_field_value = str(get_dotted_field_value(event, source_field))
133-
content = self._find_content_of_first_matching_pattern(rule, source_field_value)
134-
if not content:
147+
resolved_content = self._find_content_of_first_matching_pattern(
148+
rule, source_field_value
149+
)
150+
if resolved_content is MISSING:
135151
continue
136152
current_content = get_dotted_field_value(event, target_field)
137-
if isinstance(current_content, list) and content in current_content:
153+
if isinstance(current_content, list) and resolved_content in current_content:
138154
continue
155+
if isinstance(resolved_content, (list, dict)):
156+
resolved_content = deepcopy(resolved_content)
139157
try:
140158
add_fields_to(
141159
event,
142160
fields={
143161
target_field: (
144-
[content]
162+
[resolved_content]
145163
if rule.merge_with_target and current_content is None
146-
else content
164+
else resolved_content
147165
)
148166
},
149167
rule=rule,
150168
merge_with_target=rule.merge_with_target,
151169
overwrite_target=rule.overwrite_target,
170+
skip_none=False,
152171
)
153172
except FieldExistsWarning as error:
154173
conflicting_fields.extend(error.skipped_fields)
@@ -160,28 +179,28 @@ def _apply_rules(self, event: dict, rule: GenericResolverRule) -> None:
160179

161180
def _find_content_of_first_matching_pattern(
162181
self, rule: GenericResolverRule, source_field_value: str
163-
) -> str | None:
182+
) -> FieldValue | Missing:
164183
if rule.resolve_from_file:
165184
matches = rule.pattern.match(source_field_value)
166185
if matches:
167186
mapping = matches.group("mapping")
168187
if rule.ignore_case:
169188
mapping = mapping.upper()
170-
content = rule.additions.get(mapping)
171-
if content:
189+
content = rule.additions.get(mapping, MISSING)
190+
if content is not MISSING:
172191
return content
173192
return self._get_lru_cached_value_from_list(rule, source_field_value)
174193

175194
def _resolve_value_from_list(
176195
self, rule: GenericResolverRule, source_field_value: str
177-
) -> Optional[str]:
196+
) -> FieldValue | Missing:
178197
for pattern, content in rule.compiled_resolve_list:
179198
if pattern.search(source_field_value):
180199
return content
181-
return None
200+
return MISSING
182201

183202
def _update_cache_metrics(self) -> None:
184-
if self.max_cache_entries <= 0:
203+
if not is_lru_cached(self._get_lru_cached_value_from_list):
185204
return
186205
self._cache_metrics_skip_count += 1
187206
if self._cache_metrics_skip_count < self.cache_metrics_interval:
@@ -192,7 +211,7 @@ def _update_cache_metrics(self) -> None:
192211
self.metrics.new_results += cache_info.misses
193212
self.metrics.cached_results += cache_info.hits
194213
self.metrics.num_cache_entries += cache_info.currsize
195-
self.metrics.cache_load += cache_info.currsize / cache_info.maxsize
214+
self.metrics.cache_load += cache_info.currsize / self.max_cache_entries
196215

197216
def setup(self) -> None:
198217
super().setup()

logprep/processor/generic_resolver/processor.py

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,27 @@
2323
.. automodule:: logprep.processor.generic_resolver.rule
2424
"""
2525

26+
import typing
27+
from copy import deepcopy
2628
from functools import cached_property, lru_cache
27-
from typing import Optional
29+
from typing import Callable
2830

2931
from attrs import define, field, validators
3032

3133
from logprep.abc.processor import Processor
3234
from logprep.metrics.metrics import GaugeMetric
3335
from logprep.processor.base.exceptions import FieldExistsWarning
36+
from logprep.processor.base.rule import Rule
3437
from logprep.processor.field_manager.processor import FieldManager
3538
from logprep.processor.generic_resolver.rule import GenericResolverRule
36-
from logprep.util.helper import add_fields_to, get_dotted_field_value
39+
from logprep.util.helper import (
40+
MISSING,
41+
FieldValue,
42+
Missing,
43+
add_fields_to,
44+
get_dotted_field_value,
45+
)
46+
from logprep.util.typing import is_lru_cached
3747

3848

3949
class GenericResolver(FieldManager):
@@ -43,9 +53,7 @@ class GenericResolver(FieldManager):
4353
class Config(Processor.Config):
4454
"""GenericResolver config"""
4555

46-
max_cache_entries: Optional[int] = field(
47-
validator=validators.optional(validators.instance_of(int)), default=0
48-
)
56+
max_cache_entries: int = field(validator=validators.instance_of(int), default=0)
4957
"""(Optional) Size of cache for results when resolving from a list.
5058
The cache can be disabled by setting this option to :code:`0`.
5159
@@ -56,9 +64,7 @@ class Config(Processor.Config):
5664
and OOM situations caused by the generic resolver cache.
5765
5866
"""
59-
cache_metrics_interval: Optional[int] = field(
60-
validator=validators.optional(validators.instance_of(int)), default=1
61-
)
67+
cache_metrics_interval: int = field(validator=validators.instance_of(int), default=1)
6268
"""(Optional) Cache metrics won't be updated immediately.
6369
Instead updating is skipped for a number of events before it's next update.
6470
:code:`cache_metrics_interval` sets the number of events between updates (default: 1)."""
@@ -104,24 +110,32 @@ class Metrics(FieldManager.Metrics):
104110
rule_class = GenericResolverRule
105111

106112
@property
107-
def max_cache_entries(self):
113+
def config(self) -> Config:
114+
"""Returns the typed GenericResolver.Config"""
115+
return typing.cast(GenericResolver.Config, self._config)
116+
117+
@property
118+
def max_cache_entries(self) -> int:
108119
"""Returns the configured number of max_cache_entries"""
109-
return self._config.max_cache_entries
120+
return self.config.max_cache_entries
110121

111122
@property
112-
def cache_metrics_interval(self):
123+
def cache_metrics_interval(self) -> int:
113124
"""Returns the configured cache_metrics_interval"""
114-
return self._config.cache_metrics_interval
125+
return self.config.cache_metrics_interval
115126

116127
@cached_property
117-
def _get_lru_cached_value_from_list(self):
118-
"""Returns lru cashed method to retrieve values from list if configured"""
128+
def _get_lru_cached_value_from_list(
129+
self,
130+
) -> Callable[[GenericResolverRule, str], FieldValue | Missing]:
131+
"""Returns lru cached method to retrieve values from list if configured"""
119132
if self.max_cache_entries <= 0:
120133
return self._resolve_value_from_list
121134
return lru_cache(maxsize=self.max_cache_entries)(self._resolve_value_from_list)
122135

123-
def _apply_rules(self, event: dict, rule: GenericResolverRule) -> None:
136+
def _apply_rules(self, event: dict, rule: Rule) -> None:
124137
"""Apply the given rule to the current event"""
138+
rule = typing.cast(GenericResolverRule, rule)
125139
source_field_values = [
126140
get_dotted_field_value(event, source_field)
127141
for source_field in rule.field_mapping.keys()
@@ -130,25 +144,30 @@ def _apply_rules(self, event: dict, rule: GenericResolverRule) -> None:
130144
conflicting_fields = []
131145
for source_field, target_field in rule.field_mapping.items():
132146
source_field_value = str(get_dotted_field_value(event, source_field))
133-
content = self._find_content_of_first_matching_pattern(rule, source_field_value)
134-
if not content:
147+
resolved_content = self._find_content_of_first_matching_pattern(
148+
rule, source_field_value
149+
)
150+
if resolved_content is MISSING:
135151
continue
136152
current_content = get_dotted_field_value(event, target_field)
137-
if isinstance(current_content, list) and content in current_content:
153+
if isinstance(current_content, list) and resolved_content in current_content:
138154
continue
155+
if isinstance(resolved_content, (list, dict)):
156+
resolved_content = deepcopy(resolved_content)
139157
try:
140158
add_fields_to(
141159
event,
142160
fields={
143161
target_field: (
144-
[content]
162+
[resolved_content]
145163
if rule.merge_with_target and current_content is None
146-
else content
164+
else resolved_content
147165
)
148166
},
149167
rule=rule,
150168
merge_with_target=rule.merge_with_target,
151169
overwrite_target=rule.overwrite_target,
170+
skip_none=False,
152171
)
153172
except FieldExistsWarning as error:
154173
conflicting_fields.extend(error.skipped_fields)
@@ -160,28 +179,28 @@ def _apply_rules(self, event: dict, rule: GenericResolverRule) -> None:
160179

161180
def _find_content_of_first_matching_pattern(
162181
self, rule: GenericResolverRule, source_field_value: str
163-
) -> str | None:
182+
) -> FieldValue | Missing:
164183
if rule.resolve_from_file:
165184
matches = rule.pattern.match(source_field_value)
166185
if matches:
167186
mapping = matches.group("mapping")
168187
if rule.ignore_case:
169188
mapping = mapping.upper()
170-
content = rule.additions.get(mapping)
171-
if content:
189+
content = rule.additions.get(mapping, MISSING)
190+
if content is not MISSING:
172191
return content
173192
return self._get_lru_cached_value_from_list(rule, source_field_value)
174193

175194
def _resolve_value_from_list(
176195
self, rule: GenericResolverRule, source_field_value: str
177-
) -> Optional[str]:
196+
) -> FieldValue | Missing:
178197
for pattern, content in rule.compiled_resolve_list:
179198
if pattern.search(source_field_value):
180199
return content
181-
return None
200+
return MISSING
182201

183-
def _update_cache_metrics(self):
184-
if self.max_cache_entries <= 0:
202+
def _update_cache_metrics(self) -> None:
203+
if not is_lru_cached(self._get_lru_cached_value_from_list):
185204
return
186205
self._cache_metrics_skip_count += 1
187206
if self._cache_metrics_skip_count < self.cache_metrics_interval:
@@ -192,8 +211,8 @@ def _update_cache_metrics(self):
192211
self.metrics.new_results += cache_info.misses
193212
self.metrics.cached_results += cache_info.hits
194213
self.metrics.num_cache_entries += cache_info.currsize
195-
self.metrics.cache_load += cache_info.currsize / cache_info.maxsize
214+
self.metrics.cache_load += cache_info.currsize / self.max_cache_entries
196215

197-
def setup(self):
216+
def setup(self) -> None:
198217
super().setup()
199218
self._cache_metrics_skip_count = 0

0 commit comments

Comments
 (0)