Skip to content

Commit 5dab17f

Browse files
committed
Implement ActionQueue for batching actions in OverkizClient and add integration tests
1 parent dbf0e92 commit 5dab17f

File tree

5 files changed

+923
-2
lines changed

5 files changed

+923
-2
lines changed

pyoverkiz/action_queue.py

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
"""Action queue for batching multiple action executions into single API calls."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from collections.abc import Callable, Coroutine
7+
from typing import TYPE_CHECKING
8+
9+
if TYPE_CHECKING:
10+
from pyoverkiz.enums import CommandMode
11+
from pyoverkiz.models import Action
12+
13+
14+
class QueuedExecution:
15+
"""Represents a queued action execution that will resolve to an exec_id when the batch executes."""
16+
17+
def __init__(self) -> None:
18+
self._future: asyncio.Future[str] = asyncio.Future()
19+
20+
def set_result(self, exec_id: str) -> None:
21+
"""Set the execution ID result."""
22+
if not self._future.done():
23+
self._future.set_result(exec_id)
24+
25+
def set_exception(self, exception: Exception) -> None:
26+
"""Set an exception if the batch execution failed."""
27+
if not self._future.done():
28+
self._future.set_exception(exception)
29+
30+
def __await__(self):
31+
"""Make this awaitable."""
32+
return self._future.__await__()
33+
34+
35+
class ActionQueue:
36+
"""
37+
Batches multiple action executions into single API calls.
38+
39+
When actions are added, they are held for a configurable delay period.
40+
If more actions arrive during this window, they are batched together.
41+
The batch is flushed when:
42+
- The delay timer expires
43+
- The max actions limit is reached
44+
- The command mode changes
45+
- Manual flush is requested
46+
"""
47+
48+
def __init__(
49+
self,
50+
executor: Callable[
51+
[list[Action], CommandMode | None, str | None], Coroutine[None, None, str]
52+
],
53+
delay: float = 0.5,
54+
max_actions: int = 20,
55+
) -> None:
56+
"""
57+
Initialize the action queue.
58+
59+
:param executor: Async function to execute batched actions
60+
:param delay: Seconds to wait before auto-flushing (default 0.5)
61+
:param max_actions: Maximum actions per batch before forced flush (default 20)
62+
"""
63+
self._executor = executor
64+
self._delay = delay
65+
self._max_actions = max_actions
66+
67+
self._pending_actions: list[Action] = []
68+
self._pending_mode: CommandMode | None = None
69+
self._pending_label: str | None = None
70+
self._pending_waiters: list[QueuedExecution] = []
71+
72+
self._flush_task: asyncio.Task[None] | None = None
73+
self._lock = asyncio.Lock()
74+
75+
async def add(
76+
self,
77+
actions: list[Action],
78+
mode: CommandMode | None = None,
79+
label: str | None = None,
80+
) -> QueuedExecution:
81+
"""
82+
Add actions to the queue.
83+
84+
:param actions: Actions to queue
85+
:param mode: Command mode (will flush if different from pending mode)
86+
:param label: Label for the action group
87+
:return: QueuedExecution that resolves to exec_id when batch executes
88+
"""
89+
async with self._lock:
90+
# If mode or label changes, flush existing queue first
91+
if self._pending_actions and (
92+
mode != self._pending_mode or label != self._pending_label
93+
):
94+
await self._flush_now()
95+
96+
# Add actions to pending queue
97+
self._pending_actions.extend(actions)
98+
self._pending_mode = mode
99+
self._pending_label = label
100+
101+
# Create waiter for this caller
102+
waiter = QueuedExecution()
103+
self._pending_waiters.append(waiter)
104+
105+
# If we hit max actions, flush immediately
106+
if len(self._pending_actions) >= self._max_actions:
107+
await self._flush_now()
108+
else:
109+
# Schedule delayed flush if not already scheduled
110+
if self._flush_task is None or self._flush_task.done():
111+
self._flush_task = asyncio.create_task(self._delayed_flush())
112+
113+
return waiter
114+
115+
async def _delayed_flush(self) -> None:
116+
"""Wait for the delay period, then flush the queue."""
117+
await asyncio.sleep(self._delay)
118+
async with self._lock:
119+
if not self._pending_actions:
120+
return
121+
122+
# Take snapshot and clear state while holding lock
123+
actions = self._pending_actions
124+
mode = self._pending_mode
125+
label = self._pending_label
126+
waiters = self._pending_waiters
127+
128+
self._pending_actions = []
129+
self._pending_mode = None
130+
self._pending_label = None
131+
self._pending_waiters = []
132+
self._flush_task = None
133+
134+
# Execute outside the lock
135+
try:
136+
exec_id = await self._executor(actions, mode, label)
137+
for waiter in waiters:
138+
waiter.set_result(exec_id)
139+
except Exception as exc:
140+
for waiter in waiters:
141+
waiter.set_exception(exc)
142+
143+
async def _flush_now(self) -> None:
144+
"""Execute pending actions immediately (must be called with lock held)."""
145+
if not self._pending_actions:
146+
return
147+
148+
# Cancel any pending flush task
149+
if self._flush_task and not self._flush_task.done():
150+
self._flush_task.cancel()
151+
self._flush_task = None
152+
153+
# Take snapshot of current batch
154+
actions = self._pending_actions
155+
mode = self._pending_mode
156+
label = self._pending_label
157+
waiters = self._pending_waiters
158+
159+
# Clear pending state
160+
self._pending_actions = []
161+
self._pending_mode = None
162+
self._pending_label = None
163+
self._pending_waiters = []
164+
165+
# Execute the batch (must release lock before calling executor to avoid deadlock)
166+
# Note: This is called within a lock context, we'll execute outside
167+
try:
168+
exec_id = await self._executor(actions, mode, label)
169+
# Notify all waiters
170+
for waiter in waiters:
171+
waiter.set_result(exec_id)
172+
except Exception as exc:
173+
# Propagate exception to all waiters
174+
for waiter in waiters:
175+
waiter.set_exception(exc)
176+
raise
177+
178+
async def flush(self) -> list[str]:
179+
"""
180+
Force flush all pending actions immediately.
181+
182+
:return: List of exec_ids from flushed batches
183+
"""
184+
async with self._lock:
185+
if not self._pending_actions:
186+
return []
187+
188+
# Since we can only have one batch pending at a time,
189+
# this will return a single exec_id in a list
190+
exec_ids: list[str] = []
191+
192+
try:
193+
await self._flush_now()
194+
# If flush succeeded, we can't actually return the exec_id here
195+
# since it's delivered via the waiters. This method is mainly
196+
# for forcing a flush, not retrieving results.
197+
# Return empty list to indicate flush completed
198+
except Exception:
199+
# If flush fails, the exception will be propagated to waiters
200+
# and also raised here
201+
raise
202+
203+
return exec_ids
204+
205+
def get_pending_count(self) -> int:
206+
"""Get the number of actions currently waiting in the queue."""
207+
return len(self._pending_actions)
208+
209+
async def shutdown(self) -> None:
210+
"""Shutdown the queue, flushing any pending actions."""
211+
async with self._lock:
212+
if self._flush_task and not self._flush_task.done():
213+
self._flush_task.cancel()
214+
self._flush_task = None
215+
216+
if self._pending_actions:
217+
await self._flush_now()

pyoverkiz/client.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
ServerDisconnectedError,
2020
)
2121

22+
from pyoverkiz.action_queue import ActionQueue, QueuedExecution
2223
from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy
2324
from pyoverkiz.const import SUPPORTED_SERVERS
2425
from pyoverkiz.enums import APIType, CommandMode, Server
@@ -141,6 +142,7 @@ class OverkizClient:
141142
session: ClientSession
142143
_ssl: ssl.SSLContext | bool = True
143144
_auth: AuthStrategy
145+
_action_queue: ActionQueue | None = None
144146

145147
def __init__(
146148
self,
@@ -149,11 +151,17 @@ def __init__(
149151
credentials: Credentials,
150152
verify_ssl: bool = True,
151153
session: ClientSession | None = None,
154+
action_queue_enabled: bool = False,
155+
action_queue_delay: float = 0.5,
156+
action_queue_max_actions: int = 20,
152157
) -> None:
153158
"""Constructor.
154159
155160
:param server: ServerConfig
156161
:param session: optional ClientSession
162+
:param action_queue_enabled: enable action batching queue (default False)
163+
:param action_queue_delay: seconds to wait before flushing queue (default 0.5)
164+
:param action_queue_max_actions: max actions per batch (default 20)
157165
"""
158166
self.server_config = self._normalize_server(server)
159167

@@ -173,6 +181,14 @@ def __init__(
173181
# Use the prebuilt SSL context with disabled strict validation for local API.
174182
self._ssl = SSL_CONTEXT_LOCAL_API
175183

184+
# Initialize action queue if enabled
185+
if action_queue_enabled:
186+
self._action_queue = ActionQueue(
187+
executor=self._execute_action_group_direct,
188+
delay=action_queue_delay,
189+
max_actions=action_queue_max_actions,
190+
)
191+
176192
self._auth = build_auth_strategy(
177193
server_config=self.server_config,
178194
credentials=credentials,
@@ -210,6 +226,10 @@ def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig:
210226

211227
async def close(self) -> None:
212228
"""Close the session."""
229+
# Flush any pending actions in queue
230+
if self._action_queue:
231+
await self._action_queue.shutdown()
232+
213233
if self.event_listener_id:
214234
await self.unregister_event_listener()
215235

@@ -431,13 +451,13 @@ async def get_api_version(self) -> str:
431451

432452
@retry_on_too_many_executions
433453
@retry_on_auth_error
434-
async def execute_action_group(
454+
async def _execute_action_group_direct(
435455
self,
436456
actions: list[Action],
437457
mode: CommandMode | None = None,
438458
label: str | None = "python-overkiz-api",
439459
) -> str:
440-
"""Execute a non-persistent action group.
460+
"""Execute a non-persistent action group directly (internal method).
441461
442462
The executed action group does not have to be persisted on the server before use.
443463
Per-session rate-limit : 1 calls per 28min 48s period for all operations of the same category (exec)
@@ -462,6 +482,48 @@ async def execute_action_group(
462482

463483
return cast(str, response["execId"])
464484

485+
async def execute_action_group(
486+
self,
487+
actions: list[Action],
488+
mode: CommandMode | None = None,
489+
label: str | None = "python-overkiz-api",
490+
) -> str | QueuedExecution:
491+
"""Execute a non-persistent action group.
492+
493+
If action queue is enabled, actions will be batched with other actions
494+
executed within the configured delay window. Returns a QueuedExecution
495+
that can be awaited to get the exec_id.
496+
497+
If action queue is disabled, executes immediately and returns exec_id directly.
498+
499+
:param actions: List of actions to execute
500+
:param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None)
501+
:param label: Label for the action group
502+
:return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled)
503+
"""
504+
if self._action_queue:
505+
return await self._action_queue.add(actions, mode, label)
506+
else:
507+
return await self._execute_action_group_direct(actions, mode, label)
508+
509+
async def flush_action_queue(self) -> None:
510+
"""Force flush all pending actions in the queue immediately.
511+
512+
If action queue is disabled, this method does nothing.
513+
If there are no pending actions, this method does nothing.
514+
"""
515+
if self._action_queue:
516+
await self._action_queue.flush()
517+
518+
def get_pending_actions_count(self) -> int:
519+
"""Get the number of actions currently waiting in the queue.
520+
521+
Returns 0 if action queue is disabled.
522+
"""
523+
if self._action_queue:
524+
return self._action_queue.get_pending_count()
525+
return 0
526+
465527
@retry_on_auth_error
466528
async def cancel_command(self, exec_id: str) -> None:
467529
"""Cancel a running setup-level execution."""

0 commit comments

Comments
 (0)