Skip to content

Commit e371715

Browse files
committed
Lift the equality check from validation to simplify session windows
1 parent 6bee68a commit e371715

File tree

2 files changed

+12
-12
lines changed

2 files changed

+12
-12
lines changed

quixstreams/dataframe/windows/session.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,14 @@ def process_window(
9494

9595
for (window_start, window_end), aggregated_value, _ in windows:
9696
# Calculate the time gap between the new event and the session's last activity
97-
# window_end is stored as last_event_timestamp + 1, so subtract 1 to get actual last event time
98-
session_last_activity = window_end - 1
97+
# window_end now directly represents the timestamp of the last event
98+
session_last_activity = window_end
9999
time_gap = timestamp_ms - session_last_activity
100100

101101
# Check if this session can be extended
102102
if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start:
103103
session_start = window_start
104-
session_end = timestamp_ms + 1
104+
session_end = timestamp_ms
105105
can_extend_session = True
106106
existing_aggregated = aggregated_value
107107
old_window_to_delete = (window_start, window_end)
@@ -110,7 +110,7 @@ def process_window(
110110
# If no extendable session found, start a new one
111111
if not can_extend_session:
112112
session_start = timestamp_ms
113-
session_end = timestamp_ms + 1
113+
session_end = timestamp_ms # End time is the timestamp of the last event
114114

115115
# Process the event for this session
116116
updated_windows: list[WindowKeyResult] = []
@@ -138,7 +138,7 @@ def process_window(
138138
updated_windows.append(
139139
(
140140
key,
141-
self._results(aggregated, [], session_start, session_end - 1),
141+
self._results(aggregated, [], session_start, session_end),
142142
)
143143
)
144144

@@ -221,18 +221,18 @@ def expire_by_key(
221221
windows_to_delete = []
222222
for (window_start, window_end), aggregated, _ in all_windows:
223223
# Session expires when the session end time + timeout has passed the expiry threshold
224-
# window_end is stored as last_event_timestamp + 1, so we subtract 1 and add timeout_ms
225-
last_event_timestamp = window_end - 1
226-
if last_event_timestamp + self._timeout_ms <= expiry_threshold:
224+
# window_end directly represents the timestamp of the last event
225+
if window_end + self._timeout_ms <= expiry_threshold:
227226
collected = []
228227
if collect:
229-
collected = state.get_from_collection(window_start, window_end)
228+
# window_end is now the timestamp of the last event, so we need +1 to include it
229+
collected = state.get_from_collection(window_start, window_end + 1)
230230

231231
windows_to_delete.append((window_start, window_end))
232232
count += 1
233233
yield (
234234
key,
235-
self._results(aggregated, collected, window_start, window_end - 1),
235+
self._results(aggregated, collected, window_start, window_end),
236236
)
237237

238238
# Clean up expired windows

quixstreams/state/rocksdb/windowed/transaction.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,9 +485,9 @@ def _set_timestamp(self, cache: Cache, prefix: bytes, timestamp_ms: int):
485485
)
486486

487487
def _validate_duration(self, start_ms: int, end_ms: int):
488-
if end_ms <= start_ms:
488+
if end_ms < start_ms:
489489
raise ValueError(
490-
f"Invalid window duration: window end {end_ms} is smaller or equal "
490+
f"Invalid window duration: window end {end_ms} is smaller "
491491
f"than window start {start_ms}"
492492
)
493493

0 commit comments

Comments
 (0)