Skip to content

Commit 939518d

Browse files
committed
Introduce FixedTimeWindow intermediary class
1 parent ec0fb99 commit 939518d

File tree

5 files changed

+79
-180
lines changed

5 files changed

+79
-180
lines changed

quixstreams/dataframe/windows/definitions.py

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -33,9 +33,9 @@
3333
SlidingWindowSingleAggregation,
3434
)
3535
from .time_based import (
36+
FixedTimeWindowMultiAggregation,
37+
FixedTimeWindowSingleAggregation,
3638
TimeWindow,
37-
TimeWindowMultiAggregation,
38-
TimeWindowSingleAggregation,
3939
)
4040

4141
if TYPE_CHECKING:
@@ -312,10 +312,11 @@ def _create_window(
312312
) -> TimeWindow:
313313
if func_name:
314314
window_type: Union[
315-
type[TimeWindowSingleAggregation], type[TimeWindowMultiAggregation]
316-
] = TimeWindowSingleAggregation
315+
type[FixedTimeWindowSingleAggregation],
316+
type[FixedTimeWindowMultiAggregation],
317+
] = FixedTimeWindowSingleAggregation
317318
else:
318-
window_type = TimeWindowMultiAggregation
319+
window_type = FixedTimeWindowMultiAggregation
319320

320321
return window_type(
321322
duration_ms=self._duration_ms,
@@ -361,10 +362,11 @@ def _create_window(
361362
) -> TimeWindow:
362363
if func_name:
363364
window_type: Union[
364-
type[TimeWindowSingleAggregation], type[TimeWindowMultiAggregation]
365-
] = TimeWindowSingleAggregation
365+
type[FixedTimeWindowSingleAggregation],
366+
type[FixedTimeWindowMultiAggregation],
367+
] = FixedTimeWindowSingleAggregation
366368
else:
367-
window_type = TimeWindowMultiAggregation
369+
window_type = FixedTimeWindowMultiAggregation
368370

369371
return window_type(
370372
duration_ms=self._duration_ms,

quixstreams/dataframe/windows/session.py

Lines changed: 9 additions & 115 deletions
Original file line number | Diff line number | Diff line change
@@ -2,25 +2,23 @@
22
import time
33
from typing import TYPE_CHECKING, Any, Iterable, Optional
44

5-
from quixstreams.context import message_context
65
from quixstreams.state import WindowedPartitionTransaction, WindowedState
76

87
from .base import (
98
MultiAggregationWindowMixin,
109
SingleAggregationWindowMixin,
11-
Window,
1210
WindowKeyResult,
1311
WindowOnLateCallback,
1412
)
15-
from .time_based import ClosingStrategy, ClosingStrategyValues
13+
from .time_based import ClosingStrategy, TimeWindow
1614

1715
if TYPE_CHECKING:
1816
from quixstreams.dataframe.dataframe import StreamingDataFrame
1917

2018
logger = logging.getLogger(__name__)
2119

2220

23-
class SessionWindow(Window):
21+
class SessionWindow(TimeWindow):
2422
"""
2523
Session window groups events that occur within a specified timeout period.
2624
@@ -40,77 +38,10 @@ def __init__(
4038
dataframe: "StreamingDataFrame",
4139
on_late: Optional[WindowOnLateCallback] = None,
4240
):
43-
super().__init__(
44-
name=name,
45-
dataframe=dataframe,
46-
)
41+
super().__init__(name=name, dataframe=dataframe, on_late=on_late)
4742

4843
self._timeout_ms = timeout_ms
4944
self._grace_ms = grace_ms
50-
self._on_late = on_late
51-
self._closing_strategy = ClosingStrategy.KEY
52-
53-
def final(
54-
self, closing_strategy: ClosingStrategyValues = "key"
55-
) -> "StreamingDataFrame":
56-
"""
57-
Apply the session window aggregation and return results only when the sessions
58-
are closed.
59-
60-
The format of returned sessions:
61-
```python
62-
{
63-
"start": <session start time in milliseconds>,
64-
"end": <session end time in milliseconds>,
65-
"value: <aggregated session value>,
66-
}
67-
```
68-
69-
The individual session is closed when the event time
70-
(the maximum observed timestamp across the partition) passes
71-
the last event timestamp + timeout + grace period.
72-
The closed sessions cannot receive updates anymore and are considered final.
73-
74-
:param closing_strategy: the strategy to use when closing sessions.
75-
Possible values:
76-
- `"key"` - messages advance time and close sessions with the same key.
77-
If some message keys appear irregularly in the stream, the latest sessions can remain unprocessed until a message with the same key is received.
78-
- `"partition"` - messages advance time and close sessions for the whole partition to which this message key belongs.
79-
If timestamps between keys are not ordered, it may increase the number of discarded late messages.
80-
Default - `"key"`.
81-
"""
82-
self._closing_strategy = ClosingStrategy.new(closing_strategy)
83-
return super().final()
84-
85-
def current(
86-
self, closing_strategy: ClosingStrategyValues = "key"
87-
) -> "StreamingDataFrame":
88-
"""
89-
Apply the session window transformation to the StreamingDataFrame to return results
90-
for each updated session.
91-
92-
The format of returned sessions:
93-
```python
94-
{
95-
"start": <session start time in milliseconds>,
96-
"end": <session end time in milliseconds>,
97-
"value: <aggregated session value>,
98-
}
99-
```
100-
101-
This method processes streaming data and returns results as they come,
102-
regardless of whether the session is closed or not.
103-
104-
:param closing_strategy: the strategy to use when closing sessions.
105-
Possible values:
106-
- `"key"` - messages advance time and close sessions with the same key.
107-
If some message keys appear irregularly in the stream, the latest sessions can remain unprocessed until a message with the same key is received.
108-
- `"partition"` - messages advance time and close sessions for the whole partition to which this message key belongs.
109-
If timestamps between keys are not ordered, it may increase the number of discarded late messages.
110-
Default - `"key"`.
111-
"""
112-
self._closing_strategy = ClosingStrategy.new(closing_strategy)
113-
return super().current()
11445

11546
def process_window(
11647
self,
@@ -140,7 +71,7 @@ def process_window(
14071
# Check if the event is too late
14172
if timestamp_ms < session_expiry_threshold:
14273
late_by_ms = session_expiry_threshold - timestamp_ms
143-
self._on_expired_session(
74+
self._on_expired_window(
14475
value=value,
14576
key=key,
14677
start=timestamp_ms,
@@ -216,17 +147,17 @@ def process_window(
216147

217148
# Expire old sessions
218149
if self._closing_strategy == ClosingStrategy.PARTITION:
219-
expired_windows = self.expire_sessions_by_partition(
150+
expired_windows = self.expire_by_partition(
220151
transaction, session_expiry_threshold, collect
221152
)
222153
else:
223-
expired_windows = self.expire_sessions_by_key(
154+
expired_windows = self.expire_by_key(
224155
key, state, session_expiry_threshold, collect
225156
)
226157

227158
return updated_windows, expired_windows
228159

229-
def expire_sessions_by_partition(
160+
def expire_by_partition(
230161
self,
231162
transaction: WindowedPartitionTransaction,
232163
expiry_threshold: int,
@@ -257,7 +188,7 @@ def expire_sessions_by_partition(
257188
for prefix in seen_prefixes:
258189
state = transaction.as_state(prefix=prefix)
259190
prefix_expired = list(
260-
self.expire_sessions_by_key(prefix, state, expiry_threshold, collect)
191+
self.expire_by_key(prefix, state, expiry_threshold, collect)
261192
)
262193
expired_results.extend(prefix_expired)
263194
count += len(prefix_expired)
@@ -271,7 +202,7 @@ def expire_sessions_by_partition(
271202

272203
return expired_results
273204

274-
def expire_sessions_by_key(
205+
def expire_by_key(
275206
self,
276207
key: Any,
277208
state: WindowedState,
@@ -318,43 +249,6 @@ def expire_sessions_by_key(
318249
round(time.monotonic() - start, 2),
319250
)
320251

321-
def _on_expired_session(
322-
self,
323-
value: Any,
324-
key: Any,
325-
start: int,
326-
end: int,
327-
timestamp_ms: int,
328-
late_by_ms: int,
329-
) -> None:
330-
ctx = message_context()
331-
to_log = True
332-
333-
# Trigger the "on_late" callback if provided
334-
if self._on_late:
335-
to_log = self._on_late(
336-
value,
337-
key,
338-
timestamp_ms,
339-
late_by_ms,
340-
start,
341-
end,
342-
self._name,
343-
ctx.topic,
344-
ctx.partition,
345-
ctx.offset,
346-
)
347-
if to_log:
348-
logger.warning(
349-
"Skipping session processing for the closed session "
350-
f"timestamp_ms={timestamp_ms} "
351-
f"session={(start, end)} "
352-
f"late_by_ms={late_by_ms} "
353-
f"store_name={self._name} "
354-
f"partition={ctx.topic}[{ctx.partition}] "
355-
f"offset={ctx.offset}"
356-
)
357-
358252

359253
class SessionWindowSingleAggregation(SingleAggregationWindowMixin, SessionWindow):
360254
pass

quixstreams/dataframe/windows/sliding.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,13 +7,13 @@
77
SingleAggregationWindowMixin,
88
WindowKeyResult,
99
)
10-
from .time_based import ClosingStrategyValues, TimeWindow
10+
from .time_based import ClosingStrategyValues, FixedTimeWindow
1111

1212
if TYPE_CHECKING:
1313
from quixstreams.dataframe.dataframe import StreamingDataFrame
1414

1515

16-
class SlidingWindow(TimeWindow):
16+
class SlidingWindow(FixedTimeWindow):
1717
def final(
1818
self, closing_strategy: ClosingStrategyValues = "key"
1919
) -> "StreamingDataFrame":

quixstreams/dataframe/windows/time_based.py

Lines changed: 57 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -40,23 +40,12 @@ def new(cls, value: str) -> "ClosingStrategy":
4040
class TimeWindow(Window):
4141
def __init__(
4242
self,
43-
duration_ms: int,
44-
grace_ms: int,
4543
name: str,
4644
dataframe: "StreamingDataFrame",
47-
step_ms: Optional[int] = None,
4845
on_late: Optional[WindowOnLateCallback] = None,
4946
):
50-
super().__init__(
51-
name=name,
52-
dataframe=dataframe,
53-
)
54-
55-
self._duration_ms = duration_ms
56-
self._grace_ms = grace_ms
57-
self._step_ms = step_ms
47+
super().__init__(name=name, dataframe=dataframe)
5848
self._on_late = on_late
59-
6049
self._closing_strategy = ClosingStrategy.KEY
6150

6251
def final(
@@ -122,6 +111,60 @@ def current(
122111
self._closing_strategy = ClosingStrategy.new(closing_strategy)
123112
return super().current()
124113

114+
def _on_expired_window(
115+
self,
116+
value: Any,
117+
key: Any,
118+
start: int,
119+
end: int,
120+
timestamp_ms: int,
121+
late_by_ms: int,
122+
) -> None:
123+
ctx = message_context()
124+
to_log = True
125+
# Trigger the "on_late" callback if provided.
126+
# Log the lateness warning if the callback returns True
127+
if self._on_late:
128+
to_log = self._on_late(
129+
value,
130+
key,
131+
timestamp_ms,
132+
late_by_ms,
133+
start,
134+
end,
135+
self._name,
136+
ctx.topic,
137+
ctx.partition,
138+
ctx.offset,
139+
)
140+
if to_log:
141+
logger.warning(
142+
"Skipping window processing for the closed window "
143+
f"timestamp_ms={timestamp_ms} "
144+
f"window={(start, end)} "
145+
f"late_by_ms={late_by_ms} "
146+
f"store_name={self._name} "
147+
f"partition={ctx.topic}[{ctx.partition}] "
148+
f"offset={ctx.offset}"
149+
)
150+
151+
152+
class FixedTimeWindow(TimeWindow):
153+
def __init__(
154+
self,
155+
duration_ms: int,
156+
grace_ms: int,
157+
name: str,
158+
dataframe: "StreamingDataFrame",
159+
step_ms: Optional[int] = None,
160+
on_late: Optional[WindowOnLateCallback] = None,
161+
):
162+
super().__init__(name=name, dataframe=dataframe, on_late=on_late)
163+
164+
self._duration_ms = duration_ms
165+
self._grace_ms = grace_ms
166+
self._step_ms = step_ms
167+
125168
def process_window(
126169
self,
127170
value: Any,
@@ -233,47 +276,10 @@ def expire_by_key(
233276
):
234277
yield (key, self._results(aggregated, collected, window_start, window_end))
235278

236-
def _on_expired_window(
237-
self,
238-
value: Any,
239-
key: Any,
240-
start: int,
241-
end: int,
242-
timestamp_ms: int,
243-
late_by_ms: int,
244-
) -> None:
245-
ctx = message_context()
246-
to_log = True
247-
# Trigger the "on_late" callback if provided.
248-
# Log the lateness warning if the callback returns True
249-
if self._on_late:
250-
to_log = self._on_late(
251-
value,
252-
key,
253-
timestamp_ms,
254-
late_by_ms,
255-
start,
256-
end,
257-
self._name,
258-
ctx.topic,
259-
ctx.partition,
260-
ctx.offset,
261-
)
262-
if to_log:
263-
logger.warning(
264-
"Skipping window processing for the closed window "
265-
f"timestamp_ms={timestamp_ms} "
266-
f"window={(start, end)} "
267-
f"late_by_ms={late_by_ms} "
268-
f"store_name={self._name} "
269-
f"partition={ctx.topic}[{ctx.partition}] "
270-
f"offset={ctx.offset}"
271-
)
272-
273279

274-
class TimeWindowSingleAggregation(SingleAggregationWindowMixin, TimeWindow):
280+
class FixedTimeWindowSingleAggregation(SingleAggregationWindowMixin, FixedTimeWindow):
275281
pass
276282

277283

278-
class TimeWindowMultiAggregation(MultiAggregationWindowMixin, TimeWindow):
284+
class FixedTimeWindowMultiAggregation(MultiAggregationWindowMixin, FixedTimeWindow):
279285
pass

0 commit comments

Comments
 (0)