Skip to content

Commit f2c71fc

Browse files
committed
Fix end time resolution for out-of-order events
1 parent e371715 commit f2c71fc

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

quixstreams/dataframe/windows/session.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ def process_window(
101101
# Check if this session can be extended
102102
if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start:
103103
session_start = window_start
104-
session_end = timestamp_ms
104+
# Only update end time if the new event is newer than the current end time
105+
session_end = max(window_end, timestamp_ms)
105106
can_extend_session = True
106107
existing_aggregated = aggregated_value
107108
old_window_to_delete = (window_start, window_end)

tests/test_quixstreams/test_dataframe/test_windows/test_session.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,3 +883,54 @@ def test_session_window_string_key_extension(
883883
assert updated[0][1]["start"] == 30000
884884
assert updated[0][1]["end"] == 30000 # timestamp of last event
885885
assert updated[0][1]["value"] == 25
886+
887+
def test_out_of_order_events_end_time(
888+
self, session_window_definition_factory, state_manager
889+
):
890+
"""Test that out-of-order events correctly maintain the latest timestamp as end time"""
891+
window_def = session_window_definition_factory(
892+
inactivity_gap_ms=10000, grace_ms=5000
893+
)
894+
window = window_def.sum()
895+
window.final(closing_strategy="key")
896+
897+
store = state_manager.get_store(stream_id="test", store_name=window.name)
898+
store.assign_partition(0)
899+
900+
with store.start_partition_transaction(0) as tx:
901+
key = b"key"
902+
903+
# 1. Start session with event at timestamp 1000
904+
updated, expired = process(
905+
window, value=1, key=key, transaction=tx, timestamp_ms=1000
906+
)
907+
assert updated[0][1]["start"] == 1000
908+
assert updated[0][1]["end"] == 1000 # End should be 1000
909+
assert updated[0][1]["value"] == 1
910+
911+
# 2. Add event at timestamp 8000 (in order)
912+
updated, expired = process(
913+
window, value=2, key=key, transaction=tx, timestamp_ms=8000
914+
)
915+
assert updated[0][1]["start"] == 1000
916+
assert updated[0][1]["end"] == 8000 # End should be 8000 (latest event)
917+
assert updated[0][1]["value"] == 3
918+
919+
# 3. Add OUT-OF-ORDER event at timestamp 3000 (before 8000)
920+
# This should be accepted (within grace period) but should NOT change the end time
921+
updated, expired = process(
922+
window, value=10, key=key, transaction=tx, timestamp_ms=3000
923+
)
924+
assert updated[0][1]["start"] == 1000
925+
# KEY TEST: End time should remain 8000, not become 3000!
926+
assert updated[0][1]["end"] == 8000
927+
assert updated[0][1]["value"] == 13
928+
929+
# 4. Add event NEWER than current end (timestamp 9000)
930+
updated, expired = process(
931+
window, value=4, key=key, transaction=tx, timestamp_ms=9000
932+
)
933+
assert updated[0][1]["start"] == 1000
934+
# NOW the end time should update to 9000
935+
assert updated[0][1]["end"] == 9000
936+
assert updated[0][1]["value"] == 17

0 commit comments

Comments
 (0)