Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 109 additions & 58 deletions pyoverkiz/action_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
import contextlib
from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING

Expand All @@ -23,11 +24,15 @@ def set_result(self, exec_id: str) -> None:
if not self._future.done():
self._future.set_result(exec_id)

def set_exception(self, exception: Exception) -> None:
def set_exception(self, exception: BaseException) -> None:
"""Set an exception if the batch execution failed."""
if not self._future.done():
self._future.set_exception(exception)

def is_done(self) -> bool:
"""Check if the execution has completed (either with result or exception)."""
return self._future.done()

def __await__(self):
"""Make this awaitable."""
return self._future.__await__()
Expand Down Expand Up @@ -84,64 +89,92 @@ async def add(
:param label: Label for the action group
:return: QueuedExecution that resolves to exec_id when batch executes
"""
batch_to_execute = None

async with self._lock:
# If mode or label changes, flush existing queue first
if self._pending_actions and (
mode != self._pending_mode or label != self._pending_label
):
await self._flush_now()
batch_to_execute = self._prepare_flush()

# Add actions to pending queue
self._pending_actions.extend(actions)
self._pending_mode = mode
self._pending_label = label

# Create waiter for this caller
# Create waiter for this caller. This waiter is added to the current
# batch being built, even if we flushed a previous batch above due to
# a mode/label change. This ensures the waiter belongs to the batch
# containing the actions we just added.
waiter = QueuedExecution()
self._pending_waiters.append(waiter)

# If we hit max actions, flush immediately
if len(self._pending_actions) >= self._max_actions:
await self._flush_now()
else:
# Prepare the current batch for flushing (which includes the actions
# we just added). If we already flushed due to mode change, this is
# a second batch.
new_batch = self._prepare_flush()
# Execute the first batch if it exists, then the second
if batch_to_execute:
await self._execute_batch(*batch_to_execute)
batch_to_execute = new_batch
elif self._flush_task is None or self._flush_task.done():
# Schedule delayed flush if not already scheduled
if self._flush_task is None or self._flush_task.done():
self._flush_task = asyncio.create_task(self._delayed_flush())
self._flush_task = asyncio.create_task(self._delayed_flush())

# Execute batch outside the lock if we flushed
if batch_to_execute:
await self._execute_batch(*batch_to_execute)

return waiter
return waiter

async def _delayed_flush(self) -> None:
"""Wait for the delay period, then flush the queue."""
await asyncio.sleep(self._delay)
async with self._lock:
if not self._pending_actions:
return

# Take snapshot and clear state while holding lock
actions = self._pending_actions
mode = self._pending_mode
label = self._pending_label
waiters = self._pending_waiters

self._pending_actions = []
self._pending_mode = None
self._pending_label = None
self._pending_waiters = []
self._flush_task = None

# Execute outside the lock
waiters: list[QueuedExecution] = []
try:
exec_id = await self._executor(actions, mode, label)
for waiter in waiters:
waiter.set_result(exec_id)
except Exception as exc:
await asyncio.sleep(self._delay)
async with self._lock:
if not self._pending_actions:
return

# Take snapshot and clear state while holding lock
actions = self._pending_actions
mode = self._pending_mode
label = self._pending_label
waiters = self._pending_waiters

self._pending_actions = []
self._pending_mode = None
self._pending_label = None
self._pending_waiters = []
self._flush_task = None

# Execute outside the lock
try:
exec_id = await self._executor(actions, mode, label)
for waiter in waiters:
waiter.set_result(exec_id)
except Exception as exc:
for waiter in waiters:
waiter.set_exception(exc)
except asyncio.CancelledError as exc:
# Ensure all waiters are notified if this task is cancelled
for waiter in waiters:
waiter.set_exception(exc)
raise

def _prepare_flush(
self,
) -> tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]]:
"""Prepare a flush by taking snapshot and clearing state (must be called with lock held).

async def _flush_now(self) -> None:
"""Execute pending actions immediately (must be called with lock held)."""
Returns a tuple of (actions, mode, label, waiters) that should be executed
outside the lock using _execute_batch().
"""
if not self._pending_actions:
return
return ([], None, None, [])

# Cancel any pending flush task
if self._flush_task and not self._flush_task.done():
Expand All @@ -160,8 +193,19 @@ async def _flush_now(self) -> None:
self._pending_label = None
self._pending_waiters = []

# Execute the batch (must release lock before calling executor to avoid deadlock)
# Note: This is called within a lock context, we'll execute outside
return (actions, mode, label, waiters)

async def _execute_batch(
self,
actions: list[Action],
mode: CommandMode | None,
label: str | None,
waiters: list[QueuedExecution],
) -> None:
"""Execute a batch of actions and notify waiters (must be called without lock)."""
if not actions:
return

try:
exec_id = await self._executor(actions, mode, label)
# Notify all waiters
Expand All @@ -173,42 +217,49 @@ async def _flush_now(self) -> None:
waiter.set_exception(exc)
raise

async def flush(self) -> list[str]:
async def flush(self) -> None:
"""Force flush all pending actions immediately.

:return: List of exec_ids from flushed batches
This method forces the queue to execute any pending batched actions
without waiting for the delay timer. The execution results are delivered
to the corresponding QueuedExecution objects returned by add().

This method is useful for forcing immediate execution without having to
wait for the delay timer to expire.
"""
batch_to_execute = None
async with self._lock:
if not self._pending_actions:
return []

# Since we can only have one batch pending at a time,
# this will return a single exec_id in a list
exec_ids: list[str] = []
if self._pending_actions:
batch_to_execute = self._prepare_flush()

try:
await self._flush_now()
# If flush succeeded, we can't actually return the exec_id here
# since it's delivered via the waiters. This method is mainly
# for forcing a flush, not retrieving results.
# Return empty list to indicate flush completed
except Exception:
# If flush fails, the exception will be propagated to waiters
# and also raised here
raise

return exec_ids
# Execute outside the lock
if batch_to_execute:
await self._execute_batch(*batch_to_execute)

def get_pending_count(self) -> int:
"""Get the number of actions currently waiting in the queue."""
"""Get the (approximate) number of actions currently waiting in the queue.

This method does not acquire the internal lock and therefore returns a
best-effort snapshot that may be slightly out of date if the queue is
being modified concurrently by other coroutines.
"""
return len(self._pending_actions)

async def shutdown(self) -> None:
"""Shutdown the queue, flushing any pending actions."""
batch_to_execute = None
async with self._lock:
if self._flush_task and not self._flush_task.done():
self._flush_task.cancel()
task = self._flush_task
task.cancel()
self._flush_task = None
# Wait for cancellation to complete
with contextlib.suppress(asyncio.CancelledError):
await task

if self._pending_actions:
await self._flush_now()
batch_to_execute = self._prepare_flush()

# Execute outside the lock
if batch_to_execute:
await self._execute_batch(*batch_to_execute)
32 changes: 23 additions & 9 deletions pyoverkiz/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from botocore.config import Config
from warrant_lite import WarrantLite

from pyoverkiz.action_queue import ActionQueue, QueuedExecution
from pyoverkiz.action_queue import ActionQueue
from pyoverkiz.const import (
COZYTOUCH_ATLANTIC_API,
COZYTOUCH_CLIENT_ID,
Expand Down Expand Up @@ -191,7 +191,7 @@ def __init__(
:param session: optional ClientSession
:param action_queue_enabled: enable action batching queue (default False)
:param action_queue_delay: seconds to wait before flushing queue (default 0.5)
:param action_queue_max_actions: max actions per batch (default 20)
:param action_queue_max_actions: maximum actions per batch before auto-flush (default 20)
"""
self.username = username
self.password = password
Expand All @@ -208,6 +208,11 @@ def __init__(

# Initialize action queue if enabled
if action_queue_enabled:
if action_queue_delay <= 0:
raise ValueError("action_queue_delay must be positive")
if action_queue_max_actions < 1:
raise ValueError("action_queue_max_actions must be at least 1")

self._action_queue = ActionQueue(
executor=self._execute_action_group_direct,
delay=action_queue_delay,
Expand Down Expand Up @@ -689,22 +694,31 @@ async def execute_action_group(
actions: list[Action],
mode: CommandMode | None = None,
label: str | None = "python-overkiz-api",
) -> str | QueuedExecution:
) -> str:
"""Execute a non-persistent action group.

If action queue is enabled, actions will be batched with other actions
executed within the configured delay window. Returns a QueuedExecution
that can be awaited to get the exec_id.
When action queue is enabled, actions will be batched with other actions
executed within the configured delay window. The method will wait for the
batch to execute and return the exec_id.

When action queue is disabled, executes immediately and returns exec_id.

If action queue is disabled, executes immediately and returns exec_id directly.
The API is consistent regardless of queue configuration - always returns
exec_id string directly.

:param actions: List of actions to execute
:param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None)
:param label: Label for the action group
:return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled)
:return: exec_id string from the executed action group

Example usage::

# Works the same with or without queue
exec_id = await client.execute_action_group([action])
"""
if self._action_queue:
return await self._action_queue.add(actions, mode, label)
queued = await self._action_queue.add(actions, mode, label)
return await queued
else:
return await self._execute_action_group_direct(actions, mode, label)

Expand Down
37 changes: 17 additions & 20 deletions test_queue_example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# mypy: ignore-errors
# ty: ignore
# type: ignore

"""Simple example demonstrating the action queue feature."""

Expand All @@ -26,7 +26,7 @@ async def example_without_queue():
)

# Create some example actions
_action1 = Action(
Action(
device_url="io://1234-5678-9012/12345678",
commands=[Command(name=OverkizCommand.CLOSE)],
)
Expand Down Expand Up @@ -71,28 +71,22 @@ async def example_with_queue():

# These will be queued and batched together!
print("Queueing action 1...")
queued1 = await client.execute_action_group([action1])
print(f"Got QueuedExecution object: {queued1}")
exec_id1 = await client.execute_action_group([action1])
print(f"Got exec_id: {exec_id1}")

print("Queueing action 2...")
_queued2 = await client.execute_action_group([action2])
exec_id2 = await client.execute_action_group([action2])

print("Queueing action 3...")
_queued3 = await client.execute_action_group([action3])
exec_id3 = await client.execute_action_group([action3])
Comment on lines 73 to +81
Copy link

Copilot AI Jan 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example demonstrates sequential execution rather than batching. With the simplified API where execute_action_group awaits the result, each call blocks until the batch executes. Since the calls are sequential (awaited one at a time), each action will execute separately after the 0.5s delay, rather than batching together.

To properly demonstrate batching, the calls should be started concurrently using asyncio.create_task(), similar to the manual flush example (lines 114-128) and the integration tests. Otherwise, the actions will execute one at a time and won't be batched together, making the example misleading about the queue's batching capability.

Copilot uses AI. Check for mistakes.

print(f"Pending actions in queue: {client.get_pending_actions_count()}")
Copy link

Copilot AI Jan 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion will likely show 0 pending actions, not 3, because all three execute_action_group calls above have already been awaited sequentially. Each action was executed (after the 0.5s delay) before the next one was queued. The pending count check should be moved to before awaiting the results, similar to how it's done in the integration tests and manual flush example.

Copilot uses AI. Check for mistakes.

# Wait for all actions to execute (they'll be batched together)
print("\nWaiting for batch to execute...")
# exec_id1 = await queued1
# exec_id2 = await queued2
# exec_id3 = await queued3

# All three will have the same exec_id since they were batched together!
# print(f"Exec ID 1: {exec_id1}")
# print(f"Exec ID 2: {exec_id2}")
# print(f"Exec ID 3: {exec_id3}")
# print(f"All same? {exec_id1 == exec_id2 == exec_id3}")
print(f"\nExec ID 1: {exec_id1}")
print(f"Exec ID 2: {exec_id2}")
print(f"Exec ID 3: {exec_id3}")
print(f"All same? {exec_id1 == exec_id2 == exec_id3}")
Copy link

Copilot AI Jan 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison will likely evaluate to False because the three actions were executed sequentially (not batched together), so they would have different exec_ids from separate API calls. The actions only batch if they are queued concurrently within the delay window. Since each execute_action_group call was awaited before starting the next one, they executed as separate batches.

To make this comparison work as intended, the three execute_action_group calls should be started concurrently using asyncio.create_task() and then awaited together.

Copilot uses AI. Check for mistakes.

print("\nWith queue: Multiple actions batched into single API request!")
await client.close()
Expand All @@ -116,8 +110,11 @@ async def example_manual_flush():
)

print("Queueing action with 10s delay...")
_queued = await client.execute_action_group([action])
# Start execution in background (don't await yet)
exec_task = asyncio.create_task(client.execute_action_group([action]))

# Give it a moment to queue
await asyncio.sleep(0.1)
print(f"Pending actions: {client.get_pending_actions_count()}")

# Don't want to wait 10 seconds? Flush manually!
Expand All @@ -126,9 +123,9 @@ async def example_manual_flush():

print(f"Pending actions after flush: {client.get_pending_actions_count()}")

# Now we can await the result
# exec_id = await queued
# print(f"Got exec_id: {exec_id}")
# Now get the result
exec_id = await exec_task
print(f"Got exec_id: {exec_id}")

await client.close()

Expand Down
Loading