7
7
8
8
@pytest .fixture ()
9
9
def session_window_definition_factory (state_manager , dataframe_factory ):
10
- def factory (timeout_ms : int , grace_ms : int = 0 ) -> SessionWindowDefinition :
10
+ def factory (inactivity_gap_ms : int , grace_ms : int = 0 ) -> SessionWindowDefinition :
11
11
sdf = dataframe_factory (
12
12
state_manager = state_manager , registry = DataFrameRegistry ()
13
13
)
14
14
window_def = SessionWindowDefinition (
15
- timeout_ms = timeout_ms , grace_ms = grace_ms , dataframe = sdf
15
+ inactivity_gap_ms = inactivity_gap_ms , grace_ms = grace_ms , dataframe = sdf
16
16
)
17
17
return window_def
18
18
@@ -51,7 +51,7 @@ def test_session_window_definition_get_name(
51
51
dataframe_factory ,
52
52
):
53
53
swd = SessionWindowDefinition (
54
- timeout_ms = timeout ,
54
+ inactivity_gap_ms = timeout ,
55
55
grace_ms = grace ,
56
56
dataframe = dataframe_factory (),
57
57
name = provided_name ,
@@ -64,7 +64,9 @@ def test_multiaggregation(
64
64
session_window_definition_factory ,
65
65
state_manager ,
66
66
):
67
- window = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 ).agg (
67
+ window = session_window_definition_factory (
68
+ inactivity_gap_ms = 10000 , grace_ms = 1000
69
+ ).agg (
68
70
count = agg .Count (),
69
71
sum = agg .Sum (),
70
72
mean = agg .Mean (),
@@ -160,7 +162,9 @@ def test_multiaggregation(
160
162
def test_sessionwindow_count (
161
163
self , expiration , session_window_definition_factory , state_manager
162
164
):
163
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
165
+ window_def = session_window_definition_factory (
166
+ inactivity_gap_ms = 10000 , grace_ms = 1000
167
+ )
164
168
window = window_def .count ()
165
169
assert window .name == "session_window_10000_count"
166
170
@@ -185,7 +189,9 @@ def test_sessionwindow_count(
185
189
def test_sessionwindow_sum (
186
190
self , expiration , session_window_definition_factory , state_manager
187
191
):
188
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
192
+ window_def = session_window_definition_factory (
193
+ inactivity_gap_ms = 10000 , grace_ms = 1000
194
+ )
189
195
window = window_def .sum ()
190
196
assert window .name == "session_window_10000_sum"
191
197
@@ -208,7 +214,9 @@ def test_sessionwindow_sum(
208
214
def test_sessionwindow_mean (
209
215
self , expiration , session_window_definition_factory , state_manager
210
216
):
211
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
217
+ window_def = session_window_definition_factory (
218
+ inactivity_gap_ms = 10000 , grace_ms = 1000
219
+ )
212
220
window = window_def .mean ()
213
221
assert window .name == "session_window_10000_mean"
214
222
@@ -231,7 +239,9 @@ def test_sessionwindow_mean(
231
239
def test_sessionwindow_reduce (
232
240
self , expiration , session_window_definition_factory , state_manager
233
241
):
234
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
242
+ window_def = session_window_definition_factory (
243
+ inactivity_gap_ms = 10000 , grace_ms = 1000
244
+ )
235
245
window = window_def .reduce (
236
246
reducer = lambda agg , current : agg + [current ],
237
247
initializer = lambda value : [value ],
@@ -257,7 +267,9 @@ def test_sessionwindow_reduce(
257
267
def test_sessionwindow_max (
258
268
self , expiration , session_window_definition_factory , state_manager
259
269
):
260
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
270
+ window_def = session_window_definition_factory (
271
+ inactivity_gap_ms = 10000 , grace_ms = 1000
272
+ )
261
273
window = window_def .max ()
262
274
assert window .name == "session_window_10000_max"
263
275
@@ -280,7 +292,9 @@ def test_sessionwindow_max(
280
292
def test_sessionwindow_min (
281
293
self , expiration , session_window_definition_factory , state_manager
282
294
):
283
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
295
+ window_def = session_window_definition_factory (
296
+ inactivity_gap_ms = 10000 , grace_ms = 1000
297
+ )
284
298
window = window_def .min ()
285
299
assert window .name == "session_window_10000_min"
286
300
@@ -303,7 +317,9 @@ def test_sessionwindow_min(
303
317
def test_sessionwindow_collect (
304
318
self , expiration , session_window_definition_factory , state_manager
305
319
):
306
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
320
+ window_def = session_window_definition_factory (
321
+ inactivity_gap_ms = 10000 , grace_ms = 1000
322
+ )
307
323
window = window_def .collect ()
308
324
assert window .name == "session_window_10000_collect"
309
325
@@ -335,7 +351,7 @@ def test_session_window_def_init_invalid(
335
351
):
336
352
with pytest .raises (ValueError ):
337
353
SessionWindowDefinition (
338
- timeout_ms = timeout ,
354
+ inactivity_gap_ms = timeout ,
339
355
grace_ms = grace ,
340
356
name = name ,
341
357
dataframe = dataframe_factory (),
@@ -344,7 +360,7 @@ def test_session_window_def_init_invalid(
344
360
def test_session_window_def_init_invalid_type (self , dataframe_factory ):
345
361
with pytest .raises (TypeError ):
346
362
SessionWindowDefinition (
347
- timeout_ms = "invalid" , # should be int
363
+ inactivity_gap_ms = "invalid" , # should be int
348
364
grace_ms = 1000 ,
349
365
name = "test" ,
350
366
dataframe = dataframe_factory (),
@@ -358,7 +374,9 @@ def test_session_window_process_timeout_behavior(
358
374
state_manager ,
359
375
):
360
376
"""Test that sessions properly timeout and new sessions start correctly"""
361
- window_def = session_window_definition_factory (timeout_ms = 5000 , grace_ms = 0 )
377
+ window_def = session_window_definition_factory (
378
+ inactivity_gap_ms = 5000 , grace_ms = 0
379
+ )
362
380
window = window_def .sum ()
363
381
window .final (closing_strategy = expiration )
364
382
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -404,7 +422,9 @@ def test_session_window_grace_period(
404
422
self , session_window_definition_factory , state_manager
405
423
):
406
424
"""Test that grace period allows late events"""
407
- window_def = session_window_definition_factory (timeout_ms = 5000 , grace_ms = 2000 )
425
+ window_def = session_window_definition_factory (
426
+ inactivity_gap_ms = 5000 , grace_ms = 2000
427
+ )
408
428
window = window_def .sum ()
409
429
window .final (closing_strategy = "key" )
410
430
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -441,7 +461,9 @@ def test_session_window_multiple_keys(
441
461
self , session_window_definition_factory , state_manager
442
462
):
443
463
"""Test that different keys maintain separate sessions"""
444
- window_def = session_window_definition_factory (timeout_ms = 5000 , grace_ms = 0 )
464
+ window_def = session_window_definition_factory (
465
+ inactivity_gap_ms = 5000 , grace_ms = 0
466
+ )
445
467
window = window_def .sum ()
446
468
window .final (closing_strategy = "key" )
447
469
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -490,7 +512,9 @@ def test_session_partition_expiration(
490
512
self , session_window_definition_factory , state_manager
491
513
):
492
514
"""Test partition-level session expiration"""
493
- window_def = session_window_definition_factory (timeout_ms = 5000 , grace_ms = 1000 )
515
+ window_def = session_window_definition_factory (
516
+ inactivity_gap_ms = 5000 , grace_ms = 1000
517
+ )
494
518
window = window_def .sum ()
495
519
window .final (closing_strategy = "partition" )
496
520
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -528,7 +552,9 @@ def test_session_window_late_events(
528
552
self , session_window_definition_factory , state_manager
529
553
):
530
554
"""Test handling of late events that arrive after session closure"""
531
- window_def = session_window_definition_factory (timeout_ms = 5000 , grace_ms = 1000 )
555
+ window_def = session_window_definition_factory (
556
+ inactivity_gap_ms = 5000 , grace_ms = 1000
557
+ )
532
558
window = window_def .sum ()
533
559
window .final (closing_strategy = "key" )
534
560
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -560,7 +586,9 @@ def test_session_window_current_mode(
560
586
self , session_window_definition_factory , state_manager
561
587
):
562
588
"""Test session window with current() mode"""
563
- window_def = session_window_definition_factory (timeout_ms = 5000 , grace_ms = 0 )
589
+ window_def = session_window_definition_factory (
590
+ inactivity_gap_ms = 5000 , grace_ms = 0
591
+ )
564
592
window = window_def .sum ()
565
593
window .current (closing_strategy = "key" )
566
594
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -588,7 +616,9 @@ def test_session_window_overlapping_sessions(
588
616
self , session_window_definition_factory , state_manager
589
617
):
590
618
"""Test that sessions don't overlap for the same key"""
591
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 0 )
619
+ window_def = session_window_definition_factory (
620
+ inactivity_gap_ms = 10000 , grace_ms = 0
621
+ )
592
622
window = window_def .sum ()
593
623
window .final (closing_strategy = "key" )
594
624
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -624,7 +654,9 @@ def test_session_window_merge_sessions(
624
654
self , session_window_definition_factory , state_manager
625
655
):
626
656
"""Test that an event can merge two previously separate sessions"""
627
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
657
+ window_def = session_window_definition_factory (
658
+ inactivity_gap_ms = 10000 , grace_ms = 1000
659
+ )
628
660
window = window_def .sum ()
629
661
window .final (closing_strategy = "key" )
630
662
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -707,7 +739,9 @@ def test_session_window_bridging_event_scenario(
707
739
Current behavior: Session A gets extended, Session B remains separate
708
740
Ideal behavior: Sessions A and B get merged when bridging event arrives
709
741
"""
710
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 2000 )
742
+ window_def = session_window_definition_factory (
743
+ inactivity_gap_ms = 10000 , grace_ms = 2000
744
+ )
711
745
window = window_def .sum ()
712
746
window .final (closing_strategy = "key" )
713
747
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
@@ -777,7 +811,9 @@ def test_session_window_string_key_extension(
777
811
`transaction.delete_window()` was called with a string key instead of
778
812
the properly serialized bytes prefix.
779
813
"""
780
- window_def = session_window_definition_factory (timeout_ms = 10000 , grace_ms = 1000 )
814
+ window_def = session_window_definition_factory (
815
+ inactivity_gap_ms = 10000 , grace_ms = 1000
816
+ )
781
817
window = window_def .sum ()
782
818
window .final (closing_strategy = "key" )
783
819
store = state_manager .get_store (stream_id = "test" , store_name = window .name )
0 commit comments