Skip to content

Commit e40b78d

Browse files
authored
add and register ng connectors (#850)
1 parent 198b808 commit e40b78d

File tree

13 files changed

+887
-3
lines changed

13 files changed

+887
-3
lines changed

logprep/ng/abc/output.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""This module provides the abstract base class for all output endpoints.
2+
New output endpoint types are created by implementing it.
3+
"""
4+
5+
import threading
6+
from abc import abstractmethod
7+
from copy import deepcopy
8+
from typing import Any, Callable
9+
10+
from attrs import define, field, validators
11+
12+
from logprep.abc.connector import Connector
13+
from logprep.abc.exceptions import LogprepException
14+
from logprep.ng.abc.event import Event
15+
16+
17+
class OutputError(LogprepException):
    """Base class for exceptions raised by output connectors.

    Constructing an instance increments ``number_of_errors`` on the metrics
    of the output it is raised for.
    """

    def __init__(self, output: "Output", message: str) -> None:
        # Count the error on the originating output before the message is built.
        output.metrics.number_of_errors += 1
        error_name = self.__class__.__name__
        super().__init__(f"{error_name} in {output.describe()}: {message}")
23+
24+
25+
class OutputWarning(LogprepException):
    """Base class for warnings raised by output connectors.

    Constructing an instance increments ``number_of_warnings`` on the metrics
    of the output it is raised for.
    """

    def __init__(self, output: "Output", message: str) -> None:
        # Count the warning on the originating output before the message is built.
        output.metrics.number_of_warnings += 1
        warning_name = self.__class__.__name__
        super().__init__(f"{warning_name} in {output.describe()}: {message}")
31+
32+
33+
class CriticalOutputError(OutputError):
    """A significant error occurred - log and don't process the event.

    Attributes
    ----------
    raw_input : Any
        Deep copy of the input that was handed to the constructor.
    message : str
        The message as passed in, without the suffix that is added to the
        exception text.
    """

    # Allows ``case CriticalOutputError(raw_input)`` in structural pattern matching.
    __match_args__ = ("raw_input",)

    def __init__(self, output: "Output", message: str, raw_input: Any) -> None:
        annotated_message = f"{message} -> event was written to error output if configured"
        super().__init__(output, annotated_message)
        self.message = message
        # Copy so that later changes to the caller's object do not alter the error.
        self.raw_input = deepcopy(raw_input)
42+
43+
44+
class FatalOutputError(OutputError):
    """Fatal output error. Must not be caught."""
46+
47+
48+
class Output(Connector):
    """Connect to an output destination."""

    @define(kw_only=True)
    class Metrics(Connector.Metrics):
        """Tracks statistics about this connector"""

    @define(kw_only=True)
    class Config(Connector.Config):
        """output config parameters"""

        default: bool = field(validator=validators.instance_of(bool), default=True)
        """ (Optional) if :code:`false` the events are not delivered to this output.
        But this output can be called as output for extra_data.
        """

    @property
    def default(self):
        """Return the ``default`` parameter of the configuration."""
        return self._config.default

    @property
    def metric_labels(self) -> dict:
        """Return the metric labels for this component."""
        return {
            "component": "output",
            "description": self.describe(),
            "type": self._config.type,
            "name": self.name,
        }

    def __init__(self, name: str, configuration: "Connector.Config"):
        """Initialize the output.

        Parameters
        ----------
        name : str
            Name of the connector, forwarded to the base class.
        configuration : Connector.Config
            Configuration of the connector, forwarded to the base class.
        """
        super().__init__(name, configuration)
        self.input_connector = None
        self.lock = threading.Lock()

    @abstractmethod
    def store(self, event: Event) -> None:
        """Store the event in the output destination.

        Parameters
        ----------
        event : Event
            Processed log event that will be stored.
        """

    @abstractmethod
    def store_custom(self, event: Event, target: str) -> None:
        """Store the event in a custom target of the output destination.

        Parameters
        ----------
        event : Event
            Processed log event that will be stored.
        target : str
            Custom target for the event.
        """

    @abstractmethod
    def flush(self):
        """Write the backlog to the output destination.
        Needs to be implemented in child classes to ensure
        that the backlog is written to the output destination.
        """

    @staticmethod
    def _handle_errors(func: Callable) -> Callable:
        """Decorator to handle errors during the store process.

        An exception raised by the wrapped method is appended to
        ``event.errors``, counted in ``number_of_errors`` and the event state
        is advanced with ``success=False``. The exception is not propagated.

        The event is taken from the first positional argument or from the
        ``event`` keyword argument. If no event was passed, there is nothing
        to attach the error to and the original exception is re-raised.

        Parameters
        ----------
        func : Callable
            The store method to wrap.

        Returns
        -------
        Callable
            The wrapping function. It returns whatever ``func`` returns, or
            ``None`` if an exception was handled.
        """
        # Local import keeps this block independent of the module's import section.
        from functools import wraps  # pylint: disable=import-outside-toplevel

        @wraps(func)  # keep name and docstring of the decorated store method
        def wrapper(self, *args, **kwargs):
            event = args[0] if args else kwargs.get("event")
            try:
                return func(self, *args, **kwargs)
            except Exception as error:  # pylint: disable=broad-except
                if event is None:
                    # Do not mask the real problem with an AttributeError on None.
                    raise
                event.errors.append(error)
                self.metrics.number_of_errors += 1
                event.state.next_state(success=False)
                return None

        return wrapper

    def shut_down(self) -> None:
        """Shut down the output connector.

        Flushes first, then delegates to the base class.
        """
        self.flush()
        super().shut_down()

logprep/ng/connector/console/__init__.py

Whitespace-only changes.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""
2+
ConsoleOutput
3+
=============
4+
5+
This section describes the ConsoleOutput, which pretty prints documents to the
6+
console and can be used for testing.
7+
8+
Example
9+
^^^^^^^
10+
.. code-block:: yaml
11+
:linenos:
12+
13+
output:
14+
my_console_output:
15+
type: console_output
16+
"""
17+
18+
import sys
19+
from pprint import pprint
20+
21+
from logprep.ng.abc.event import Event
22+
from logprep.ng.abc.output import Output
23+
24+
25+
class ConsoleOutput(Output):
    """Console output which pretty prints events instead of storing them."""

    @Output._handle_errors
    def store(self, event: Event) -> None:
        """Pretty print the event data to standard output."""
        event.state.next_state()
        pprint(event.data, stream=sys.stdout)
        self.metrics.number_of_processed_events += 1
        event.state.next_state(success=True)

    @Output._handle_errors
    def store_custom(self, event: Event, target: str) -> None:
        """Pretty print the event data to the stream named by ``target``.

        ``target`` is looked up as an attribute of the ``sys`` module.
        """
        event.state.next_state()
        # Resolved after the first state transition so that a failing lookup
        # is handled in the same state as a failing print.
        stream = getattr(sys, target)
        pprint(event.data, stream=stream)
        self.metrics.number_of_processed_events += 1
        event.state.next_state(success=True)

    def flush(self):
        """Flush the console output.

        Nothing to do, the console output does not have a backlog.
        """
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""
2+
DummyOutput
3+
===========
4+
5+
The Dummy Output Connector can be used to store unmodified documents.
6+
It only requires the connector type to be configured.
7+
8+
.. code-block:: yaml
9+
10+
Example
11+
^^^^^^^
12+
.. code-block:: yaml
13+
:linenos:
14+
15+
output:
16+
my_dummy_output:
17+
type: dummy_output
18+
"""
19+
20+
from typing import TYPE_CHECKING, List
21+
22+
from attr import define, field
23+
from attrs import validators
24+
25+
from logprep.ng.abc.event import Event
26+
from logprep.ng.abc.output import Output
27+
from logprep.ng.event.log_event import LogEvent
28+
29+
if TYPE_CHECKING:
30+
from logprep.abc.connector import Connector # pragma: no cover
31+
32+
33+
class DummyOutput(Output):
    """
    A dummy output that stores unmodified documents unless an exception was raised.
    """

    @define(kw_only=True)
    class Config(Output.Config):
        """Common Configurations"""

        do_nothing: bool = field(default=False)
        """If set to True, this connector will behave completely neutral and not do anything.
        Especially counting metrics or storing events."""

        exceptions: List[str] = field(
            validator=validators.deep_iterable(
                member_validator=validators.instance_of((str, type(None))),
                iterable_validator=validators.instance_of(list),
            ),
            # A factory gives every Config its own list. A literal ``[]`` default
            # would be one list object shared by all instances.
            factory=list,
        )
        """List of exceptions to raise when storing an event. This is useful
        for testing purposes. One entry is consumed per stored event; a
        :code:`None` entry means that no exception is raised for that event.
        If an exception is raised, the exception is handled
        by the output decorator.
        """

    events: list[LogEvent]
    failed_events: list[LogEvent]
    setup_called_count: int
    shut_down_called_count: int
    _exceptions: list

    __slots__ = [
        "events",
        "failed_events",
        "setup_called_count",
        "shut_down_called_count",
        "_exceptions",
    ]

    def __init__(self, name: str, configuration: "Connector.Config"):
        """Initialize the dummy output with empty event lists and zeroed counters.

        Parameters
        ----------
        name : str
            Name of the connector, forwarded to the base class.
        configuration : Connector.Config
            Configuration of the connector; its ``exceptions`` list is used
            (not copied) as the queue of exceptions to raise.
        """
        super().__init__(name, configuration)
        self.events = []
        self.failed_events = []
        # Declared in __slots__: must be assigned, otherwise reading it raises
        # AttributeError. It is not incremented anywhere in this class.
        self.setup_called_count = 0
        self.shut_down_called_count = 0
        self._exceptions = configuration.exceptions

    @Output._handle_errors
    def store(self, event: Event) -> None:
        """Store the event in the in-memory event list.

        Does nothing at all if ``do_nothing`` is configured. Otherwise the
        next configured exception entry is consumed and, unless it is
        ``None``, raised instead of storing the event.

        Parameters
        ----------
        event : Event
            Processed log event that will be stored.
        """
        if self._config.do_nothing:
            return
        event.state.next_state()
        if self._exceptions:
            exception = self._exceptions.pop(0)
            if exception is not None:
                raise Exception(exception)  # pylint: disable=broad-exception-raised
        self.events.append(event)
        event.state.next_state(success=True)
        self.metrics.number_of_processed_events += 1

    def store_custom(self, event: Event, target: str):  # pylint: disable=unused-argument
        """Store the event exactly like :py:meth:`store`; ``target`` is ignored."""
        self.store(event)

    def shut_down(self):
        """Count the call and shut down via the base class."""
        self.shut_down_called_count += 1
        super().shut_down()

    def flush(self):
        """Flush is not implemented because it has no backlog."""

logprep/ng/connector/jsonl/__init__.py

Whitespace-only changes.
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""
2+
JsonlOutput
3+
===========
4+
5+
The JsonlOutput Connector can be used to write processed documents to .jsonl
6+
files.
7+
8+
Example
9+
^^^^^^^
10+
.. code-block:: yaml
11+
:linenos:
12+
13+
output:
14+
my_jsonl_output:
15+
type: jsonl_output
16+
output_file: path/to/output.file
17+
output_file_custom: ""
18+
output_file_error: ""
19+
"""
20+
21+
import json
22+
23+
from attrs import define, field, validators
24+
25+
from logprep.ng.abc.event import Event
26+
from logprep.ng.abc.output import Output
27+
28+
29+
class JsonlOutput(Output):
    """An output that appends the events it stores as JSON lines to a file.

    Every stored document is additionally kept in the in-memory list
    ``events``.

    Configuration
    -------------
    output_file : str
        The path of the file that events are written to.
    output_file_custom : str
        The path of the file that custom documents are written to. If empty,
        custom documents are not written to any file.
    """

    @define(kw_only=True)
    class Config(Output.Config):
        """Common Configurations"""

        output_file = field(validator=validators.instance_of(str))
        output_file_custom = field(validator=validators.instance_of(str), default="")

    last_timeout: float
    events: list[dict]
    failed_events: list[dict]

    __slots__ = [
        "last_timeout",
        "events",
        "failed_events",
    ]

    def __init__(self, name: str, configuration: "Output.Config"):
        super().__init__(name, configuration)
        self.events = []
        self.failed_events = []

    def setup(self):
        """Run the base class setup and make sure the configured files exist."""
        super().setup()
        paths = [self._config.output_file]
        if self._config.output_file_custom:
            paths.append(self._config.output_file_custom)
        for path in paths:
            # Opening in append mode creates a missing file and leaves an
            # existing one untouched.
            with open(path, "a+", encoding="utf8"):
                pass

    @staticmethod
    def _write_json(filepath: str, line: dict):
        """Append ``line`` as one JSON document followed by a newline to ``filepath``."""
        with open(filepath, "a+", encoding="utf8") as jsonl_file:
            serialized = json.dumps(line)
            jsonl_file.write(serialized + "\n")

    @Output._handle_errors
    def store(self, event: Event) -> None:
        """Store the event data in memory and append it to the output file."""
        event.state.next_state()
        data = event.data
        self.events.append(data)
        JsonlOutput._write_json(self._config.output_file, event.data)
        self.metrics.number_of_processed_events += 1
        event.state.next_state(success=True)

    @Output._handle_errors
    def store_custom(self, event: Event, target: str) -> None:
        """Store the event data wrapped as ``{target: data}``.

        The wrapped document is always kept in memory; it is written to the
        custom output file only if one is configured.
        """
        event.state.next_state()
        wrapped = {target: event.data}
        self.events.append(wrapped)

        custom_file = self._config.output_file_custom
        if custom_file:
            JsonlOutput._write_json(custom_file, wrapped)
        self.metrics.number_of_processed_events += 1
        event.state.next_state(success=True)

    def flush(self):
        """Flush is not implemented because it has no backlog."""

logprep/ng/event/event_state.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ def __str__(self) -> str:
128128

129129
return f"{self.current_state}"
130130

131+
def __repr__(self) -> str:
    """Return the same text as ``__str__``."""
    return str(self)
133+
131134
@staticmethod
132135
def _construct_state_machine() -> dict[EventStateType, list[EventStateType]]:
133136
"""

0 commit comments

Comments
 (0)