Skip to content

Commit 51b5154

Browse files
kaya-davidkmeinerz
andauthored
Dev eventstate implementation (#824)
* add new namespac (ng package) for new architecture implementations * implement EventState, related types and state machine logic * add tests for EventState and state machine logic * fix state machine: add missing STORED_IN_ERROR to FAILED transition * update CHANGELOG.md * refactor: move EventState state machine to class-level for shared reuse + tests * fix mypy issues * update CHANGELOG.md * fix next() transition behavior for ambiguous states and update related tests * refactor: rename class method next() to next_state() and remove redundant test case * Update logprep/ng/event_state.py Co-authored-by: kmeinerz <meinerzkai@gmail.com> * Update logprep/ng/event_state.py Co-authored-by: kmeinerz <meinerzkai@gmail.com> * refactor EventState tests to cover updated transition behavior and error messages --------- Co-authored-by: kmeinerz <meinerzkai@gmail.com>
1 parent 768c449 commit 51b5154

File tree

4 files changed

+354
-0
lines changed

4 files changed

+354
-0
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
### Features
77

88
### Improvements
9+
* implement EventState class to manage the lifecycle of log events
10+
* integrate a finite state machine to control valid state transitions
11+
912
* add ng packages as namespace in dirs 'unit' and 'logprep' as preparation for new architecture implementation
1013
* add abstract EventMetadata class and KafkaInputMetadata class
1114

logprep/ng/abc/__init__.py

Whitespace-only changes.

logprep/ng/event_state.py

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
"""The event classes and related types"""
2+
3+
from enum import StrEnum
4+
5+
6+
class EventStateType(StrEnum):
7+
"""Event states representing the lifecycle of a log event."""
8+
9+
RECEIVING = "receiving"
10+
"""The event is being received (e.g. from input connector)."""
11+
12+
RECEIVED = "received"
13+
"""The event has been successfully received."""
14+
15+
PROCESSING = "processing"
16+
"""The event is currently being processed by the pipeline."""
17+
18+
PROCESSED = "processed"
19+
"""The event has been processed by all pipeline processors."""
20+
21+
STORED_IN_OUTPUT = "stored_in_output"
22+
"""The event was successfully stored in the output connector."""
23+
24+
FAILED = "failed"
25+
"""The event failed during processing or output storage."""
26+
27+
STORED_IN_ERROR = "stored_in_error"
28+
"""The event was stored in the error output (e.g. error queue or
29+
fallback output)."""
30+
31+
DELIVERED = "delivered"
32+
"""The event was delivered to the target system or final destination."""
33+
34+
ACKED = "acked"
35+
"""The event was acknowledged by the downstream system or consumer."""
36+
37+
38+
class EventState:
39+
"""
40+
Manages the lifecycle of a log event using a finite state machine.
41+
42+
This class encapsulates valid transitions between event states such as
43+
receiving, processing, delivery, and failure handling. It supports
44+
automatic and conditional transitions based on success flags.
45+
46+
Examples
47+
--------
48+
>>> state = EventState()
49+
>>> state.current_state
50+
<EventStateType.RECEIVING: 'receiving'>
51+
52+
>>> state.next_state()
53+
<EventStateType.RECEIVED: 'received'>
54+
55+
>>> state.next_state()
56+
<EventStateType.PROCESSING: 'processing'>
57+
58+
>>> state.next_state(success=True)
59+
<EventStateType.PROCESSED: 'processed'>
60+
61+
>>> state.next_state()
62+
<EventStateType.STORED_IN_OUTPUT: 'stored_in_output'>
63+
64+
>>> state.next_state(success=False)
65+
<EventStateType.FAILED: 'failed'>
66+
67+
>>> state.next_state()
68+
<EventStateType.STORED_IN_ERROR: 'stored_in_error'>
69+
70+
>>> state.next_state(success=True)
71+
<EventStateType.DELIVERED: 'delivered'>
72+
"""
73+
74+
_FAILURE_STATES = {EventStateType.FAILED, EventStateType.STORED_IN_ERROR}
75+
_SUCCESS_STATES = {
76+
EventStateType.RECEIVED,
77+
EventStateType.PROCESSING,
78+
EventStateType.PROCESSED,
79+
EventStateType.STORED_IN_OUTPUT,
80+
EventStateType.DELIVERED,
81+
EventStateType.ACKED,
82+
}
83+
84+
_state_machine: dict[str, list[str]] = {} # Will be initialized lazily
85+
"""Class-level state transition map, initialized once and shared across
86+
all instances."""
87+
88+
def __init__(self) -> None:
89+
"""Initialize the event state with the default starting state."""
90+
91+
if not EventState._state_machine:
92+
EventState._state_machine = EventState._construct_state_machine()
93+
94+
self.current_state: str = EventStateType.RECEIVING
95+
96+
@staticmethod
97+
def _construct_state_machine() -> dict[str, list[str]]:
98+
"""
99+
Define the valid state transitions as an adjacency list.
100+
101+
Returns
102+
-------
103+
dict[EventStateType, list[str]]
104+
A dictionary mapping each state to its allowed successor states.
105+
"""
106+
107+
return {
108+
EventStateType.RECEIVING: [EventStateType.RECEIVED],
109+
EventStateType.RECEIVED: [EventStateType.PROCESSING],
110+
EventStateType.PROCESSING: [
111+
EventStateType.FAILED,
112+
EventStateType.PROCESSED,
113+
],
114+
EventStateType.PROCESSED: [EventStateType.STORED_IN_OUTPUT],
115+
EventStateType.STORED_IN_OUTPUT: [
116+
EventStateType.FAILED,
117+
EventStateType.DELIVERED,
118+
],
119+
EventStateType.FAILED: [EventStateType.STORED_IN_ERROR],
120+
EventStateType.STORED_IN_ERROR: [
121+
EventStateType.FAILED,
122+
EventStateType.DELIVERED,
123+
],
124+
EventStateType.DELIVERED: [EventStateType.ACKED],
125+
}
126+
127+
def next_state(self, *, success: bool | None = None) -> str:
128+
"""
129+
Advance to the next logical state based on the current state.
130+
131+
If there is exactly one valid next state, it will be chosen automatically.
132+
If multiple transitions are possible (e.g. success vs. failure), the `success`
133+
parameter can be used to resolve the path.
134+
135+
Parameters
136+
----------
137+
success : bool, optional
138+
If provided, determines the outcome path in ambiguous transitions.
139+
- True: prefer successful path
140+
- False: prefer failure path
141+
142+
Returns
143+
-------
144+
EventStateType
145+
The new current state after transition.
146+
147+
Raises
148+
------
149+
ValueError
150+
If the current state has no defined next transitions or if the transition
151+
is ambiguous and `success` is not provided.
152+
"""
153+
154+
next_states = self._state_machine.get(self.current_state)
155+
156+
if not next_states:
157+
raise ValueError("Invalid state transition: Already reached terminal state")
158+
159+
if len(next_states) == 1:
160+
self.current_state = next_states[0]
161+
return self.current_state
162+
163+
if success is not None:
164+
chosen = self._resolve_by_success_flag(next_states, success)
165+
166+
if chosen:
167+
self.current_state = chosen
168+
return self.current_state
169+
170+
raise ValueError("Invalid state transition: Ambiguous event without success.")
171+
172+
@classmethod
173+
def _resolve_by_success_flag(cls, options: list[str], success: bool) -> str | None:
174+
"""
175+
Resolve a path when multiple options are available based on success.
176+
177+
Parameters
178+
----------
179+
options : list of EventStateType
180+
Available next states.
181+
success : bool
182+
Outcome of the current step to choose the proper next state.
183+
184+
Returns
185+
-------
186+
str or None
187+
The chosen next state, or None if no suitable match was found.
188+
"""
189+
190+
candidates = cls._SUCCESS_STATES if success else cls._FAILURE_STATES
191+
return next((state for state in options if state in candidates), None)
192+
193+
def reset(self) -> None:
194+
"""Reset the event state to the initial state (RECEIVING)."""
195+
196+
self.current_state = EventStateType.RECEIVING
197+
198+
def __str__(self) -> str:
199+
"""
200+
Return a string representation of the current event state.
201+
202+
Returns
203+
-------
204+
str
205+
A string like "<EventState: current_state>".
206+
"""
207+
208+
return f"<EventState: {self.current_state}>"

tests/unit/ng/test_event_state.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# pylint: disable=missing-docstring
2+
# pylint: disable=protected-access
3+
4+
import pytest
5+
6+
from logprep.ng.event_state import EventState, EventStateType
7+
8+
9+
@pytest.mark.parametrize(
10+
"initial, success, next_expected",
11+
[
12+
# Automatic transitions
13+
(EventStateType.RECEIVING, None, EventStateType.RECEIVED),
14+
(EventStateType.RECEIVED, None, EventStateType.PROCESSING),
15+
(EventStateType.PROCESSED, None, EventStateType.STORED_IN_OUTPUT),
16+
(EventStateType.FAILED, None, EventStateType.STORED_IN_ERROR),
17+
(EventStateType.DELIVERED, None, EventStateType.ACKED),
18+
# Ambiguous transitions resolved with success flag
19+
(EventStateType.STORED_IN_ERROR, True, EventStateType.DELIVERED),
20+
(EventStateType.STORED_IN_ERROR, False, EventStateType.FAILED),
21+
(EventStateType.PROCESSING, True, EventStateType.PROCESSED),
22+
(EventStateType.PROCESSING, False, EventStateType.FAILED),
23+
(EventStateType.STORED_IN_OUTPUT, True, EventStateType.DELIVERED),
24+
(EventStateType.STORED_IN_OUTPUT, False, EventStateType.FAILED),
25+
],
26+
)
27+
def test_next_transitions_correctly(
28+
initial: EventStateType,
29+
success: bool | None,
30+
next_expected: EventStateType,
31+
) -> None:
32+
"""Ensure next_state() correctly advances to the expected state."""
33+
34+
state = EventState()
35+
state.current_state = initial
36+
result = state.next_state(success=success)
37+
assert result == next_expected
38+
assert state.current_state == next_expected
39+
40+
41+
def test_resolve_by_success_flag_returns_correct_result() -> None:
42+
"""Test resolving a next state with success flag (True/False)."""
43+
44+
assert (
45+
EventState._resolve_by_success_flag(
46+
[EventStateType.FAILED, EventStateType.PROCESSED], success=True
47+
)
48+
== EventStateType.PROCESSED
49+
)
50+
assert (
51+
EventState._resolve_by_success_flag(
52+
[EventStateType.FAILED, EventStateType.PROCESSED], success=False
53+
)
54+
== EventStateType.FAILED
55+
)
56+
57+
58+
def test_resolve_by_success_flag_returns_none_if_no_match() -> None:
59+
"""Return None if no state matches success condition."""
60+
61+
resolve_flag = EventState._resolve_by_success_flag(
62+
[EventStateType.ACKED],
63+
success=False,
64+
)
65+
assert resolve_flag is None
66+
67+
68+
def test_reset_sets_state_to_initial() -> None:
69+
"""Calling reset() should set the state back to RECEIVING."""
70+
state = EventState()
71+
state.current_state = EventStateType.FAILED
72+
state.reset()
73+
assert state.current_state == EventStateType.RECEIVING
74+
75+
76+
def test_str_representation() -> None:
77+
"""String representation should be human-readable."""
78+
state = EventState()
79+
assert str(state) == "<EventState: receiving>"
80+
81+
82+
def test_next_raises_exception_when_no_further_state() -> None:
83+
"""If no further transition is defined, next_state() should return None."""
84+
85+
state = EventState()
86+
state.current_state = EventStateType.ACKED
87+
88+
with pytest.raises(
89+
ValueError, match="Invalid state transition: Already reached terminal state"
90+
):
91+
state.next_state()
92+
93+
94+
@pytest.mark.parametrize(
95+
"current_state, success, expected",
96+
[
97+
# STORED_IN_OUTPUT -> ...
98+
(
99+
EventStateType.STORED_IN_OUTPUT,
100+
None,
101+
pytest.raises(ValueError, match="Ambiguous event without success"),
102+
),
103+
(EventStateType.STORED_IN_OUTPUT, True, EventStateType.DELIVERED),
104+
(EventStateType.STORED_IN_OUTPUT, False, EventStateType.FAILED),
105+
# STORED_IN_ERROR -> ...
106+
(
107+
EventStateType.STORED_IN_ERROR,
108+
None,
109+
pytest.raises(ValueError, match="Ambiguous event without success"),
110+
),
111+
(EventStateType.STORED_IN_ERROR, True, EventStateType.DELIVERED),
112+
(EventStateType.STORED_IN_ERROR, False, EventStateType.FAILED),
113+
],
114+
)
115+
def test_next_state_handles_ambiguous_transitions_with_or_without_success_flag(
116+
current_state, success, expected
117+
) -> None:
118+
"""
119+
Handle ambiguous transitions based on the success flag.
120+
Raises ValueError if success is not provided.
121+
"""
122+
123+
state = EventState()
124+
state.current_state = current_state
125+
126+
if isinstance(expected, type(pytest.raises(ValueError))):
127+
with expected:
128+
state.next_state(success=success)
129+
else:
130+
result = state.next_state(success=success)
131+
assert result == expected
132+
assert state.current_state == expected
133+
134+
135+
def test_all_states_covered_in_state_machine() -> None:
136+
"""Ensure that all EventStateType values are represented
137+
in the state machine."""
138+
139+
graph = EventState._construct_state_machine()
140+
all_keys = set(graph.keys())
141+
all_targets = {state for targets in graph.values() for state in targets}
142+
all_used = all_keys.union(all_targets)
143+
assert set(EventStateType).issubset(all_used)

0 commit comments

Comments
 (0)