Skip to content

Commit 6bee68a

Browse files
committed
End time equals last event time
1 parent 6802727 commit 6bee68a

File tree

2 files changed

+42
-39
lines changed

2 files changed

+42
-39
lines changed

quixstreams/dataframe/windows/session.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def process_window(
7575
value=value,
7676
key=key,
7777
start=timestamp_ms,
78-
end=timestamp_ms + timeout_ms,
78+
end=timestamp_ms, # End time is the timestamp of the last event
7979
timestamp_ms=timestamp_ms,
8080
late_by_ms=late_by_ms,
8181
)
@@ -94,13 +94,14 @@ def process_window(
9494

9595
for (window_start, window_end), aggregated_value, _ in windows:
9696
# Calculate the time gap between the new event and the session's last activity
97-
session_last_activity = window_end - timeout_ms
97+
# window_end is stored as last_event_timestamp + 1, so subtract 1 to get actual last event time
98+
session_last_activity = window_end - 1
9899
time_gap = timestamp_ms - session_last_activity
99100

100101
# Check if this session can be extended
101102
if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start:
102103
session_start = window_start
103-
session_end = timestamp_ms + timeout_ms
104+
session_end = timestamp_ms + 1
104105
can_extend_session = True
105106
existing_aggregated = aggregated_value
106107
old_window_to_delete = (window_start, window_end)
@@ -109,7 +110,7 @@ def process_window(
109110
# If no extendable session found, start a new one
110111
if not can_extend_session:
111112
session_start = timestamp_ms
112-
session_end = timestamp_ms + timeout_ms
113+
session_end = timestamp_ms + 1
113114

114115
# Process the event for this session
115116
updated_windows: list[WindowKeyResult] = []
@@ -137,7 +138,7 @@ def process_window(
137138
updated_windows.append(
138139
(
139140
key,
140-
self._results(aggregated, [], session_start, session_end),
141+
self._results(aggregated, [], session_start, session_end - 1),
141142
)
142143
)
143144

@@ -219,8 +220,10 @@ def expire_by_key(
219220

220221
windows_to_delete = []
221222
for (window_start, window_end), aggregated, _ in all_windows:
222-
# Session expires when the session end time has passed the expiry threshold
223-
if window_end <= expiry_threshold:
223+
# Session expires once its last event time + timeout is at or before the expiry threshold
224+
# window_end is stored as last_event_timestamp + 1, so we subtract 1 and add timeout_ms
225+
last_event_timestamp = window_end - 1
226+
if last_event_timestamp + self._timeout_ms <= expiry_threshold:
224227
collected = []
225228
if collect:
226229
collected = state.get_from_collection(window_start, window_end)
@@ -229,7 +232,7 @@ def expire_by_key(
229232
count += 1
230233
yield (
231234
key,
232-
self._results(aggregated, collected, window_start, window_end),
235+
self._results(aggregated, collected, window_start, window_end - 1),
233236
)
234237

235238
# Clean up expired windows

tests/test_quixstreams/test_dataframe/test_windows/test_session.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def test_multiaggregation(
9191
key,
9292
{
9393
"start": 1000,
94-
"end": 11000, # 1000 + 10000 timeout
94+
"end": 1000, # timestamp of last event
9595
"count": 1,
9696
"sum": 1,
9797
"mean": 1.0,
@@ -112,7 +112,7 @@ def test_multiaggregation(
112112
key,
113113
{
114114
"start": 1000,
115-
"end": 15000, # 5000 + 10000 timeout
115+
"end": 5000, # timestamp of last event
116116
"count": 2,
117117
"sum": 5,
118118
"mean": 2.5,
@@ -132,7 +132,7 @@ def test_multiaggregation(
132132
key,
133133
{
134134
"start": 1000,
135-
"end": 15000,
135+
"end": 5000, # timestamp of last event
136136
"count": 2,
137137
"sum": 5,
138138
"mean": 2.5,
@@ -147,7 +147,7 @@ def test_multiaggregation(
147147
key,
148148
{
149149
"start": 26000,
150-
"end": 36000, # 26000 + 10000 timeout
150+
"end": 26000, # timestamp of last event
151151
"count": 1,
152152
"sum": 2,
153153
"mean": 2.0,
@@ -182,7 +182,7 @@ def test_sessionwindow_count(
182182
assert len(updated) == 1
183183
assert updated[0][1]["value"] == 2
184184
assert updated[0][1]["start"] == 1000
185-
assert updated[0][1]["end"] == 15000 # 5000 + 10000
185+
assert updated[0][1]["end"] == 5000 # timestamp of last event
186186
assert not expired
187187

188188
@pytest.mark.parametrize("expiration", ("key", "partition"))
@@ -207,7 +207,7 @@ def test_sessionwindow_sum(
207207
assert len(updated) == 1
208208
assert updated[0][1]["value"] == 5
209209
assert updated[0][1]["start"] == 1000
210-
assert updated[0][1]["end"] == 15000
210+
assert updated[0][1]["end"] == 5000 # timestamp of last event
211211
assert not expired
212212

213213
@pytest.mark.parametrize("expiration", ("key", "partition"))
@@ -232,7 +232,7 @@ def test_sessionwindow_mean(
232232
assert len(updated) == 1
233233
assert updated[0][1]["value"] == 3.0
234234
assert updated[0][1]["start"] == 1000
235-
assert updated[0][1]["end"] == 15000
235+
assert updated[0][1]["end"] == 5000 # timestamp of last event
236236
assert not expired
237237

238238
@pytest.mark.parametrize("expiration", ("key", "partition"))
@@ -260,7 +260,7 @@ def test_sessionwindow_reduce(
260260
assert len(updated) == 1
261261
assert updated[0][1]["value"] == [2, 3]
262262
assert updated[0][1]["start"] == 1000
263-
assert updated[0][1]["end"] == 15000
263+
assert updated[0][1]["end"] == 5000 # timestamp of last event
264264
assert not expired
265265

266266
@pytest.mark.parametrize("expiration", ("key", "partition"))
@@ -285,7 +285,7 @@ def test_sessionwindow_max(
285285
assert len(updated) == 1
286286
assert updated[0][1]["value"] == 5
287287
assert updated[0][1]["start"] == 1000
288-
assert updated[0][1]["end"] == 15000
288+
assert updated[0][1]["end"] == 5000 # timestamp of last event
289289
assert not expired
290290

291291
@pytest.mark.parametrize("expiration", ("key", "partition"))
@@ -310,7 +310,7 @@ def test_sessionwindow_min(
310310
assert len(updated) == 1
311311
assert updated[0][1]["value"] == 2
312312
assert updated[0][1]["start"] == 1000
313-
assert updated[0][1]["end"] == 15000
313+
assert updated[0][1]["end"] == 5000 # timestamp of last event
314314
assert not expired
315315

316316
@pytest.mark.parametrize("expiration", ("key", "partition"))
@@ -336,7 +336,7 @@ def test_sessionwindow_collect(
336336
window, value=4, key=key, transaction=tx, timestamp_ms=25000
337337
)
338338
assert not updated
339-
assert expired == [(key, {"start": 1000, "end": 18000, "value": [1, 2, 3]})]
339+
assert expired == [(key, {"start": 1000, "end": 8000, "value": [1, 2, 3]})]
340340

341341
@pytest.mark.parametrize(
342342
"timeout, grace, name",
@@ -391,7 +391,7 @@ def test_session_window_process_timeout_behavior(
391391
assert len(updated) == 1
392392
assert updated[0][1]["value"] == 1
393393
assert updated[0][1]["start"] == 1000
394-
assert updated[0][1]["end"] == 6000 # 1000 + 5000
394+
assert updated[0][1]["end"] == 1000 # timestamp of last event
395395
assert not expired
396396

397397
# Add to session 1 (within timeout)
@@ -401,7 +401,7 @@ def test_session_window_process_timeout_behavior(
401401
assert len(updated) == 1
402402
assert updated[0][1]["value"] == 3
403403
assert updated[0][1]["start"] == 1000
404-
assert updated[0][1]["end"] == 9000 # 4000 + 5000
404+
assert updated[0][1]["end"] == 4000 # timestamp of last event
405405
assert not expired
406406

407407
# Start session 2 (outside timeout) - should expire session 1
@@ -411,12 +411,12 @@ def test_session_window_process_timeout_behavior(
411411
assert len(updated) == 1
412412
assert updated[0][1]["value"] == 5
413413
assert updated[0][1]["start"] == 15000
414-
assert updated[0][1]["end"] == 20000 # 15000 + 5000
414+
assert updated[0][1]["end"] == 15000 # timestamp of last event
415415

416416
assert len(expired) == 1
417417
assert expired[0][1]["value"] == 3
418418
assert expired[0][1]["start"] == 1000
419-
assert expired[0][1]["end"] == 9000
419+
assert expired[0][1]["end"] == 4000 # timestamp of last event
420420

421421
def test_session_window_grace_period(
422422
self, session_window_definition_factory, state_manager
@@ -670,7 +670,7 @@ def test_session_window_merge_sessions(
670670
)
671671
assert len(updated) == 1
672672
assert updated[0][1]["start"] == 1000
673-
assert updated[0][1]["end"] == 11000 # 1000 + 10000
673+
assert updated[0][1]["end"] == 1000 # timestamp of last event
674674
assert updated[0][1]["value"] == 1
675675
assert not expired
676676

@@ -683,12 +683,12 @@ def test_session_window_merge_sessions(
683683
# First session should now be expired
684684
assert len(expired) == 1
685685
assert expired[0][1]["start"] == 1000
686-
assert expired[0][1]["end"] == 11000
686+
assert expired[0][1]["end"] == 1000 # timestamp of last event
687687
assert expired[0][1]["value"] == 1
688688

689689
assert len(updated) == 1
690690
assert updated[0][1]["start"] == 20000
691-
assert updated[0][1]["end"] == 30000 # 20000 + 10000
691+
assert updated[0][1]["end"] == 20000 # timestamp of last event
692692
assert updated[0][1]["value"] == 10
693693

694694
# Add another event to the second session
@@ -697,7 +697,7 @@ def test_session_window_merge_sessions(
697697
)
698698
assert len(updated) == 1
699699
assert updated[0][1]["start"] == 20000
700-
assert updated[0][1]["end"] == 35000 # 25000 + 10000
700+
assert updated[0][1]["end"] == 25000 # timestamp of last event
701701
assert updated[0][1]["value"] == 15 # 10 + 5
702702
assert not expired
703703

@@ -710,13 +710,13 @@ def test_session_window_merge_sessions(
710710
# Second session should be expired
711711
assert len(expired) == 1
712712
assert expired[0][1]["start"] == 20000
713-
assert expired[0][1]["end"] == 35000
713+
assert expired[0][1]["end"] == 25000 # timestamp of last event
714714
assert expired[0][1]["value"] == 15
715715

716716
# Third session starts
717717
assert len(updated) == 1
718718
assert updated[0][1]["start"] == 50000
719-
assert updated[0][1]["end"] == 60000 # 50000 + 10000
719+
assert updated[0][1]["end"] == 50000 # timestamp of last event
720720
assert updated[0][1]["value"] == 100
721721

722722
def test_session_window_bridging_event_scenario(
@@ -755,7 +755,7 @@ def test_session_window_bridging_event_scenario(
755755
)
756756
assert len(updated) == 1
757757
assert updated[0][1]["start"] == 1000
758-
assert updated[0][1]["end"] == 11000 # 1000 + 10000
758+
assert updated[0][1]["end"] == 1000 # timestamp of last event
759759
assert updated[0][1]["value"] == 5
760760
assert not expired
761761

@@ -770,7 +770,7 @@ def test_session_window_bridging_event_scenario(
770770
# Event at 12000 is before 13000, so it should extend Session A
771771
assert len(updated) == 1
772772
assert updated[0][1]["start"] == 1000 # Session A extended
773-
assert updated[0][1]["end"] == 22000 # 12000 + 10000
773+
assert updated[0][1]["end"] == 12000 # timestamp of last event
774774
assert updated[0][1]["value"] == 15 # 5 + 10
775775
assert not expired
776776

@@ -781,7 +781,7 @@ def test_session_window_bridging_event_scenario(
781781
# This should extend the already extended Session A further
782782
assert len(updated) == 1
783783
assert updated[0][1]["start"] == 1000 # Still Session A
784-
assert updated[0][1]["end"] == 25000 # 15000 + 10000
784+
assert updated[0][1]["end"] == 15000 # timestamp of last event
785785
assert updated[0][1]["value"] == 35 # 5 + 10 + 20
786786
assert not expired
787787

@@ -791,7 +791,7 @@ def test_session_window_bridging_event_scenario(
791791
)
792792
assert len(expired) == 1
793793
assert expired[0][1]["start"] == 1000
794-
assert expired[0][1]["end"] == 25000
794+
assert expired[0][1]["end"] == 15000 # timestamp of last event
795795
assert expired[0][1]["value"] == 35 # All events combined
796796

797797
assert len(updated) == 1
@@ -828,7 +828,7 @@ def test_session_window_string_key_extension(
828828
)
829829
assert len(updated) == 1
830830
assert updated[0][1]["start"] == 1000
831-
assert updated[0][1]["end"] == 11000 # 1000 + 10000
831+
assert updated[0][1]["end"] == 1000 # timestamp of last event
832832
assert updated[0][1]["value"] == 100
833833
assert not expired
834834

@@ -839,7 +839,7 @@ def test_session_window_string_key_extension(
839839
)
840840
assert len(updated) == 1
841841
assert updated[0][1]["start"] == 1000 # Session extended, same start
842-
assert updated[0][1]["end"] == 15000 # 5000 + 10000 (new end time)
842+
assert updated[0][1]["end"] == 5000 # timestamp of last event
843843
assert updated[0][1]["value"] == 300 # 100 + 200
844844
assert not expired
845845

@@ -849,7 +849,7 @@ def test_session_window_string_key_extension(
849849
)
850850
assert len(updated) == 1
851851
assert updated[0][1]["start"] == 1000 # Session extended again
852-
assert updated[0][1]["end"] == 18000 # 8000 + 10000
852+
assert updated[0][1]["end"] == 8000 # timestamp of last event
853853
assert updated[0][1]["value"] == 350 # 100 + 200 + 50
854854
assert not expired
855855

@@ -861,7 +861,7 @@ def test_session_window_string_key_extension(
861861
assert len(updated) == 1
862862
assert updated[0][0] == key2 # Different key
863863
assert updated[0][1]["start"] == 9000
864-
assert updated[0][1]["end"] == 19000 # 9000 + 10000
864+
assert updated[0][1]["end"] == 9000 # timestamp of last event
865865
assert updated[0][1]["value"] == 75
866866
assert not expired
867867

@@ -874,12 +874,12 @@ def test_session_window_string_key_extension(
874874
assert len(expired) == 1
875875
assert expired[0][0] == key
876876
assert expired[0][1]["start"] == 1000
877-
assert expired[0][1]["end"] == 18000
877+
assert expired[0][1]["end"] == 8000 # timestamp of last event
878878
assert expired[0][1]["value"] == 350
879879

880880
# Should have started a new session for the first key
881881
assert len(updated) == 1
882882
assert updated[0][0] == key
883883
assert updated[0][1]["start"] == 30000
884-
assert updated[0][1]["end"] == 40000
884+
assert updated[0][1]["end"] == 30000 # timestamp of last event
885885
assert updated[0][1]["value"] == 25

0 commit comments

Comments
 (0)