17
17
from typing_extensions import TypeAlias
18
18
19
19
from quixstreams .context import message_context
20
- from quixstreams .core .stream import TransformExpandedCallback
20
+ from quixstreams .core .stream import (
21
+ Stream ,
22
+ TransformExpandedCallback ,
23
+ TransformFunction ,
24
+ )
21
25
from quixstreams .core .stream .exceptions import InvalidOperation
22
26
from quixstreams .models .topics .manager import TopicManager
23
27
from quixstreams .state import WindowedPartitionTransaction
42
46
Iterable [Message ],
43
47
]
44
48
49
# Signature of a wall-clock callback: it receives the windowed state
# transaction and returns the messages that should be emitted.
WallClockCallback: TypeAlias = Callable[
    [WindowedPartitionTransaction],
    Iterable[Message],
]
50
+
45
51
46
52
class Window (abc .ABC ):
47
53
def __init__ (
@@ -69,6 +75,13 @@ def process_window(
69
75
) -> tuple [Iterable [WindowKeyResult ], Iterable [WindowKeyResult ]]:
70
76
pass
71
77
78
@abstractmethod
def process_wall_clock(
    self,
    transaction: WindowedPartitionTransaction,
) -> Iterable[WindowKeyResult]:
    """
    Produce window results for the given state transaction.

    Concrete windows must override this method; the base implementation
    does nothing and returns None.

    NOTE(review): presumably invoked on a wall-clock trigger rather than on
    message arrival - confirm against the caller.

    :param transaction: the windowed state transaction to read from.
    :return: an iterable of window results; callers unpack each item
        as a `(key, window)` pair.
    """
84
+
72
85
def register_store (self ) -> None :
73
86
TopicManager .ensure_topics_copartitioned (* self ._dataframe .topics )
74
87
# Create a config for the changelog topic based on the underlying SDF topics
@@ -83,6 +96,7 @@ def _apply_window(
83
96
self ,
84
97
func : TransformRecordCallbackExpandedWindowed ,
85
98
name : str ,
99
+ wall_clock_func : WallClockCallback ,
86
100
) -> "StreamingDataFrame" :
87
101
self .register_store ()
88
102
@@ -92,12 +106,24 @@ def _apply_window(
92
106
processing_context = self ._dataframe .processing_context ,
93
107
store_name = name ,
94
108
)
109
+ wall_clock_transform_func = _as_wall_clock (
110
+ func = wall_clock_func ,
111
+ stream_id = self ._dataframe .stream_id ,
112
+ processing_context = self ._dataframe .processing_context ,
113
+ store_name = name ,
114
+ )
95
115
# Manually modify the Stream and clone the source StreamingDataFrame
96
116
# to avoid adding "transform" API to it.
97
117
# Transform callbacks can modify record key and timestamp,
98
118
# and it's prone to misuse.
99
- stream = self ._dataframe .stream .add_transform (func = windowed_func , expand = True )
100
- return self ._dataframe .__dataframe_clone__ (stream = stream )
119
+ windowed_stream = self ._dataframe .stream .add_transform (
120
+ func = windowed_func , expand = True
121
+ )
122
+ wall_clock_stream = Stream (
123
+ func = TransformFunction (wall_clock_transform_func , expand = True )
124
+ )
125
+ sdf = self ._dataframe .__dataframe_clone__ (stream = windowed_stream )
126
+ return sdf .concat_wall_clock (wall_clock_stream )
101
127
102
128
def final (self ) -> "StreamingDataFrame" :
103
129
"""
@@ -140,9 +166,17 @@ def window_callback(
140
166
for key , window in expired_windows :
141
167
yield (window , key , window ["start" ], None )
142
168
169
def wall_clock_callback(
    transaction: WindowedPartitionTransaction,
) -> Iterable[Message]:
    """
    Lazily convert the results of `process_wall_clock()` into messages.

    Each `(key, window)` pair becomes a `(value, key, timestamp, headers)`
    tuple where the value is the window itself, the timestamp is the
    window's "start" and the headers are None.

    :param transaction: the windowed state transaction passed through
        to `process_wall_clock()`.
    """
    # TODO: Check if this will work for sliding windows
    yield from (
        (result, result_key, result["start"], None)
        for result_key, result in self.process_wall_clock(transaction)
    )
175
+
143
176
return self ._apply_window (
144
177
func = window_callback ,
145
178
name = self ._name ,
179
+ wall_clock_func = wall_clock_callback ,
146
180
)
147
181
148
182
def current (self ) -> "StreamingDataFrame" :
@@ -188,7 +222,17 @@ def window_callback(
188
222
for key , window in updated_windows :
189
223
yield (window , key , window ["start" ], None )
190
224
191
- return self ._apply_window (func = window_callback , name = self ._name )
225
def wall_clock_callback(
    transaction: WindowedPartitionTransaction,
) -> Iterable[Message]:
    """
    Placeholder wall-clock callback.

    Ignores the transaction and returns an empty list, so nothing is emitted.

    :param transaction: the windowed state transaction (currently unused).
    """
    # TODO: Implement wall_clock callback
    no_messages: list[Message] = []
    return no_messages
230
+
231
+ return self ._apply_window (
232
+ func = window_callback ,
233
+ name = self ._name ,
234
+ wall_clock_func = wall_clock_callback ,
235
+ )
192
236
193
237
# Implemented by SingleAggregationWindowMixin and MultiAggregationWindowMixin
194
238
# Single aggregation and multi aggregation windows store aggregations and collections
@@ -424,6 +468,28 @@ def wrapper(
424
468
return wrapper
425
469
426
470
471
def _as_wall_clock(
    func: WallClockCallback,
    processing_context: "ProcessingContext",
    store_name: str,
    stream_id: str,
) -> TransformExpandedCallback:
    """
    Adapt a wall-clock callback to the expanded transform callback signature.

    The returned callable accepts `(value, key, timestamp, headers)` but
    uses none of them. It obtains the store transaction from the checkpoint
    of `processing_context` for the partition reported by
    `message_context()`, and passes only that transaction to `func`.

    :param func: callback that receives the windowed state transaction.
    :param processing_context: object whose `checkpoint` provides
        `get_store_transaction()`.
    :param store_name: name of the store to get the transaction for.
    :param stream_id: id of the stream to get the transaction for.
    :return: the wrapping callable, carrying the metadata of `func`.
    """

    @functools.wraps(func)
    def wrapper(
        value: Any, key: Any, timestamp: int, headers: Any
    ) -> Iterable[Message]:
        # The partition comes from the message context that is active
        # at call time, not from the arguments.
        context = message_context()
        store_transaction = processing_context.checkpoint.get_store_transaction(
            stream_id=stream_id,
            partition=context.partition,
            store_name=store_name,
        )
        # cast() only narrows the static type; it performs no runtime check.
        return func(cast(WindowedPartitionTransaction, store_transaction))

    return wrapper
491
+
492
+
427
493
class WindowOnLateCallback (Protocol ):
428
494
def __call__ (
429
495
self ,
0 commit comments