Skip to content

Commit 7e6a96f

Browse files
committed
Tweak the session windows code
1 parent 3bd0c4a commit 7e6a96f

File tree

1 file changed

+24
-41
lines changed

1 file changed

+24
-41
lines changed

quixstreams/dataframe/windows/session.py

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -66,73 +66,56 @@ def process_window(
6666
latest_timestamp = max(timestamp_ms, state_ts)
6767

6868
# Calculate session expiry threshold
69-
session_expiry_threshold = latest_timestamp - grace_ms
69+
expiry_threshold = latest_timestamp - grace_ms
7070

7171
# Check if the event is too late
72-
if timestamp_ms < session_expiry_threshold:
73-
late_by_ms = session_expiry_threshold - timestamp_ms
72+
if timestamp_ms < expiry_threshold:
7473
self._on_expired_window(
7574
value=value,
7675
key=key,
7776
start=timestamp_ms,
78-
end=timestamp_ms, # End time is the timestamp of the last event
77+
end=timestamp_ms,
7978
timestamp_ms=timestamp_ms,
80-
late_by_ms=late_by_ms,
79+
late_by_ms=expiry_threshold - timestamp_ms,
8180
)
8281
return [], []
8382

84-
# Look for an existing session that can be extended
85-
can_extend_session = False
86-
existing_aggregated = None
87-
old_window_to_delete = None
88-
8983
# Search for active sessions that can accommodate the new event
90-
search_start = max(0, timestamp_ms - timeout_ms * 2)
91-
windows = state.get_windows(
92-
search_start, timestamp_ms + timeout_ms + 1, backwards=True
93-
)
94-
95-
for (window_start, window_end), aggregated_value, _ in windows:
84+
for (window_start, window_end), aggregated_value, _ in state.get_windows(
85+
start_from_ms=max(0, timestamp_ms - timeout_ms * 2),
86+
start_to_ms=timestamp_ms + timeout_ms + 1,
87+
backwards=True,
88+
):
9689
# Calculate the time gap between the new event and the session's last activity
9790
# window_end now directly represents the timestamp of the last event
98-
session_last_activity = window_end
99-
time_gap = timestamp_ms - session_last_activity
91+
time_gap = timestamp_ms - window_end
10092

10193
# Check if this session can be extended
10294
if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start:
95+
extend_session = True
10396
session_start = window_start
104-
# Only update end time if the new event is newer than the current end time
97+
# Only update end time if the new event is greater than the current end time
10598
session_end = max(window_end, timestamp_ms)
106-
can_extend_session = True
10799
existing_aggregated = aggregated_value
108-
old_window_to_delete = (window_start, window_end)
100+
# Delete the old window if extending an existing session
101+
state.delete_window(window_start, window_end)
109102
break
110-
111-
# If no extendable session found, start a new one
112-
if not can_extend_session:
113-
session_start = timestamp_ms
114-
session_end = timestamp_ms # End time is the timestamp of the last event
103+
else:
104+
# If no extendable session found, start a new one
105+
extend_session = False
106+
session_start = session_end = timestamp_ms
115107

116108
# Process the event for this session
117109
updated_windows: list[WindowKeyResult] = []
118110

119-
# Delete the old window if extending an existing session
120-
if can_extend_session and old_window_to_delete:
121-
old_start, old_end = old_window_to_delete
122-
state.delete_window(old_start, old_end)
123-
124111
# Add to collection if needed
125112
if collect:
126-
state.add_to_collection(
127-
value=self._collect_value(value),
128-
id=timestamp_ms,
129-
)
113+
state.add_to_collection(value=self._collect_value(value), id=timestamp_ms)
130114

131115
# Update the session window aggregation
132-
aggregated = None
133116
if aggregate:
134117
current_value = (
135-
existing_aggregated if can_extend_session else self._initialize_value()
118+
existing_aggregated if extend_session else self._initialize_value()
136119
)
137120

138121
aggregated = self._aggregate_value(current_value, value, timestamp_ms)
@@ -142,6 +125,8 @@ def process_window(
142125
self._results(aggregated, [], session_start, session_end),
143126
)
144127
)
128+
else:
129+
aggregated = None
145130

146131
state.update_window(
147132
session_start, session_end, value=aggregated, timestamp_ms=timestamp_ms
@@ -150,12 +135,10 @@ def process_window(
150135
# Expire old sessions
151136
if self._closing_strategy == ClosingStrategy.PARTITION:
152137
expired_windows = self.expire_by_partition(
153-
transaction, session_expiry_threshold, collect
138+
transaction, expiry_threshold, collect
154139
)
155140
else:
156-
expired_windows = self.expire_by_key(
157-
key, state, session_expiry_threshold, collect
158-
)
141+
expired_windows = self.expire_by_key(key, state, expiry_threshold, collect)
159142

160143
return updated_windows, expired_windows
161144

0 commit comments

Comments
 (0)