Skip to content

Commit 892938e

Browse files
committed
Rename timeout_ms to inactivity_gap_ms in tests
1 parent 120409f commit 892938e

File tree

1 file changed

+59
-23
lines changed

1 file changed

+59
-23
lines changed

tests/test_quixstreams/test_dataframe/test_windows/test_session.py

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77

88
@pytest.fixture()
99
def session_window_definition_factory(state_manager, dataframe_factory):
10-
def factory(timeout_ms: int, grace_ms: int = 0) -> SessionWindowDefinition:
10+
def factory(inactivity_gap_ms: int, grace_ms: int = 0) -> SessionWindowDefinition:
1111
sdf = dataframe_factory(
1212
state_manager=state_manager, registry=DataFrameRegistry()
1313
)
1414
window_def = SessionWindowDefinition(
15-
timeout_ms=timeout_ms, grace_ms=grace_ms, dataframe=sdf
15+
inactivity_gap_ms=inactivity_gap_ms, grace_ms=grace_ms, dataframe=sdf
1616
)
1717
return window_def
1818

@@ -51,7 +51,7 @@ def test_session_window_definition_get_name(
5151
dataframe_factory,
5252
):
5353
swd = SessionWindowDefinition(
54-
timeout_ms=timeout,
54+
inactivity_gap_ms=timeout,
5555
grace_ms=grace,
5656
dataframe=dataframe_factory(),
5757
name=provided_name,
@@ -64,7 +64,9 @@ def test_multiaggregation(
6464
session_window_definition_factory,
6565
state_manager,
6666
):
67-
window = session_window_definition_factory(timeout_ms=10000, grace_ms=1000).agg(
67+
window = session_window_definition_factory(
68+
inactivity_gap_ms=10000, grace_ms=1000
69+
).agg(
6870
count=agg.Count(),
6971
sum=agg.Sum(),
7072
mean=agg.Mean(),
@@ -160,7 +162,9 @@ def test_multiaggregation(
160162
def test_sessionwindow_count(
161163
self, expiration, session_window_definition_factory, state_manager
162164
):
163-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
165+
window_def = session_window_definition_factory(
166+
inactivity_gap_ms=10000, grace_ms=1000
167+
)
164168
window = window_def.count()
165169
assert window.name == "session_window_10000_count"
166170

@@ -185,7 +189,9 @@ def test_sessionwindow_count(
185189
def test_sessionwindow_sum(
186190
self, expiration, session_window_definition_factory, state_manager
187191
):
188-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
192+
window_def = session_window_definition_factory(
193+
inactivity_gap_ms=10000, grace_ms=1000
194+
)
189195
window = window_def.sum()
190196
assert window.name == "session_window_10000_sum"
191197

@@ -208,7 +214,9 @@ def test_sessionwindow_sum(
208214
def test_sessionwindow_mean(
209215
self, expiration, session_window_definition_factory, state_manager
210216
):
211-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
217+
window_def = session_window_definition_factory(
218+
inactivity_gap_ms=10000, grace_ms=1000
219+
)
212220
window = window_def.mean()
213221
assert window.name == "session_window_10000_mean"
214222

@@ -231,7 +239,9 @@ def test_sessionwindow_mean(
231239
def test_sessionwindow_reduce(
232240
self, expiration, session_window_definition_factory, state_manager
233241
):
234-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
242+
window_def = session_window_definition_factory(
243+
inactivity_gap_ms=10000, grace_ms=1000
244+
)
235245
window = window_def.reduce(
236246
reducer=lambda agg, current: agg + [current],
237247
initializer=lambda value: [value],
@@ -257,7 +267,9 @@ def test_sessionwindow_reduce(
257267
def test_sessionwindow_max(
258268
self, expiration, session_window_definition_factory, state_manager
259269
):
260-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
270+
window_def = session_window_definition_factory(
271+
inactivity_gap_ms=10000, grace_ms=1000
272+
)
261273
window = window_def.max()
262274
assert window.name == "session_window_10000_max"
263275

@@ -280,7 +292,9 @@ def test_sessionwindow_max(
280292
def test_sessionwindow_min(
281293
self, expiration, session_window_definition_factory, state_manager
282294
):
283-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
295+
window_def = session_window_definition_factory(
296+
inactivity_gap_ms=10000, grace_ms=1000
297+
)
284298
window = window_def.min()
285299
assert window.name == "session_window_10000_min"
286300

@@ -303,7 +317,9 @@ def test_sessionwindow_min(
303317
def test_sessionwindow_collect(
304318
self, expiration, session_window_definition_factory, state_manager
305319
):
306-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
320+
window_def = session_window_definition_factory(
321+
inactivity_gap_ms=10000, grace_ms=1000
322+
)
307323
window = window_def.collect()
308324
assert window.name == "session_window_10000_collect"
309325

@@ -335,7 +351,7 @@ def test_session_window_def_init_invalid(
335351
):
336352
with pytest.raises(ValueError):
337353
SessionWindowDefinition(
338-
timeout_ms=timeout,
354+
inactivity_gap_ms=timeout,
339355
grace_ms=grace,
340356
name=name,
341357
dataframe=dataframe_factory(),
@@ -344,7 +360,7 @@ def test_session_window_def_init_invalid(
344360
def test_session_window_def_init_invalid_type(self, dataframe_factory):
345361
with pytest.raises(TypeError):
346362
SessionWindowDefinition(
347-
timeout_ms="invalid", # should be int
363+
inactivity_gap_ms="invalid", # should be int
348364
grace_ms=1000,
349365
name="test",
350366
dataframe=dataframe_factory(),
@@ -358,7 +374,9 @@ def test_session_window_process_timeout_behavior(
358374
state_manager,
359375
):
360376
"""Test that sessions properly timeout and new sessions start correctly"""
361-
window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=0)
377+
window_def = session_window_definition_factory(
378+
inactivity_gap_ms=5000, grace_ms=0
379+
)
362380
window = window_def.sum()
363381
window.final(closing_strategy=expiration)
364382
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -404,7 +422,9 @@ def test_session_window_grace_period(
404422
self, session_window_definition_factory, state_manager
405423
):
406424
"""Test that grace period allows late events"""
407-
window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=2000)
425+
window_def = session_window_definition_factory(
426+
inactivity_gap_ms=5000, grace_ms=2000
427+
)
408428
window = window_def.sum()
409429
window.final(closing_strategy="key")
410430
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -441,7 +461,9 @@ def test_session_window_multiple_keys(
441461
self, session_window_definition_factory, state_manager
442462
):
443463
"""Test that different keys maintain separate sessions"""
444-
window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=0)
464+
window_def = session_window_definition_factory(
465+
inactivity_gap_ms=5000, grace_ms=0
466+
)
445467
window = window_def.sum()
446468
window.final(closing_strategy="key")
447469
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -490,7 +512,9 @@ def test_session_partition_expiration(
490512
self, session_window_definition_factory, state_manager
491513
):
492514
"""Test partition-level session expiration"""
493-
window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=1000)
515+
window_def = session_window_definition_factory(
516+
inactivity_gap_ms=5000, grace_ms=1000
517+
)
494518
window = window_def.sum()
495519
window.final(closing_strategy="partition")
496520
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -528,7 +552,9 @@ def test_session_window_late_events(
528552
self, session_window_definition_factory, state_manager
529553
):
530554
"""Test handling of late events that arrive after session closure"""
531-
window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=1000)
555+
window_def = session_window_definition_factory(
556+
inactivity_gap_ms=5000, grace_ms=1000
557+
)
532558
window = window_def.sum()
533559
window.final(closing_strategy="key")
534560
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -560,7 +586,9 @@ def test_session_window_current_mode(
560586
self, session_window_definition_factory, state_manager
561587
):
562588
"""Test session window with current() mode"""
563-
window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=0)
589+
window_def = session_window_definition_factory(
590+
inactivity_gap_ms=5000, grace_ms=0
591+
)
564592
window = window_def.sum()
565593
window.current(closing_strategy="key")
566594
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -588,7 +616,9 @@ def test_session_window_overlapping_sessions(
588616
self, session_window_definition_factory, state_manager
589617
):
590618
"""Test that sessions don't overlap for the same key"""
591-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=0)
619+
window_def = session_window_definition_factory(
620+
inactivity_gap_ms=10000, grace_ms=0
621+
)
592622
window = window_def.sum()
593623
window.final(closing_strategy="key")
594624
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -624,7 +654,9 @@ def test_session_window_merge_sessions(
624654
self, session_window_definition_factory, state_manager
625655
):
626656
"""Test that an event can merge two previously separate sessions"""
627-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
657+
window_def = session_window_definition_factory(
658+
inactivity_gap_ms=10000, grace_ms=1000
659+
)
628660
window = window_def.sum()
629661
window.final(closing_strategy="key")
630662
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -707,7 +739,9 @@ def test_session_window_bridging_event_scenario(
707739
Current behavior: Session A gets extended, Session B remains separate
708740
Ideal behavior: Sessions A and B get merged when bridging event arrives
709741
"""
710-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=2000)
742+
window_def = session_window_definition_factory(
743+
inactivity_gap_ms=10000, grace_ms=2000
744+
)
711745
window = window_def.sum()
712746
window.final(closing_strategy="key")
713747
store = state_manager.get_store(stream_id="test", store_name=window.name)
@@ -777,7 +811,9 @@ def test_session_window_string_key_extension(
777811
`transaction.delete_window()` was called with a string key instead of
778812
the properly serialized bytes prefix.
779813
"""
780-
window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000)
814+
window_def = session_window_definition_factory(
815+
inactivity_gap_ms=10000, grace_ms=1000
816+
)
781817
window = window_def.sum()
782818
window.final(closing_strategy="key")
783819
store = state_manager.get_store(stream_id="test", store_name=window.name)

0 commit comments

Comments
 (0)