Skip to content

Commit cf6fdd1

Browse files
authored
setup event backlog in abstract input class and event registration an… (#860)
* setup event backlog in abstract input class and event registration and adapt get_event/get_next methods of input classes
1 parent dd5dafd commit cf6fdd1

File tree

22 files changed

+1675
-938
lines changed

22 files changed

+1675
-938
lines changed

CHANGELOG.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,27 @@
11
## Upcoming Changes
22

33
## next release
4+
5+
### Breaking
6+
7+
### Features
8+
9+
### Improvements
10+
11+
* Add `event_backlog` to the abstract input interface.
12+
* Register event in the backlog and return the registered event object.
13+
14+
### Bugfix
15+
16+
417
### Breaking
518

619
### Features
720

821
### Improvements
922

1023
* make `processors` handle Event class based objects
11-
* add an EventBacklog class hierarchies
24+
* add an EventBacklog class hierarchies
1225
* implement an iterator interface to Input connectors
1326
* make simple connectors handle Event class based objects
1427
* make `opensearch_output` handle Event class based objects

doc/source/development/notebooks/new_architecture_examples/1_event_metadata.ipynb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
},
2929
"cell_type": "code",
3030
"source": [
31-
"from logprep.ng.connector.confluent_kafka.metadata import KafkaInputMetadata\n",
31+
"from logprep.ng.connector.confluent_kafka.metadata import ConfluentKafkaMetadata\n",
3232
"\n",
33-
"kafka_metadata = KafkaInputMetadata(partition=1, offset=0)\n",
33+
"kafka_metadata = ConfluentKafkaMetadata(partition=1, offset=0)\n",
3434
"\n",
3535
"print(kafka_metadata)"
3636
],
@@ -59,7 +59,7 @@
5959
},
6060
"cell_type": "code",
6161
"source": [
62-
"kafka_metadata = KafkaInputMetadata(partition=\"1\", offset=\"Zero\")\n",
62+
"kafka_metadata = ConfluentKafkaMetadata(partition=\"1\", offset=\"Zero\")\n",
6363
"\n",
6464
"print(kafka_metadata)"
6565
],

logprep/abc/component.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import time
88
from abc import ABC
99
from functools import cached_property
10-
from typing import Callable
10+
from typing import Callable, Literal
1111

1212
import msgspec
1313
from attr import define, field, validators
@@ -59,7 +59,7 @@ def __attrs_post_init__(self):
5959

6060
# instance attributes
6161
name: str
62-
pipeline_index: int
62+
pipeline_index: int | None
6363
_config: Config
6464

6565
# class attributes
@@ -72,7 +72,7 @@ def metric_labels(self) -> dict:
7272
"""Labels for the metrics"""
7373
return {"component": self._config.type, "name": self.name, "description": "", "type": ""}
7474

75-
def __init__(self, name: str, configuration: "Component.Config", pipeline_index: int = None):
75+
def __init__(self, name: str, configuration: "Config", pipeline_index: int | None = None):
7676
self._config = configuration
7777
self.name = name
7878
self.pipeline_index = pipeline_index
@@ -149,9 +149,7 @@ def health(self) -> bool:
149149
logger.debug("Checking health of %s", self.name)
150150
return True
151151

152-
def _schedule_task(
153-
self, task: Callable, seconds: int, args: tuple = None, kwargs: dict = None
154-
) -> None:
152+
def _schedule_task(self, task: Callable, seconds: int, *args, **kwargs) -> None:
155153
"""Schedule a task to run periodically during pipeline run.
156154
The task is run in :code:`pipeline.py` in the :code:`process_pipeline` method.
157155

logprep/ng/abc/input.py

Lines changed: 176 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import os
99
import zlib
1010
from abc import abstractmethod
11+
from collections.abc import Iterator
12+
from copy import deepcopy
1113
from functools import cached_property, partial
1214
from hmac import HMAC
1315
from typing import Literal, Optional, Self
@@ -16,21 +18,105 @@
1618
from attrs import define, field, validators
1719

1820
from logprep.abc.connector import Connector
19-
from logprep.abc.input import (
20-
CriticalInputError,
21-
FullEventConfig,
22-
HmacConfig,
23-
TimeDeltaConfig,
24-
)
21+
from logprep.abc.exceptions import LogprepException
2522
from logprep.metrics.metrics import Metric
23+
from logprep.ng.abc.event import EventBacklog
24+
from logprep.ng.event.event_state import EventStateType
25+
from logprep.ng.event.log_event import LogEvent
2626
from logprep.ng.event.set_event_backlog import SetEventBacklog
2727
from logprep.processor.base.exceptions import FieldExistsWarning
2828
from logprep.util.helper import add_fields_to, get_dotted_field_value
2929
from logprep.util.time import UTC, TimeParser
3030
from logprep.util.validators import dict_structure_validator
3131

3232

33-
class InputIterator:
33+
class InputError(LogprepException):
34+
"""Base class for Input related exceptions."""
35+
36+
def __init__(self, input_connector: "Input", message: str) -> None:
37+
input_connector.metrics.number_of_errors += 1
38+
super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}")
39+
40+
41+
class CriticalInputError(InputError):
42+
"""A significant error occurred - log and don't process the event."""
43+
44+
def __init__(self, input_connector: "Input", message, raw_input):
45+
super().__init__(
46+
input_connector, f"{message} -> event was written to error output if configured"
47+
)
48+
self.raw_input = deepcopy(raw_input) # is written to error output
49+
self.message = message
50+
51+
52+
class CriticalInputParsingError(CriticalInputError):
53+
"""The input couldn't be parsed correctly."""
54+
55+
56+
class FatalInputError(InputError):
57+
"""Must not be caught."""
58+
59+
60+
class InputWarning(LogprepException):
61+
"""May be caught but must be displayed to the user/logged."""
62+
63+
def __init__(self, input_connector: "Input", message: str) -> None:
64+
input_connector.metrics.number_of_warnings += 1
65+
super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}")
66+
67+
68+
class SourceDisconnectedWarning(InputWarning):
69+
"""Lost (or failed to establish) contact with the source."""
70+
71+
72+
@define(kw_only=True)
73+
class HmacConfig:
74+
"""Hmac Configurations
75+
The hmac itself will be calculated with python's hashlib.sha256 algorithm and the compression
76+
is based on the zlib library.
77+
"""
78+
79+
target: str = field(validator=validators.instance_of(str))
80+
"""Defines a field inside the log message which should be used for the hmac
81+
calculation. If the target field is not found or does not exist an error message
82+
is written into the configured output field. If the hmac should be calculated on
83+
the full incoming raw message instead of a subfield the target option should be set to
84+
:code:`<RAW_MSG>`."""
85+
key: str = field(validator=validators.instance_of(str))
86+
"""The secret key that will be used to calculate the hmac."""
87+
output_field: str = field(validator=validators.instance_of(str))
88+
"""The parent name of the field where the hmac result should be written to in the
89+
original incoming log message. As subfields the result will have a field called :code:`hmac`,
90+
containing the calculated hmac, and :code:`compressed_base64`, containing the original message
91+
that was used to calculate the hmac, in compressed and base64-encoded form. In case the output
92+
field exists already in the original message an error is raised."""
93+
94+
95+
@define(kw_only=True)
96+
class TimeDeltaConfig:
97+
"""TimeDelta Configurations
98+
Works only if the preprocessor log_arrival_time_target_field is set."""
99+
100+
target_field: str = field(validator=(validators.instance_of(str), lambda _, __, x: bool(x)))
101+
"""Defines the fieldname to which the time difference should be written to."""
102+
reference_field: str = field(validator=(validators.instance_of(str), lambda _, __, x: bool(x)))
103+
"""Defines a field with a timestamp that should be used for the time difference.
104+
The calculation will be the arrival time minus the time of this reference field."""
105+
106+
107+
@define(kw_only=True)
108+
class FullEventConfig:
109+
"""Full Event Configurations
110+
Works only if the preprocessor :code:`add_full_event_to_target_field` is set."""
111+
112+
format: str = field(validator=validators.in_(["dict", "str"]), default="str")
113+
"""Defines the Format in which the event should be written to the new field.
114+
The default is :code:`str`, which results in an escaped JSON string"""
115+
target_field: str = field(validator=validators.instance_of(str), default="event.original")
116+
"""Defines the fieldname which the event should be written to"""
117+
118+
119+
class InputIterator(Iterator):
34120
"""Base Class for an input Iterator"""
35121

36122
def __init__(self, input_connector: "Input", timeout: float):
@@ -181,10 +267,10 @@ class Config(Connector.Config):
181267
def __init__(
182268
self,
183269
name: str,
184-
configuration: Literal["Config"],
270+
configuration: "Input.Config",
185271
pipeline_index: int | None = None,
186272
) -> None:
187-
self.backlog = SetEventBacklog()
273+
self.event_backlog: EventBacklog | None = None
188274

189275
super().__init__(name, configuration, pipeline_index)
190276

@@ -213,6 +299,18 @@ def __call__(self, *, timeout: float) -> InputIterator:
213299

214300
return InputIterator(self, timeout)
215301

302+
def setup(self):
303+
"""Initialize the input connector.
304+
305+
This method sets the event backlog to an instance of `SetEventBacklog`.
306+
You can override this to configure a different event backlog type,
307+
depending on the available configuration settings.
308+
"""
309+
310+
super().setup()
311+
312+
self.event_backlog = SetEventBacklog()
313+
216314
@property
217315
def _add_hmac(self) -> bool:
218316
"""Check and return if a hmac should be added or not."""
@@ -291,11 +389,30 @@ def _get_event(self, timeout: float) -> tuple:
291389
292390
Returns
293391
-------
294-
(event, raw_event)
392+
(event, raw_event, metadata)
295393
"""
296394

395+
def _register_failed_event(
396+
self,
397+
event: dict | None,
398+
raw_event: bytes | None,
399+
metadata: dict | None,
400+
error: Exception,
401+
) -> None:
402+
"""Helper method to register the failed event to event backlog."""
403+
404+
error_log_event = LogEvent(
405+
data=event if isinstance(event, dict) else {},
406+
original=raw_event if raw_event is not None else b"",
407+
metadata=metadata,
408+
)
409+
error_log_event.errors.append(error)
410+
error_log_event.state.current_state = EventStateType.FAILED
411+
412+
self.event_backlog.register(events=[error_log_event]) # type: ignore[union-attr]
413+
297414
@Metric.measure_time()
298-
def get_next(self, timeout: float) -> dict | None:
415+
def get_next(self, timeout: float) -> LogEvent | None:
299416
"""Return the next document
300417
301418
Parameters
@@ -305,36 +422,57 @@ def get_next(self, timeout: float) -> dict | None:
305422
306423
Returns
307424
-------
308-
input : dict
425+
input : LogEvent, None
309426
Input log data.
310-
311-
Raises
312-
------
313-
TimeoutWhileWaitingForInputError
314-
After timeout (usually a fraction of seconds) if no input data was available by then.
315427
"""
316-
event, raw_event = self._get_event(timeout)
317-
if event is None:
318-
return None
319-
self.metrics.number_of_processed_events += 1
320-
if not isinstance(event, dict):
321-
raise CriticalInputError(self, "not a dict", event)
428+
429+
event: dict | None = None
430+
raw_event: bytearray | None = None
431+
metadata: dict | None = None
432+
322433
try:
323-
if self._add_full_event_to_target_field:
324-
self._write_full_event_to_target_field(event, raw_event)
325-
if self._add_hmac:
326-
event = self._add_hmac_to(event, raw_event)
327-
if self._add_version_info:
328-
self._add_version_information_to_event(event)
329-
if self._add_log_arrival_time_information:
330-
self._add_arrival_time_information_to_event(event)
331-
if self._add_log_arrival_timedelta_information:
332-
self._add_arrival_timedelta_information_to_event(event)
333-
if self._add_env_enrichment:
334-
self._add_env_enrichment_to_event(event)
335-
except FieldExistsWarning as error:
336-
raise CriticalInputError(self, error.args[0], event) from error
337-
return event
434+
event, raw_event, metadata = self._get_event(timeout)
435+
436+
if event is None:
437+
return None
438+
439+
if not isinstance(event, dict):
440+
raise CriticalInputError(self, "not a dict", event)
441+
442+
self.metrics.number_of_processed_events += 1
443+
444+
try:
445+
if self._add_full_event_to_target_field:
446+
self._write_full_event_to_target_field(event, raw_event)
447+
if self._add_hmac:
448+
event = self._add_hmac_to(event, raw_event)
449+
if self._add_version_info:
450+
self._add_version_information_to_event(event)
451+
if self._add_log_arrival_time_information:
452+
self._add_arrival_time_information_to_event(event)
453+
if self._add_log_arrival_timedelta_information:
454+
self._add_arrival_timedelta_information_to_event(event)
455+
if self._add_env_enrichment:
456+
self._add_env_enrichment_to_event(event)
457+
except FieldExistsWarning as error:
458+
raise CriticalInputError(self, error.args[0], event) from error
459+
except CriticalInputError as error:
460+
self._register_failed_event(
461+
event=event,
462+
raw_event=raw_event,
463+
metadata=metadata,
464+
error=error,
465+
)
466+
return None
467+
468+
log_event = LogEvent(
469+
data=event,
470+
original=raw_event,
471+
metadata=metadata,
472+
)
473+
474+
self.event_backlog.register(events=[log_event]) # type: ignore[union-attr]
475+
return log_event
338476

339477
def batch_finished_callback(self) -> None:
340478
"""Can be called by output connectors after processing a batch of one or more records."""

0 commit comments

Comments
 (0)