diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py
index 6d01aab2..118c5245 100644
--- a/pyoverkiz/action_queue.py
+++ b/pyoverkiz/action_queue.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import asyncio
+import contextlib
 from collections.abc import Callable, Coroutine
 from typing import TYPE_CHECKING
 
@@ -23,11 +24,15 @@ def set_result(self, exec_id: str) -> None:
         if not self._future.done():
             self._future.set_result(exec_id)
 
-    def set_exception(self, exception: Exception) -> None:
+    def set_exception(self, exception: BaseException) -> None:
         """Set an exception if the batch execution failed."""
         if not self._future.done():
             self._future.set_exception(exception)
 
+    def is_done(self) -> bool:
+        """Check if the execution has completed (either with result or exception)."""
+        return self._future.done()
+
     def __await__(self):
         """Make this awaitable."""
         return self._future.__await__()
@@ -84,64 +89,92 @@ async def add(
         :param label: Label for the action group
         :return: QueuedExecution that resolves to exec_id when batch executes
         """
+        batch_to_execute = None
+
         async with self._lock:
             # If mode or label changes, flush existing queue first
             if self._pending_actions and (
                 mode != self._pending_mode or label != self._pending_label
             ):
-                await self._flush_now()
+                batch_to_execute = self._prepare_flush()
 
             # Add actions to pending queue
             self._pending_actions.extend(actions)
             self._pending_mode = mode
             self._pending_label = label
 
-            # Create waiter for this caller
+            # Create waiter for this caller. This waiter is added to the current
+            # batch being built, even if we flushed a previous batch above due to
+            # a mode/label change. This ensures the waiter belongs to the batch
+            # containing the actions we just added.
            waiter = QueuedExecution()
             self._pending_waiters.append(waiter)
 
             # If we hit max actions, flush immediately
             if len(self._pending_actions) >= self._max_actions:
-                await self._flush_now()
-            else:
+                # Prepare the current batch for flushing (which includes the actions
+                # we just added). If we already flushed due to mode change, this is
+                # a second batch.
+                new_batch = self._prepare_flush()
+                # Execute the first batch if it exists, then the second
+                if batch_to_execute:
+                    await self._execute_batch(*batch_to_execute)
+                batch_to_execute = new_batch
+            elif self._flush_task is None or self._flush_task.done():
                 # Schedule delayed flush if not already scheduled
-                if self._flush_task is None or self._flush_task.done():
-                    self._flush_task = asyncio.create_task(self._delayed_flush())
+                self._flush_task = asyncio.create_task(self._delayed_flush())
+
+        # Execute batch outside the lock if we flushed
+        if batch_to_execute:
+            await self._execute_batch(*batch_to_execute)
 
-            return waiter
+        return waiter
 
     async def _delayed_flush(self) -> None:
         """Wait for the delay period, then flush the queue."""
-        await asyncio.sleep(self._delay)
-        async with self._lock:
-            if not self._pending_actions:
-                return
-
-            # Take snapshot and clear state while holding lock
-            actions = self._pending_actions
-            mode = self._pending_mode
-            label = self._pending_label
-            waiters = self._pending_waiters
-
-            self._pending_actions = []
-            self._pending_mode = None
-            self._pending_label = None
-            self._pending_waiters = []
-            self._flush_task = None
-
-        # Execute outside the lock
+        waiters: list[QueuedExecution] = []
         try:
-            exec_id = await self._executor(actions, mode, label)
-            for waiter in waiters:
-                waiter.set_result(exec_id)
-        except Exception as exc:
+            await asyncio.sleep(self._delay)
+            async with self._lock:
+                if not self._pending_actions:
+                    return
+
+                # Take snapshot and clear state while holding lock
+                actions = self._pending_actions
+                mode = self._pending_mode
+                label = self._pending_label
+                waiters = self._pending_waiters
+
+                self._pending_actions = []
+                self._pending_mode = None
+                self._pending_label = None
+                self._pending_waiters = []
+                self._flush_task = None
+
+            # Execute outside the lock
+            try:
+                exec_id = await self._executor(actions, mode, label)
+                for waiter in waiters:
+                    waiter.set_result(exec_id)
+            except Exception as exc:
+                for waiter in waiters:
+                    waiter.set_exception(exc)
+        except asyncio.CancelledError as exc:
+            # Ensure all waiters are notified if this task is cancelled
             for waiter in waiters:
                 waiter.set_exception(exc)
+            raise
+
+    def _prepare_flush(
+        self,
+    ) -> tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]]:
+        """Prepare a flush by taking a snapshot and clearing state (must be called with lock held).
 
-    async def _flush_now(self) -> None:
-        """Execute pending actions immediately (must be called with lock held)."""
+        Returns a tuple of (actions, mode, label, waiters) that should be executed
+        outside the lock using _execute_batch().
+ """ if not self._pending_actions: - return + return ([], None, None, []) # Cancel any pending flush task if self._flush_task and not self._flush_task.done(): @@ -160,8 +193,19 @@ async def _flush_now(self) -> None: self._pending_label = None self._pending_waiters = [] - # Execute the batch (must release lock before calling executor to avoid deadlock) - # Note: This is called within a lock context, we'll execute outside + return (actions, mode, label, waiters) + + async def _execute_batch( + self, + actions: list[Action], + mode: CommandMode | None, + label: str | None, + waiters: list[QueuedExecution], + ) -> None: + """Execute a batch of actions and notify waiters (must be called without lock).""" + if not actions: + return + try: exec_id = await self._executor(actions, mode, label) # Notify all waiters @@ -173,42 +217,49 @@ async def _flush_now(self) -> None: waiter.set_exception(exc) raise - async def flush(self) -> list[str]: + async def flush(self) -> None: """Force flush all pending actions immediately. - :return: List of exec_ids from flushed batches + This method forces the queue to execute any pending batched actions + without waiting for the delay timer. The execution results are delivered + to the corresponding QueuedExecution objects returned by add(). + + This method is useful for forcing immediate execution without having to + wait for the delay timer to expire. """ + batch_to_execute = None async with self._lock: - if not self._pending_actions: - return [] - - # Since we can only have one batch pending at a time, - # this will return a single exec_id in a list - exec_ids: list[str] = [] + if self._pending_actions: + batch_to_execute = self._prepare_flush() - try: - await self._flush_now() - # If flush succeeded, we can't actually return the exec_id here - # since it's delivered via the waiters. This method is mainly - # for forcing a flush, not retrieving results. - # Return empty list to indicate flush completed - except Exception: - # If flush fails, the exception will be propagated to waiters - # and also raised here - raise - - return exec_ids + # Execute outside the lock + if batch_to_execute: + await self._execute_batch(*batch_to_execute) def get_pending_count(self) -> int: - """Get the number of actions currently waiting in the queue.""" + """Get the (approximate) number of actions currently waiting in the queue. + + This method does not acquire the internal lock and therefore returns a + best-effort snapshot that may be slightly out of date if the queue is + being modified concurrently by other coroutines. 
+ """ return len(self._pending_actions) async def shutdown(self) -> None: """Shutdown the queue, flushing any pending actions.""" + batch_to_execute = None async with self._lock: if self._flush_task and not self._flush_task.done(): - self._flush_task.cancel() + task = self._flush_task + task.cancel() self._flush_task = None + # Wait for cancellation to complete + with contextlib.suppress(asyncio.CancelledError): + await task if self._pending_actions: - await self._flush_now() + batch_to_execute = self._prepare_flush() + + # Execute outside the lock + if batch_to_execute: + await self._execute_batch(*batch_to_execute) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index e9dc3baa..6e5692d3 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -26,7 +26,7 @@ from botocore.config import Config from warrant_lite import WarrantLite -from pyoverkiz.action_queue import ActionQueue, QueuedExecution +from pyoverkiz.action_queue import ActionQueue from pyoverkiz.const import ( COZYTOUCH_ATLANTIC_API, COZYTOUCH_CLIENT_ID, @@ -191,7 +191,7 @@ def __init__( :param session: optional ClientSession :param action_queue_enabled: enable action batching queue (default False) :param action_queue_delay: seconds to wait before flushing queue (default 0.5) - :param action_queue_max_actions: max actions per batch (default 20) + :param action_queue_max_actions: maximum actions per batch before auto-flush (default 20) """ self.username = username self.password = password @@ -208,6 +208,11 @@ def __init__( # Initialize action queue if enabled if action_queue_enabled: + if action_queue_delay <= 0: + raise ValueError("action_queue_delay must be positive") + if action_queue_max_actions < 1: + raise ValueError("action_queue_max_actions must be at least 1") + self._action_queue = ActionQueue( executor=self._execute_action_group_direct, delay=action_queue_delay, @@ -689,22 +694,31 @@ async def execute_action_group( actions: list[Action], mode: CommandMode | None = None, label: str | None = "python-overkiz-api", - ) -> str | QueuedExecution: + ) -> str: """Execute a non-persistent action group. - If action queue is enabled, actions will be batched with other actions - executed within the configured delay window. Returns a QueuedExecution - that can be awaited to get the exec_id. + When action queue is enabled, actions will be batched with other actions + executed within the configured delay window. The method will wait for the + batch to execute and return the exec_id. + + When action queue is disabled, executes immediately and returns exec_id. - If action queue is disabled, executes immediately and returns exec_id directly. + The API is consistent regardless of queue configuration - always returns + exec_id string directly. 
 
         :param actions: List of actions to execute
         :param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None)
         :param label: Label for the action group
-        :return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled)
+        :return: exec_id string from the executed action group
+
+        Example usage::
+
+            # Works the same with or without queue
+            exec_id = await client.execute_action_group([action])
         """
         if self._action_queue:
-            return await self._action_queue.add(actions, mode, label)
+            queued = await self._action_queue.add(actions, mode, label)
+            return await queued
         else:
             return await self._execute_action_group_direct(actions, mode, label)
diff --git a/test_queue_example.py b/test_queue_example.py
index beafe6a0..b39e64eb 100644
--- a/test_queue_example.py
+++ b/test_queue_example.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 # mypy: ignore-errors
-# ty: ignore
+# type: ignore
 
 """Simple example demonstrating the action queue feature."""
 
@@ -26,7 +26,7 @@ async def example_without_queue():
     )
 
     # Create some example actions
-    _action1 = Action(
+    Action(
         device_url="io://1234-5678-9012/12345678",
         commands=[Command(name=OverkizCommand.CLOSE)],
     )
@@ -71,28 +71,22 @@ async def example_with_queue():
 
     # These will be queued and batched together!
     print("Queueing action 1...")
-    queued1 = await client.execute_action_group([action1])
-    print(f"Got QueuedExecution object: {queued1}")
+    task1 = asyncio.create_task(client.execute_action_group([action1]))
 
     print("Queueing action 2...")
-    _queued2 = await client.execute_action_group([action2])
+    task2 = asyncio.create_task(client.execute_action_group([action2]))
 
     print("Queueing action 3...")
-    _queued3 = await client.execute_action_group([action3])
+    task3 = asyncio.create_task(client.execute_action_group([action3]))
 
+    # Give the calls a moment to reach the queue
+    await asyncio.sleep(0.1)
     print(f"Pending actions in queue: {client.get_pending_actions_count()}")
 
-    # Wait for all actions to execute (they'll be batched together)
-    print("\nWaiting for batch to execute...")
-    # exec_id1 = await queued1
-    # exec_id2 = await queued2
-    # exec_id3 = await queued3
-    # All three will have the same exec_id since they were batched together!
-    # print(f"Exec ID 1: {exec_id1}")
-    # print(f"Exec ID 2: {exec_id2}")
-    # print(f"Exec ID 3: {exec_id3}")
-    # print(f"All same? {exec_id1 == exec_id2 == exec_id3}")
+    # Wait for the batch to execute; all three share the same exec_id
+    exec_id1 = await task1
+    exec_id2 = await task2
+    exec_id3 = await task3
+
+    print(f"\nExec ID 1: {exec_id1}")
+    print(f"Exec ID 2: {exec_id2}")
+    print(f"Exec ID 3: {exec_id3}")
+    print(f"All same? {exec_id1 == exec_id2 == exec_id3}")
 
     print("\nWith queue: Multiple actions batched into single API request!")
     await client.close()
@@ -116,8 +110,11 @@ async def example_manual_flush():
     )
 
     print("Queueing action with 10s delay...")
-    _queued = await client.execute_action_group([action])
+    # Start execution in background (don't await yet)
+    exec_task = asyncio.create_task(client.execute_action_group([action]))
 
+    # Give it a moment to queue
+    await asyncio.sleep(0.1)
     print(f"Pending actions: {client.get_pending_actions_count()}")
 
     # Don't want to wait 10 seconds? Flush manually!
@@ -126,9 +123,9 @@
 
     print(f"Pending actions after flush: {client.get_pending_actions_count()}")
 
-    # Now we can await the result
-    # exec_id = await queued
-    # print(f"Got exec_id: {exec_id}")
+    # Now get the result
+    exec_id = await exec_task
+    print(f"Got exec_id: {exec_id}")
 
     await client.close()
diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py
index aea39313..7e131397 100644
--- a/tests/test_action_queue.py
+++ b/tests/test_action_queue.py
@@ -94,9 +94,9 @@ async def test_action_queue_max_actions_flush(mock_executor):
     await asyncio.sleep(0.05)
 
     # First 3 should be done
-    assert queued1._future.done()
-    assert queued2._future.done()
-    assert queued3._future.done()
+    assert queued1.is_done()
+    assert queued2.is_done()
+    assert queued3.is_done()
 
     # Add 2 more - should start a new batch
     queued4 = await queue.add([actions[3]])
@@ -180,7 +180,7 @@ async def test_action_queue_manual_flush(mock_executor):
     await queue.flush()
 
     # Should be done now
-    assert queued._future.done()
+    assert queued.is_done()
 
     exec_id = await queued
     assert exec_id.startswith("exec-1-")
@@ -201,7 +201,7 @@ async def test_action_queue_shutdown(mock_executor):
     await queue.shutdown()
 
     # Should be done
-    assert queued._future.done()
+    assert queued.is_done()
 
     mock_executor.assert_called_once()
@@ -263,8 +263,11 @@ async def set_result():
         await asyncio.sleep(0.05)
         queued.set_result("exec-123")
 
-    _task = asyncio.create_task(set_result())  # noqa: RUF006
+    task = asyncio.create_task(set_result())
 
     # Await the result
     result = await queued
     assert result == "exec-123"
+
+    # Ensure background task has completed
+    await task
diff --git a/tests/test_client_queue_integration.py b/tests/test_client_queue_integration.py
index 40cd91f6..5376ef7b 100644
--- a/tests/test_client_queue_integration.py
+++ b/tests/test_client_queue_integration.py
@@ -68,18 +68,21 @@ async def test_client_with_queue_batches_actions():
     ) as mock_post:
         mock_post.return_value = {"execId": "exec-batched"}
 
-        # Queue multiple actions quickly
-        queued1 = await client.execute_action_group([actions[0]])
-        queued2 = await client.execute_action_group([actions[1]])
-        queued3 = await client.execute_action_group([actions[2]])
+        # Queue multiple actions quickly - start them as tasks to allow batching
+        task1 = asyncio.create_task(client.execute_action_group([actions[0]]))
+        task2 = asyncio.create_task(client.execute_action_group([actions[1]]))
+        task3 = asyncio.create_task(client.execute_action_group([actions[2]]))
+
+        # Give them a moment to queue
+        await asyncio.sleep(0.01)
 
         # Should have 3 actions pending
         assert client.get_pending_actions_count() == 3
 
         # Wait for all to execute
-        exec_id1 = await queued1
-        exec_id2 = await queued2
-        exec_id3 = await queued3
+        exec_id1 = await task1
+        exec_id2 = await task2
+        exec_id3 = await task3
 
         # All should have the same exec_id (batched together)
         assert exec_id1 == exec_id2 == exec_id3 == "exec-batched"
@@ -116,7 +119,11 @@ async def test_client_manual_flush():
     ) as mock_post:
         mock_post.return_value = {"execId": "exec-flushed"}
 
-        queued = await client.execute_action_group([action])
+        # Start execution as a task to allow checking pending count
+        exec_task = asyncio.create_task(client.execute_action_group([action]))
+
+        # Give it a moment to queue
+        await asyncio.sleep(0.01)
 
         # Should have 1 action pending
         assert client.get_pending_actions_count() == 1
@@ -127,7 +134,7 @@
 
         # Should be executed now
         assert client.get_pending_actions_count() == 0
-        exec_id = await queued
+        exec_id = await exec_task
         assert exec_id == "exec-flushed"
 
         mock_post.assert_called_once()
@@ -156,13 +163,17 @@ async def test_client_close_flushes_queue():
     ) as mock_post:
         mock_post.return_value = {"execId": "exec-closed"}
 
-        queued = await client.execute_action_group([action])
+        # Start execution as a task
+        exec_task = asyncio.create_task(client.execute_action_group([action]))
+
+        # Give it a moment to queue
+        await asyncio.sleep(0.01)
 
         # Close should flush
         await client.close()
 
         # Should be executed
-        exec_id = await queued
+        exec_id = await exec_task
         assert exec_id == "exec-closed"
 
         mock_post.assert_called_once()
@@ -193,26 +204,23 @@ async def test_client_queue_respects_max_actions():
     ) as mock_post:
         mock_post.return_value = {"execId": "exec-123"}
 
-        # Add 2 actions - should trigger flush
-        queued1 = await client.execute_action_group([actions[0]])
-        queued2 = await client.execute_action_group([actions[1]])
+        # Add 2 actions as tasks to trigger flush
+        task1 = asyncio.create_task(client.execute_action_group([actions[0]]))
+        task2 = asyncio.create_task(client.execute_action_group([actions[1]]))
 
         # Wait a bit for flush
        await asyncio.sleep(0.05)
 
         # First 2 should be done
-        exec_id1 = await queued1
-        exec_id2 = await queued2
+        exec_id1 = await task1
+        exec_id2 = await task2
         assert exec_id1 == "exec-123"
         assert exec_id2 == "exec-123"
 
         # Add third action - starts new batch
-        queued3 = await client.execute_action_group([actions[2]])
-
-        # Flush to complete
-        await client.flush_action_queue()
+        exec_id3 = await client.execute_action_group([actions[2]])
 
-        exec_id3 = await queued3
+        # Should have exec_id directly (waited for batch to complete)
         assert exec_id3 == "exec-123"
 
         # Should have been called twice (2 batches)
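
Taken together, these changes make batching transparent to callers: concurrent execute_action_group() calls are merged into one API request, and flush_action_queue() can force the batch out before the delay timer fires. Below is a minimal sketch of that flow, mirroring the create_task pattern used in the updated tests and examples. It assumes an already-authenticated OverkizClient constructed with action_queue_enabled=True and a queue delay longer than the short settle sleep; the device URLs are placeholders.

import asyncio

from pyoverkiz.enums import OverkizCommand
from pyoverkiz.models import Action, Command


async def demo_batching(client) -> None:
    # `client` is assumed to be an OverkizClient with the action queue enabled.
    actions = [
        Action(
            device_url=f"io://1234-5678-9012/{index}",  # placeholder device URLs
            commands=[Command(name=OverkizCommand.CLOSE)],
        )
        for index in (1, 2, 3)
    ]

    # Start the calls concurrently so they land in the same batch window.
    tasks = [
        asyncio.create_task(client.execute_action_group([action]))
        for action in actions
    ]

    # Give the calls a moment to reach the queue, then force the batch out
    # instead of waiting for the delay timer.
    await asyncio.sleep(0.1)
    print(f"Pending actions: {client.get_pending_actions_count()}")
    await client.flush_action_queue()

    # Every call in the batch resolves to the same exec_id.
    exec_ids = await asyncio.gather(*tasks)
    print(f"All same? {len(set(exec_ids)) == 1}")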