Skip to content

Commit fc25ecc

Browse files
committed
Implement ActionQueue for batching actions in OverkizClient and add integration tests
1 parent 9b2bc1e commit fc25ecc

File tree

5 files changed

+928
-5
lines changed

5 files changed

+928
-5
lines changed

pyoverkiz/action_queue.py

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
"""Action queue for batching multiple action executions into single API calls."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from collections.abc import Callable, Coroutine
7+
from typing import TYPE_CHECKING
8+
9+
if TYPE_CHECKING:
10+
from pyoverkiz.enums import CommandMode
11+
from pyoverkiz.models import Action
12+
13+
14+
class QueuedExecution:
15+
"""Represents a queued action execution that will resolve to an exec_id when the batch executes."""
16+
17+
def __init__(self) -> None:
18+
self._future: asyncio.Future[str] = asyncio.Future()
19+
20+
def set_result(self, exec_id: str) -> None:
21+
"""Set the execution ID result."""
22+
if not self._future.done():
23+
self._future.set_result(exec_id)
24+
25+
def set_exception(self, exception: Exception) -> None:
26+
"""Set an exception if the batch execution failed."""
27+
if not self._future.done():
28+
self._future.set_exception(exception)
29+
30+
def __await__(self):
31+
"""Make this awaitable."""
32+
return self._future.__await__()
33+
34+
35+
class ActionQueue:
36+
"""
37+
Batches multiple action executions into single API calls.
38+
39+
When actions are added, they are held for a configurable delay period.
40+
If more actions arrive during this window, they are batched together.
41+
The batch is flushed when:
42+
- The delay timer expires
43+
- The max actions limit is reached
44+
- The command mode changes
45+
- Manual flush is requested
46+
"""
47+
48+
def __init__(
49+
self,
50+
executor: Callable[
51+
[list[Action], CommandMode | None, str | None], Coroutine[None, None, str]
52+
],
53+
delay: float = 0.5,
54+
max_actions: int = 20,
55+
) -> None:
56+
"""
57+
Initialize the action queue.
58+
59+
:param executor: Async function to execute batched actions
60+
:param delay: Seconds to wait before auto-flushing (default 0.5)
61+
:param max_actions: Maximum actions per batch before forced flush (default 20)
62+
"""
63+
self._executor = executor
64+
self._delay = delay
65+
self._max_actions = max_actions
66+
67+
self._pending_actions: list[Action] = []
68+
self._pending_mode: CommandMode | None = None
69+
self._pending_label: str | None = None
70+
self._pending_waiters: list[QueuedExecution] = []
71+
72+
self._flush_task: asyncio.Task[None] | None = None
73+
self._lock = asyncio.Lock()
74+
75+
async def add(
76+
self,
77+
actions: list[Action],
78+
mode: CommandMode | None = None,
79+
label: str | None = None,
80+
) -> QueuedExecution:
81+
"""
82+
Add actions to the queue.
83+
84+
:param actions: Actions to queue
85+
:param mode: Command mode (will flush if different from pending mode)
86+
:param label: Label for the action group
87+
:return: QueuedExecution that resolves to exec_id when batch executes
88+
"""
89+
async with self._lock:
90+
# If mode or label changes, flush existing queue first
91+
if self._pending_actions and (
92+
mode != self._pending_mode or label != self._pending_label
93+
):
94+
await self._flush_now()
95+
96+
# Add actions to pending queue
97+
self._pending_actions.extend(actions)
98+
self._pending_mode = mode
99+
self._pending_label = label
100+
101+
# Create waiter for this caller
102+
waiter = QueuedExecution()
103+
self._pending_waiters.append(waiter)
104+
105+
# If we hit max actions, flush immediately
106+
if len(self._pending_actions) >= self._max_actions:
107+
await self._flush_now()
108+
else:
109+
# Schedule delayed flush if not already scheduled
110+
if self._flush_task is None or self._flush_task.done():
111+
self._flush_task = asyncio.create_task(self._delayed_flush())
112+
113+
return waiter
114+
115+
async def _delayed_flush(self) -> None:
116+
"""Wait for the delay period, then flush the queue."""
117+
await asyncio.sleep(self._delay)
118+
async with self._lock:
119+
if not self._pending_actions:
120+
return
121+
122+
# Take snapshot and clear state while holding lock
123+
actions = self._pending_actions
124+
mode = self._pending_mode
125+
label = self._pending_label
126+
waiters = self._pending_waiters
127+
128+
self._pending_actions = []
129+
self._pending_mode = None
130+
self._pending_label = None
131+
self._pending_waiters = []
132+
self._flush_task = None
133+
134+
# Execute outside the lock
135+
try:
136+
exec_id = await self._executor(actions, mode, label)
137+
for waiter in waiters:
138+
waiter.set_result(exec_id)
139+
except Exception as exc:
140+
for waiter in waiters:
141+
waiter.set_exception(exc)
142+
143+
async def _flush_now(self) -> None:
144+
"""Execute pending actions immediately (must be called with lock held)."""
145+
if not self._pending_actions:
146+
return
147+
148+
# Cancel any pending flush task
149+
if self._flush_task and not self._flush_task.done():
150+
self._flush_task.cancel()
151+
self._flush_task = None
152+
153+
# Take snapshot of current batch
154+
actions = self._pending_actions
155+
mode = self._pending_mode
156+
label = self._pending_label
157+
waiters = self._pending_waiters
158+
159+
# Clear pending state
160+
self._pending_actions = []
161+
self._pending_mode = None
162+
self._pending_label = None
163+
self._pending_waiters = []
164+
165+
# Execute the batch (must release lock before calling executor to avoid deadlock)
166+
# Note: This is called within a lock context, we'll execute outside
167+
try:
168+
exec_id = await self._executor(actions, mode, label)
169+
# Notify all waiters
170+
for waiter in waiters:
171+
waiter.set_result(exec_id)
172+
except Exception as exc:
173+
# Propagate exception to all waiters
174+
for waiter in waiters:
175+
waiter.set_exception(exc)
176+
raise
177+
178+
async def flush(self) -> list[str]:
179+
"""
180+
Force flush all pending actions immediately.
181+
182+
:return: List of exec_ids from flushed batches
183+
"""
184+
async with self._lock:
185+
if not self._pending_actions:
186+
return []
187+
188+
# Since we can only have one batch pending at a time,
189+
# this will return a single exec_id in a list
190+
exec_ids: list[str] = []
191+
192+
try:
193+
await self._flush_now()
194+
# If flush succeeded, we can't actually return the exec_id here
195+
# since it's delivered via the waiters. This method is mainly
196+
# for forcing a flush, not retrieving results.
197+
# Return empty list to indicate flush completed
198+
except Exception:
199+
# If flush fails, the exception will be propagated to waiters
200+
# and also raised here
201+
raise
202+
203+
return exec_ids
204+
205+
def get_pending_count(self) -> int:
206+
"""Get the number of actions currently waiting in the queue."""
207+
return len(self._pending_actions)
208+
209+
async def shutdown(self) -> None:
210+
"""Shutdown the queue, flushing any pending actions."""
211+
async with self._lock:
212+
if self._flush_task and not self._flush_task.done():
213+
self._flush_task.cancel()
214+
self._flush_task = None
215+
216+
if self._pending_actions:
217+
await self._flush_now()

pyoverkiz/client.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from botocore.config import Config
2626
from warrant_lite import WarrantLite
2727

28+
from pyoverkiz.action_queue import ActionQueue, QueuedExecution
2829
from pyoverkiz.const import (
2930
COZYTOUCH_ATLANTIC_API,
3031
COZYTOUCH_CLIENT_ID,
@@ -163,6 +164,7 @@ class OverkizClient:
163164
_expires_in: datetime.datetime | None = None
164165
_access_token: str | None = None
165166
_ssl: ssl.SSLContext | bool = True
167+
_action_queue: ActionQueue | None = None
166168

167169
def __init__(
168170
self,
@@ -172,14 +174,22 @@ def __init__(
172174
verify_ssl: bool = True,
173175
token: str | None = None,
174176
session: ClientSession | None = None,
177+
action_queue_enabled: bool = False,
178+
action_queue_delay: float = 0.5,
179+
action_queue_max_actions: int = 20,
175180
) -> None:
176181
"""
177182
Constructor
178183
179184
:param username: the username
180185
:param password: the password
181186
:param server: OverkizServer
187+
:param verify_ssl: whether to verify SSL certificates
188+
:param token: optional access token
182189
:param session: optional ClientSession
190+
:param action_queue_enabled: enable action batching queue (default False)
191+
:param action_queue_delay: seconds to wait before flushing queue (default 0.5)
192+
:param action_queue_max_actions: max actions per batch (default 20)
183193
"""
184194

185195
self.username = username
@@ -195,6 +205,14 @@ def __init__(
195205
self.session = session if session else ClientSession()
196206
self._ssl = verify_ssl
197207

208+
# Initialize action queue if enabled
209+
if action_queue_enabled:
210+
self._action_queue = ActionQueue(
211+
executor=self._execute_action_group_direct,
212+
delay=action_queue_delay,
213+
max_actions=action_queue_max_actions,
214+
)
215+
198216
if LOCAL_API_PATH in self.server.endpoint:
199217
self.api_type = APIType.LOCAL
200218

@@ -222,6 +240,10 @@ async def __aexit__(
222240

223241
async def close(self) -> None:
224242
"""Close the session."""
243+
# Flush any pending actions in queue
244+
if self._action_queue:
245+
await self._action_queue.shutdown()
246+
225247
if self.event_listener_id:
226248
await self.unregister_event_listener()
227249

@@ -646,21 +668,18 @@ async def get_api_version(self) -> str:
646668

647669
@retry_on_too_many_executions
648670
@retry_on_auth_error
649-
async def execute_action_group(
671+
async def _execute_action_group_direct(
650672
self,
651673
actions: list[Action],
652674
mode: CommandMode | None = None,
653675
label: str | None = "python-overkiz-api",
654676
) -> str:
655677
"""
656-
Execute a non-persistent action group
678+
Execute a non-persistent action group directly (internal method).
657679
658680
The executed action group does not have to be persisted on the server before use.
659681
Per-session rate-limit : 1 calls per 28min 48s period for all operations of the same category (exec)
660682
"""
661-
662-
"""Send several commands in one call"""
663-
664683
# Build a logical (snake_case) payload using model helpers and convert it
665684
# to the exact JSON schema expected by the API (camelCase + small fixes).
666685
payload = {"label": label, "actions": [a.to_payload() for a in actions]}
@@ -681,6 +700,51 @@ async def execute_action_group(
681700

682701
return cast(str, response["execId"])
683702

703+
async def execute_action_group(
704+
self,
705+
actions: list[Action],
706+
mode: CommandMode | None = None,
707+
label: str | None = "python-overkiz-api",
708+
) -> str | QueuedExecution:
709+
"""
710+
Execute a non-persistent action group.
711+
712+
If action queue is enabled, actions will be batched with other actions
713+
executed within the configured delay window. Returns a QueuedExecution
714+
that can be awaited to get the exec_id.
715+
716+
If action queue is disabled, executes immediately and returns exec_id directly.
717+
718+
:param actions: List of actions to execute
719+
:param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None)
720+
:param label: Label for the action group
721+
:return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled)
722+
"""
723+
if self._action_queue:
724+
return await self._action_queue.add(actions, mode, label)
725+
else:
726+
return await self._execute_action_group_direct(actions, mode, label)
727+
728+
async def flush_action_queue(self) -> None:
729+
"""
730+
Force flush all pending actions in the queue immediately.
731+
732+
If action queue is disabled, this method does nothing.
733+
If there are no pending actions, this method does nothing.
734+
"""
735+
if self._action_queue:
736+
await self._action_queue.flush()
737+
738+
def get_pending_actions_count(self) -> int:
739+
"""
740+
Get the number of actions currently waiting in the queue.
741+
742+
Returns 0 if action queue is disabled.
743+
"""
744+
if self._action_queue:
745+
return self._action_queue.get_pending_count()
746+
return 0
747+
684748
@retry_on_auth_error
685749
async def cancel_command(self, exec_id: str) -> None:
686750
"""Cancel a running setup-level execution"""

0 commit comments

Comments
 (0)