@@ -93,6 +93,20 @@ def _create_test_stream(elements: list[int]):
9393 return test_stream
9494
9595
96+ def _convert_timestamp_to_int (has_key = False ):
97+ if has_key :
98+ return beam .MapTuple (
99+ lambda key , value : (
100+ key ,
101+ ((int (value [0 ][0 ].micros // 1e6 ), int (value [0 ][1 ].micros // 1e6 )),
102+ [(int (t .micros // 1e6 ), v ) for t , v in value [1 ]])))
103+
104+ return beam .MapTuple (
105+ lambda window , elements :
106+ ((int (window [0 ].micros // 1e6 ), int (window [1 ].micros // 1e6 )),
107+ [(int (t .micros // 1e6 ), v ) for t , v in elements ]))
108+
109+
96110_go_installed = shutil .which ('go' ) is not None
97111_in_windows = sys .platform == "win32"
98112
@@ -140,14 +154,28 @@ def test_default(self):
140154 WINDOW_SIZE ,
141155 stop_timestamp = 13 ,
142156 buffer_state_type = self .buffer_state_type ))
143- result = _maybe_log_elements (result )
157+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
144158 assert_that (result , equal_to ([
145- [ 0 , 1 , 2 ] ,
146- [ 3 , 4 , 5 ] ,
147- [ 6 , 7 , 8 ] ,
148- [ 9 ] ,
159+ (( 0 , 3 ), [( 0 , 0 ), ( 1 , 1 ), ( 2 , 2 )]) ,
160+ (( 3 , 6 ), [( 3 , 3 ), ( 4 , 4 ), ( 5 , 5 )]) ,
161+ (( 6 , 9 ), [( 6 , 6 ), ( 7 , 7 ), ( 8 , 8 )]) ,
162+ (( 9 , 12 ), [( 9 , 9 )]) ,
149163 ]))
150164
165+ def test_offset (self ):
166+ with TestPipeline (options = self .options ) as p :
167+ result = (
168+ p | _create_test_stream ([2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ])
169+ | OrderedWindowElements (WINDOW_SIZE , stop_timestamp = 13 , offset = 2 ))
170+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
171+ assert_that (
172+ result ,
173+ equal_to ([
174+ ((2 , 5 ), [(2 , 2 ), (3 , 3 ), (4 , 4 )]), # window start at 2
175+ ((5 , 8 ), [(5 , 5 ), (6 , 6 ), (7 , 7 )]),
176+ ((8 , 11 ), [(8 , 8 ), (9 , 9 )])
177+ ]))
178+
151179 def test_slide_interval (self ):
152180 with TestPipeline (options = self .options ) as p :
153181 result = (
@@ -157,16 +185,18 @@ def test_slide_interval(self):
157185 assert_that (
158186 result ,
159187 equal_to ([
160- [0 , 1 , 2 ],
161- [1 , 2 , 3 ],
162- [2 , 3 , 4 ],
163- [3 , 4 , 5 ],
164- [4 , 5 , 6 ],
165- [5 , 6 , 7 ],
166- [6 , 7 , 8 ],
167- [7 , 8 , 9 ],
168- [8 , 9 ],
169- [9 ],
188+ ((- 2 , 1 ), [(0 , 0 )]),
189+ ((- 1 , 2 ), [(0 , 0 ), (1 , 1 )]),
190+ ((0 , 3 ), [(0 , 0 ), (1 , 1 ), (2 , 2 )]),
191+ ((1 , 4 ), [(1 , 1 ), (2 , 2 ), (3 , 3 )]),
192+ ((2 , 5 ), [(2 , 2 ), (3 , 3 ), (4 , 4 )]),
193+ ((3 , 6 ), [(3 , 3 ), (4 , 4 ), (5 , 5 )]),
194+ ((4 , 7 ), [(4 , 4 ), (5 , 5 ), (6 , 6 )]),
195+ ((5 , 8 ), [(5 , 5 ), (6 , 6 ), (7 , 7 )]),
196+ ((6 , 9 ), [(6 , 6 ), (7 , 7 ), (8 , 8 )]),
197+ ((7 , 10 ), [(7 , 7 ), (8 , 8 ), (9 , 9 )]),
198+ ((8 , 11 ), [(8 , 8 ), (9 , 9 )]),
199+ ((9 , 12 ), [(9 , 9 )]),
170200 ]))
171201
172202 def test_keyed_input (self ):
@@ -175,14 +205,15 @@ def test_keyed_input(self):
175205 p | _create_test_stream ([1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ])
176206 | beam .WithKeys ("my_key" ) # key is present in the output
177207 | OrderedWindowElements (WINDOW_SIZE , stop_timestamp = 13 ))
178- result = _maybe_log_elements (result )
208+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int (
209+ has_key = True )
179210 assert_that (
180211 result ,
181212 equal_to ([
182- ("my_key" , [ 1 , 2 ] ),
183- ("my_key" , [ 3 , 4 , 5 ] ),
184- ("my_key" , [ 6 , 7 , 8 ] ),
185- ("my_key" , [ 9 , 10 ] ),
213+ ("my_key" , (( 0 , 3 ), [( 1 , 1 ), ( 2 , 2 )]) ),
214+ ("my_key" , (( 3 , 6 ), [( 3 , 3 ), ( 4 , 4 ), ( 5 , 5 )]) ),
215+ ("my_key" , (( 6 , 9 ), [( 6 , 6 ), ( 7 , 7 ), ( 8 , 8 )]) ),
216+ ("my_key" , (( 9 , 12 ), [( 9 , 9 ), ( 10 , 10 )]) ),
186217 ]))
187218
188219 @parameterized .expand ([
@@ -192,18 +223,18 @@ def test_keyed_input(self):
192223 def test_non_zero_offset_and_default_value (self , fill_window_start ):
193224 if fill_window_start :
194225 expected = [
195- [ - 100 ,
196- 0 ], # window [ -2, 1 ), and the start is filled with default value
197- [ 1 , 2 , 3 ] , # window [1, 4)
198- [ 4 , 5 , 6 ] ,
199- [ 7 , 8 , 9 ] ,
226+ # window [-2, 1), and the start is filled with default value
227+ (( - 2 , 1 ), [( - 2 , - 100 ), ( 0 , 0 )]),
228+ (( 1 , 4 ), [( 1 , 1 ), ( 2 , 2 ), ( 3 , 3 )]) , # window [1, 4)
229+ (( 4 , 7 ), [( 4 , 4 ), ( 5 , 5 ), ( 6 , 6 )]) ,
230+ (( 7 , 10 ), [( 7 , 7 ), ( 8 , 8 ), ( 9 , 9 )]) ,
200231 ]
201232 else :
202233 expected = [
203- [ 0 ] , # window [-2, 1)
204- [ 1 , 2 , 3 ] , # window [1, 4)
205- [ 4 , 5 , 6 ] ,
206- [ 7 , 8 , 9 ] ,
234+ (( - 2 , 1 ), [( 0 , 0 )]) , # window [-2, 1)
235+ (( 1 , 4 ), [( 1 , 1 ), ( 2 , 2 ), ( 3 , 3 )]) , # window [1, 4)
236+ (( 4 , 7 ), [( 4 , 4 ), ( 5 , 5 ), ( 6 , 6 )]) ,
237+ (( 7 , 10 ), [( 7 , 7 ), ( 8 , 8 ), ( 9 , 9 )]) ,
207238 ]
208239
209240 with TestPipeline (options = self .options ) as p :
@@ -215,7 +246,7 @@ def test_non_zero_offset_and_default_value(self, fill_window_start):
215246 default_start_value = - 100 ,
216247 fill_start_if_missing = fill_window_start ,
217248 stop_timestamp = 13 ))
218- result = _maybe_log_elements (result )
249+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
219250 assert_that (result , equal_to (expected ))
220251
221252 @parameterized .expand ([
@@ -225,23 +256,23 @@ def test_non_zero_offset_and_default_value(self, fill_window_start):
225256 def test_ordered_data_with_gap (self , fill_window_start ):
226257 if fill_window_start :
227258 expected = [
228- [ 0 , 1 , 2 ] ,
229- [ 3 , 4 ] ,
230- [ 4 ] , # window [6, 9) is empty, so the start is filled. Same as below .
231- [ 4 ] , # window [9, 12) is empty
232- [ 4 ] , # window [12, 15) is empty
233- [ 4 , 16 , 17 ], # window [ 15, 18) misses the start as well.
234- [ 18 , 19 , 20 ],
259+ (( 0 , 3 ), [( 0 , 0 ), ( 1 , 1 ), ( 2 , 2 )]) ,
260+ (( 3 , 6 ), [( 3 , 3 ), ( 4 , 4 )]) ,
261+ (( 6 , 9 ), [( 6 , 4 )]) , # window [6, 9) is empty, so the start is filled.
262+ (( 9 , 12 ), [( 9 , 4 )]) , # window [9, 12) is empty, so the start is filled.
263+ (( 12 , 15 ), [( 12 , 4 )]) , # window [12, 15) is empty, so the start is filled.
264+ (( 15 , 18 ), [( 15 , 4 ), ( 16 , 16 ), ( 17 , 17 )]),
265+ (( 18 , 21 ), [( 18 , 18 ), ( 19 , 19 ), ( 20 , 20 )])
235266 ]
236267 else :
237268 expected = [
238- [ 0 , 1 , 2 ] ,
239- [ 3 , 4 ] ,
240- [] , # window [6, 9) is empty
241- [] , # window [9, 12) is empty
242- [] , # window [12, 15) is empty
243- [ 16 , 17 ] ,
244- [ 18 , 19 , 20 ],
269+ (( 0 , 3 ), [( 0 , 0 ), ( 1 , 1 ), ( 2 , 2 )]) ,
270+ (( 3 , 6 ), [( 3 , 3 ), ( 4 , 4 )]) ,
271+ (( 6 , 9 ), []) , # window [6, 9) is empty
272+ (( 9 , 12 ), []) , # window [9, 12) is empty
273+ (( 12 , 15 ), []) , # window [12, 15) is empty
274+ (( 15 , 18 ), [( 16 , 16 ), ( 17 , 17 )]) ,
275+ (( 18 , 21 ), [( 18 , 18 ), ( 19 , 19 ), ( 20 , 20 )])
245276 ]
246277 with TestPipeline (options = self .options ) as p :
247278 result = (
@@ -250,22 +281,22 @@ def test_ordered_data_with_gap(self, fill_window_start):
250281 WINDOW_SIZE ,
251282 fill_start_if_missing = fill_window_start ,
252283 stop_timestamp = 23 ))
253- result = _maybe_log_elements (result )
284+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
254285 assert_that (result , equal_to (expected ))
255286
256287 def test_single_late_data_with_no_allowed_lateness (self ):
257288 with TestPipeline (options = self .options ) as p :
258289 result = (
259290 p | _create_test_stream ([0 , 1 , 2 , 3 , 4 , 6 , 7 , 8 , 9 , 5 ])
260291 | OrderedWindowElements (WINDOW_SIZE , stop_timestamp = 13 ))
261- result = _maybe_log_elements (result )
292+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
262293 assert_that (
263294 result ,
264295 equal_to ([
265- [ 0 , 1 , 2 ] ,
266- [ 3 , 4 ] , # 5 is late and discarded
267- [ 6 , 7 , 8 ] ,
268- [ 9 ] ,
296+ (( 0 , 3 ), [( 0 , 0 ), ( 1 , 1 ), ( 2 , 2 )]) ,
297+ (( 3 , 6 ), [( 3 , 3 ), ( 4 , 4 )]) , # 5 is late and discarded
298+ (( 6 , 9 ), [( 6 , 6 ), ( 7 , 7 ), ( 8 , 8 )]) ,
299+ (( 9 , 12 ), [( 9 , 9 )]) ,
269300 ]))
270301
271302 def test_single_late_data_with_allowed_lateness (self ):
@@ -274,16 +305,16 @@ def test_single_late_data_with_allowed_lateness(self):
274305 p | _create_test_stream ([0 , 1 , 2 , 3 , 4 , 6 , 7 , 8 , 9 , 5 ])
275306 | OrderedWindowElements (
276307 WINDOW_SIZE , allowed_lateness = 4 , stop_timestamp = 17 ))
277- result = _maybe_log_elements (result )
308+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
278309 assert_that (
279310 result ,
280311 equal_to ([
281- [ 0 , 1 , 2 ] ,
312+ (( 0 , 3 ), [( 0 , 0 ), ( 1 , 1 ), ( 2 , 2 )]) ,
282313 # allow late data up to:
283314 # 9 (watermark before late data) - 4 (allowed lateness) = 5
284- [ 3 , 4 , 5 ] ,
285- [ 6 , 7 , 8 ] ,
286- [ 9 ] ,
315+ (( 3 , 6 ), [( 3 , 3 ), ( 4 , 4 ), ( 5 , 5 )]) ,
316+ (( 6 , 9 ), [( 6 , 6 ), ( 7 , 7 ), ( 8 , 8 )]) ,
317+ (( 9 , 12 ), [( 9 , 9 )]) ,
287318 ]))
288319
289320 @parameterized .expand ([
@@ -295,19 +326,19 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start):
295326 expected = [
296327 # allow late data up to:
297328 # 9 (watermark before late data) - 5 (allowed lateness) = 4
298- [ None , 4 , 5 ] ,
299- [ 6 , 7 , 8 ] ,
300- [ 9 ] ,
301- [ 9 ] ,
302- [ 9 ] ,
329+ (( 3 , 6 ), [( 3 , None ), ( 4 , 4 ), ( 5 , 5 )]) ,
330+ (( 6 , 9 ), [( 6 , 6 ), ( 7 , 7 ), ( 8 , 8 )]) ,
331+ (( 9 , 12 ), [( 9 , 9 )]) ,
332+ (( 12 , 15 ), [( 12 , 9 )]) ,
333+ (( 15 , 18 ), [( 15 , 9 )]) ,
303334 ]
304335 else :
305336 expected = [
306- [ 4 , 5 ] ,
307- [ 6 , 7 , 8 ] ,
308- [ 9 ] ,
309- [] ,
310- [] ,
337+ (( 3 , 6 ), [( 4 , 4 ), ( 5 , 5 )]) ,
338+ (( 6 , 9 ), [( 6 , 6 ), ( 7 , 7 ), ( 8 , 8 )]) ,
339+ (( 9 , 12 ), [( 9 , 9 )]) ,
340+ (( 12 , 15 ), []) ,
341+ (( 15 , 18 ), []) ,
311342 ]
312343 with TestPipeline (options = self .options ) as p :
313344 result = (
@@ -317,7 +348,7 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start):
317348 fill_start_if_missing = fill_start ,
318349 allowed_lateness = 5 ,
319350 stop_timestamp = 25 ))
320- result = _maybe_log_elements (result )
351+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
321352 assert_that (result , equal_to (expected ))
322353
323354 def test_multiple_late_data_with_allowed_lateness (self ):
@@ -330,29 +361,31 @@ def test_multiple_late_data_with_allowed_lateness(self):
330361 allowed_lateness = 6 ,
331362 fill_start_if_missing = True ,
332363 stop_timestamp = 28 ))
333- result = _maybe_log_elements (result )
364+ result = _maybe_log_elements (result ) | _convert_timestamp_to_int ()
365+ # yapf: disable
334366 assert_that (
335367 result ,
336368 equal_to ([
337- [ 1 , 2 , 3 ] ,
338- [ 2 , 3 ] ,
339- [ 3 ] ,
340- [ 3 ] ,
341- [ 3 ] ,
342- [ 3 ] ,
343- [ 3 , 9 ] ,
344- [ 3 , 9 ] ,
345- [ 9 ] ,
346- [ 9 , 12 ] ,
347- [ 9 , 12 ] ,
348- [ 12 , 14 ] ,
349- [ 12 , 14 ] ,
350- [ 14 , 16 ] ,
351- [ 14 , 16 , 17 ] ,
352- [ 16 , 17 ] ,
353- [ 17 ] ,
354- [ 17 ],
369+ (( - 1 , 2 ), [( - 1 , None ), ( 1 , 1 )]) ,
370+ (( 0 , 3 ), [( 0 , None ), ( 1 , 1 ), ( 2 , 2 )]) ,
371+ (( 1 , 4 ), [( 1 , 1 ), ( 2 , 2 ), ( 3 , 3 )]) ,
372+ (( 2 , 5 ), [( 2 , 2 ), ( 3 , 3 )]), (( 3 , 6 ), [( 3 , 3 )]) ,
373+ (( 4 , 7 ), [( 4 , 3 )]) ,
374+ (( 5 , 8 ), [( 5 , 3 )]) ,
375+ (( 6 , 9 ), [( 6 , 3 )]) ,
376+ (( 7 , 10 ), [( 7 , 3 ), ( 9 , 9 )]) ,
377+ (( 8 , 11 ), [( 8 , 3 ), ( 9 , 9 )]) ,
378+ (( 9 , 12 ), [( 9 , 9 )]) ,
379+ (( 10 , 13 ), [( 10 , 9 ), ( 12 , 12 )]) ,
380+ (( 11 , 14 ), [( 11 , 9 ), ( 12 , 12 )]) ,
381+ (( 12 , 15 ), [( 12 , 12 ), ( 14 , 14 )]) ,
382+ (( 13 , 16 ), [( 13 , 12 ), ( 14 , 14 )]) ,
383+ (( 14 , 17 ), [( 14 , 14 ), ( 16 , 16 )]) ,
384+ (( 15 , 18 ), [( 15 , 14 ), ( 16 , 16 ),( 17 , 17 )]) ,
385+ (( 16 , 19 ), [( 16 , 16 ), ( 17 , 17 )]) ,
386+ (( 17 , 20 ), [( 17 , 17 )]), (( 18 , 21 ), [( 18 , 17 )])
355387 ]))
388+ # yapf: enable
356389
357390
358391if __name__ == '__main__' :
0 commit comments