Skip to content

Commit 43c39a6

Browse files
authored
perf: improve clusterer performance by optimizing static dotted fields (#940)
* Improve clusterer performance by removing dotted fields * Fix pylint and mypy * Remove dotted field access in ng clusterer and refactor * Fix mypy * Refactor clusterer and add helper function * Fix mypy for clusterer * Fix mypy for get_field_value_no_slice * Fix clusterer config docstring * Use small caps for typing in clusterer
1 parent fa80f1e commit 43c39a6

File tree

7 files changed

+331
-79
lines changed

7 files changed

+331
-79
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
### Improvements
88
* improve http endpoint security by fully checking basic auth hashes, and doing that in a time constant manner to not expose secrets
9+
* improve clusterer performance by removing access via dotted fields where possible
910

1011
### Bugfix
1112
* fix missing examples for processor decoder

logprep/abc/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def result(self, value: ProcessorResult):
150150
self._result = value
151151

152152
@property
153-
def rules(self):
153+
def rules(self) -> list["Rule"]:
154154
"""Returns all rules
155155
156156
Returns

logprep/ng/processor/clusterer/processor.py

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
1717
Criteria 1: { "message": "A sample message", "tags": ["clusterable", ...], ... }
1818
Criteria 2: { "message": "A sample message", "clusterable": true, ... }
19-
Criteria 3: { "message": "A sample message", "syslog": { "facility": <number> }, "event": { "severity": <string> }, ... }
19+
Criteria 3: {
20+
"message": "A sample message",
21+
"syslog": { "facility": <number> },
22+
"event": { "severity": <string> },
23+
...
24+
}
2025
2126
Processor Configuration
2227
^^^^^^^^^^^^^^^^^^^^^^^
@@ -38,7 +43,7 @@
3843
.. automodule:: logprep.processor.clusterer.rule
3944
"""
4045

41-
import math
46+
import typing
4247
from typing import Tuple
4348

4449
from attrs import define, field, validators
@@ -51,7 +56,13 @@
5156
SignatureEngine,
5257
SignaturePhaseStreaming,
5358
)
54-
from logprep.util.helper import add_fields_to, get_dotted_field_value
59+
from logprep.util.helper import (
60+
add_fields_to,
61+
get_dotted_field_value,
62+
get_field_value_no_slice,
63+
MISSING,
64+
FieldValue,
65+
)
5566

5667

5768
class Clusterer(FieldManager):
@@ -70,15 +81,20 @@ class Config(Processor.Config):
7081

7182
sps: SignaturePhaseStreaming
7283

73-
_last_rule_id: int
84+
_last_rule_id: int | None
7485

7586
_last_non_extracted_signature: str | None
7687

88+
@property
89+
def config(self) -> Config:
90+
"""Provides the properly typed configuration object"""
91+
return typing.cast(Clusterer.Config, self._config)
92+
7793
def __init__(self, name: str, configuration: Processor.Config):
7894
super().__init__(name=name, configuration=configuration)
7995
self.sps = SignaturePhaseStreaming()
8096

81-
self._last_rule_id = math.inf
97+
self._last_rule_id = None
8298
self._last_non_extracted_signature = None
8399

84100
def _apply_rules(self, event, rule):
@@ -95,28 +111,25 @@ def _is_clusterable(self, event: dict, source_field: str) -> bool:
95111
return False
96112

97113
# Return clusterable state if it exists, since it can be true or false
98-
clusterable = get_dotted_field_value(event, "clusterable")
114+
clusterable = event.get("clusterable")
99115
if clusterable is not None:
100-
return clusterable
116+
return bool(clusterable)
101117

102118
# Alternatively, check for a clusterable tag
103-
tags = get_dotted_field_value(event, "tags")
119+
tags = event.get("tags")
104120
if tags and "clusterable" in tags:
105121
return True
106122

107123
# It is clusterable if a syslog with PRI exists even if no clusterable field exists
108-
# has_facility = 'syslog' in event and 'facility' in event['syslog']
109-
# has_severity = 'event' in event and 'severity' in event['event']
110-
if self._syslog_has_pri(event):
111-
return True
112-
113-
return False
124+
return self._syslog_has_pri(event)
114125

115126
@staticmethod
116-
def _syslog_has_pri(event: dict):
117-
syslog_value = get_dotted_field_value(event, "syslog")
118-
event_value = get_dotted_field_value(event, "event")
119-
return not (syslog_value is None or event_value is None)
127+
def _syslog_has_pri(event: dict) -> bool:
128+
facility = get_field_value_no_slice(event, ("syslog", "facility"))
129+
severity = get_field_value_no_slice(event, ("event", "severity"))
130+
if MISSING in (facility, severity):
131+
return False
132+
return None not in (facility, severity)
120133

121134
def _cluster(self, event: dict, rule: ClustererRule):
122135
raw_text, sig_text = self._get_text_to_cluster(rule, event)
@@ -133,16 +146,16 @@ def _cluster(self, event: dict, rule: ClustererRule):
133146
if self._syslog_has_pri(event):
134147
cluster_signature = " , ".join(
135148
[
136-
str(get_dotted_field_value(event, "syslog.facility")),
137-
str(get_dotted_field_value(event, "event.severity")),
149+
str(get_field_value_no_slice(event, ("syslog", "facility"))),
150+
str(get_field_value_no_slice(event, ("event", "severity"))),
138151
cluster_signature_based_on_message,
139152
]
140153
)
141154
else:
142155
cluster_signature = cluster_signature_based_on_message
143156
add_fields_to(
144157
event,
145-
fields={self._config.output_field_name: cluster_signature},
158+
fields={self.config.output_field_name: cluster_signature},
146159
merge_with_target=rule.merge_with_target,
147160
overwrite_target=rule.overwrite_target,
148161
)
@@ -152,11 +165,13 @@ def _is_new_tree_iteration(self, rule: ClustererRule) -> bool:
152165
rule_id = self._rule_tree.get_rule_id(rule)
153166
if rule_id is None:
154167
return True
155-
is_new_iteration = rule_id <= self._last_rule_id
168+
is_new_iteration = self._last_rule_id is None or rule_id <= self._last_rule_id
156169
self._last_rule_id = rule_id
157170
return is_new_iteration
158171

159-
def _get_text_to_cluster(self, rule: ClustererRule, event: dict) -> Tuple[str, str | None]:
172+
def _get_text_to_cluster(
173+
self, rule: ClustererRule, event: dict
174+
) -> tuple[FieldValue, str | None]:
160175
sig_text = None
161176
if self._is_new_tree_iteration(rule):
162177
self._last_non_extracted_signature = None
@@ -168,9 +183,10 @@ def _get_text_to_cluster(self, rule: ClustererRule, event: dict) -> Tuple[str, s
168183
raw_text = sig_text
169184
return raw_text, sig_text
170185

171-
def test_rules(self):
172-
results = {}
186+
def test_rules(self) -> dict[str, list]:
187+
results: dict[str, list] = {}
173188
for _, rule in enumerate(self.rules):
189+
rule = typing.cast(ClustererRule, rule)
174190
rule_repr = repr(rule)
175191
results[rule_repr] = []
176192
try:

logprep/processor/clusterer/processor.py

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
1717
Criteria 1: { "message": "A sample message", "tags": ["clusterable", ...], ... }
1818
Criteria 2: { "message": "A sample message", "clusterable": true, ... }
19-
Criteria 3: { "message": "A sample message", "syslog": { "facility": <number> }, "event": { "severity": <string> }, ... }
19+
Criteria 3: {
20+
"message": "A sample message",
21+
"syslog": { "facility": <number> },
22+
"event": { "severity": <string> },
23+
...
24+
}
2025
2126
Processor Configuration
2227
^^^^^^^^^^^^^^^^^^^^^^^
@@ -38,43 +43,57 @@
3843
.. automodule:: logprep.processor.clusterer.rule
3944
"""
4045

41-
import math
46+
import typing
4247
from typing import Tuple
4348

4449
from attrs import define, field, validators
4550

46-
from logprep.abc.processor import Processor
51+
from logprep.processor.field_manager.processor import FieldManager
4752
from logprep.processor.clusterer.rule import ClustererRule
4853
from logprep.processor.clusterer.signature_calculation.signature_phase import (
4954
LogRecord,
5055
SignatureEngine,
5156
SignaturePhaseStreaming,
5257
)
53-
from logprep.processor.field_manager.processor import FieldManager
54-
from logprep.util.helper import add_fields_to, get_dotted_field_value
58+
from logprep.util.helper import (
59+
add_fields_to,
60+
get_dotted_field_value,
61+
get_field_value_no_slice,
62+
MISSING,
63+
FieldValue,
64+
)
5565

5666

5767
class Clusterer(FieldManager):
5868
"""Cluster log events using a heuristic."""
5969

6070
@define(kw_only=True)
61-
class Config(Processor.Config):
71+
class Config(FieldManager.Config):
6272
"""Clusterer Configuration"""
6373

6474
output_field_name: str = field(validator=validators.instance_of(str))
6575
"""defines in which field results of the clustering should be stored."""
6676

67-
__slots__ = ["sps"]
77+
__slots__ = ("sps", "_last_rule_id", "_last_non_extracted_signature")
78+
79+
rule_class = ClustererRule
6880

6981
sps: SignaturePhaseStreaming
7082

71-
rule_class = ClustererRule
83+
_last_rule_id: int | None
7284

73-
def __init__(self, name: str, configuration: Processor.Config):
85+
_last_non_extracted_signature: str | None
86+
87+
@property
88+
def config(self) -> Config:
89+
"""Provides the properly typed configuration object"""
90+
return typing.cast(Clusterer.Config, self._config)
91+
92+
def __init__(self, name: str, configuration: Config):
7493
super().__init__(name=name, configuration=configuration)
7594
self.sps = SignaturePhaseStreaming()
7695

77-
self._last_rule_id = math.inf
96+
self._last_rule_id = None
7897
self._last_non_extracted_signature = None
7998

8099
def _apply_rules(self, event, rule):
@@ -91,28 +110,25 @@ def _is_clusterable(self, event: dict, source_field: str) -> bool:
91110
return False
92111

93112
# Return clusterable state if it exists, since it can be true or false
94-
clusterable = get_dotted_field_value(event, "clusterable")
113+
clusterable = event.get("clusterable")
95114
if clusterable is not None:
96-
return clusterable
115+
return bool(clusterable)
97116

98117
# Alternatively, check for a clusterable tag
99-
tags = get_dotted_field_value(event, "tags")
118+
tags = event.get("tags")
100119
if tags and "clusterable" in tags:
101120
return True
102121

103122
# It is clusterable if a syslog with PRI exists even if no clusterable field exists
104-
# has_facility = 'syslog' in event and 'facility' in event['syslog']
105-
# has_severity = 'event' in event and 'severity' in event['event']
106-
if self._syslog_has_pri(event):
107-
return True
108-
109-
return False
123+
return self._syslog_has_pri(event)
110124

111125
@staticmethod
112-
def _syslog_has_pri(event: dict):
113-
syslog_value = get_dotted_field_value(event, "syslog")
114-
event_value = get_dotted_field_value(event, "event")
115-
return not (syslog_value is None or event_value is None)
126+
def _syslog_has_pri(event: dict) -> bool:
127+
facility = get_field_value_no_slice(event, ("syslog", "facility"))
128+
severity = get_field_value_no_slice(event, ("event", "severity"))
129+
if MISSING in (facility, severity):
130+
return False
131+
return None not in (facility, severity)
116132

117133
def _cluster(self, event: dict, rule: ClustererRule):
118134
raw_text, sig_text = self._get_text_to_cluster(rule, event)
@@ -129,16 +145,16 @@ def _cluster(self, event: dict, rule: ClustererRule):
129145
if self._syslog_has_pri(event):
130146
cluster_signature = " , ".join(
131147
[
132-
str(get_dotted_field_value(event, "syslog.facility")),
133-
str(get_dotted_field_value(event, "event.severity")),
148+
str(get_field_value_no_slice(event, ("syslog", "facility"))),
149+
str(get_field_value_no_slice(event, ("event", "severity"))),
134150
cluster_signature_based_on_message,
135151
]
136152
)
137153
else:
138154
cluster_signature = cluster_signature_based_on_message
139155
add_fields_to(
140156
event,
141-
fields={self._config.output_field_name: cluster_signature},
157+
fields={self.config.output_field_name: cluster_signature},
142158
merge_with_target=rule.merge_with_target,
143159
overwrite_target=rule.overwrite_target,
144160
)
@@ -148,11 +164,13 @@ def _is_new_tree_iteration(self, rule: ClustererRule) -> bool:
148164
rule_id = self._rule_tree.get_rule_id(rule)
149165
if rule_id is None:
150166
return True
151-
is_new_iteration = rule_id <= self._last_rule_id
167+
is_new_iteration = self._last_rule_id is None or rule_id <= self._last_rule_id
152168
self._last_rule_id = rule_id
153169
return is_new_iteration
154170

155-
def _get_text_to_cluster(self, rule: ClustererRule, event: dict) -> Tuple[str, str]:
171+
def _get_text_to_cluster(
172+
self, rule: ClustererRule, event: dict
173+
) -> tuple[FieldValue, str | None]:
156174
sig_text = None
157175
if self._is_new_tree_iteration(rule):
158176
self._last_non_extracted_signature = None
@@ -164,16 +182,17 @@ def _get_text_to_cluster(self, rule: ClustererRule, event: dict) -> Tuple[str, s
164182
raw_text = sig_text
165183
return raw_text, sig_text
166184

167-
def test_rules(self):
168-
results = {}
185+
def test_rules(self) -> dict[str, list]:
186+
results: dict[str, list] = {}
169187
for _, rule in enumerate(self.rules):
170-
rule_repr = rule.__repr__()
188+
rule = typing.cast(ClustererRule, rule)
189+
rule_repr = repr(rule)
171190
results[rule_repr] = []
172191
try:
173192
for test in rule.tests:
174193
result = SignatureEngine.apply_signature_rule(test["raw"], rule)
175194
expected_result = test["result"]
176195
results[rule_repr].append((result, expected_result))
177-
except AttributeError:
196+
except AttributeError: # pragma: no cover
178197
results[rule_repr].append(None)
179198
return results

logprep/util/helper.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import itertools
44
import re
55
import sys
6+
import typing
67
from enum import Enum, auto
78
from functools import lru_cache, partial, reduce
89
from importlib.metadata import version
@@ -425,6 +426,18 @@ def pop_dotted_field_value(event: dict, dotted_field: str) -> FieldValue:
425426
)
426427

427428

429+
def get_field_value_no_slice(
430+
event: dict[str, FieldValue], fields: Iterable[str]
431+
) -> FieldValue | Missing:
432+
current: FieldValue = event
433+
for field in fields:
434+
try:
435+
current = typing.cast(dict[str, FieldValue], current)[field]
436+
except (KeyError, TypeError):
437+
return MISSING
438+
return current
439+
440+
428441
def _retrieve_field_value_and_delete_field_if_configured(
429442
sub_dict, dotted_fields_path, delete_source_field=False
430443
):

0 commit comments

Comments
 (0)