Skip to content

Commit 610d38c

Browse files
committed
Introduce delete_window method to eliminate private member usage
1 parent 08dd3d5 commit 610d38c

File tree

4 files changed

+48
-6
lines changed

4 files changed

+48
-6
lines changed

quixstreams/dataframe/windows/session.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def process_window(
119119
# Delete the old window if extending an existing session
120120
if can_extend_session and old_window_to_delete:
121121
old_start, old_end = old_window_to_delete
122-
transaction.delete_window(old_start, old_end, prefix=state._prefix) # type: ignore # noqa: SLF001
122+
state.delete_window(old_start, old_end)
123123

124124
# Add to collection if needed
125125
if collect:
@@ -238,11 +238,7 @@ def expire_by_key(
238238

239239
# Clean up expired windows
240240
for window_start, window_end in windows_to_delete:
241-
state._transaction.delete_window( # type: ignore # noqa: SLF001
242-
window_start,
243-
window_end,
244-
prefix=state._prefix, # type: ignore # noqa: SLF001
245-
)
241+
state.delete_window(window_start, window_end)
246242
if collect:
247243
state.delete_from_collection(window_end, start=window_start)
248244

quixstreams/state/rocksdb/windowed/state.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,18 @@ def delete_windows(self, max_start_time: int, delete_values: bool) -> None:
182182
delete_values=delete_values,
183183
prefix=self._prefix,
184184
)
185+
186+
def delete_window(self, start_ms: int, end_ms: int) -> None:
187+
"""
188+
Delete a specific window from the state store.
189+
190+
This method removes a single window entry with the specified start and end timestamps.
191+
192+
:param start_ms: The start timestamp of the window to delete
193+
:param end_ms: The end timestamp of the window to delete
194+
"""
195+
return self._transaction.delete_window(
196+
start_ms=start_ms,
197+
end_ms=end_ms,
198+
prefix=self._prefix,
199+
)

quixstreams/state/types.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,17 @@ def delete_windows(self, max_start_time: int, delete_values: bool) -> None:
187187
"""
188188
...
189189

190+
def delete_window(self, start_ms: int, end_ms: int) -> None:
191+
"""
192+
Delete a specific window from the state store.
193+
194+
This method removes a single window entry with the specified start and end timestamps.
195+
196+
:param start_ms: The start timestamp of the window to delete
197+
:param end_ms: The end timestamp of the window to delete
198+
"""
199+
...
200+
190201
def get_windows(
191202
self, start_from_ms: int, start_to_ms: int, backwards: bool = False
192203
) -> list[WindowDetail[V]]:

tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_state.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,26 @@ def test_delete_windows_with_values(transaction_state, get_value):
388388
assert get_value(timestamp_ms=2, counter=1) == "b"
389389

390390

391+
def test_delete_window(transaction_state):
392+
with transaction_state() as state:
393+
state.update_window(start_ms=1, end_ms=2, value=1, timestamp_ms=1)
394+
state.update_window(start_ms=2, end_ms=3, value=2, timestamp_ms=2)
395+
state.update_window(start_ms=3, end_ms=4, value=3, timestamp_ms=3)
396+
397+
with transaction_state() as state:
398+
assert state.get_window(start_ms=1, end_ms=2)
399+
assert state.get_window(start_ms=2, end_ms=3)
400+
assert state.get_window(start_ms=3, end_ms=4)
401+
402+
# Delete a specific window
403+
state.delete_window(start_ms=2, end_ms=3)
404+
405+
# Only the specified window should be deleted
406+
assert state.get_window(start_ms=1, end_ms=2)
407+
assert not state.get_window(start_ms=2, end_ms=3)
408+
assert state.get_window(start_ms=3, end_ms=4)
409+
410+
391411
@pytest.mark.parametrize("value", [1, "string", None, ["list"], {"dict": "dict"}])
392412
def test_add_to_collection(transaction_state, get_value, value):
393413
with transaction_state() as state:

0 commit comments

Comments
 (0)