From 04d906eaac63fd5d89ffa65d43560009c0b8b7d3 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 3 Jun 2025 16:38:54 -0400 Subject: [PATCH 1/4] Add test for worker ordering --- dispatcherd/protocols.py | 2 + dispatcherd/service/pool.py | 7 ++- tests/unit/service/test_worker_order.py | 69 +++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 tests/unit/service/test_worker_order.py diff --git a/dispatcherd/protocols.py b/dispatcherd/protocols.py index fd0c7920..f463a730 100644 --- a/dispatcherd/protocols.py +++ b/dispatcherd/protocols.py @@ -210,6 +210,8 @@ def __iter__(self) -> Iterator[PoolWorker]: ... def get_by_id(self, worker_id: int) -> PoolWorker: ... + def move_to_end(self, worker_id: int) -> None: ... + class SharedAsyncObjects: exit_event: asyncio.Event diff --git a/dispatcherd/service/pool.py b/dispatcherd/service/pool.py index bf1b7609..e43e4887 100644 --- a/dispatcherd/service/pool.py +++ b/dispatcherd/service/pool.py @@ -5,6 +5,7 @@ import signal import time from typing import Any, Iterator, Literal, Optional +from collections import OrderedDict from ..processors.blocker import Blocker from ..processors.queuer import Queuer @@ -173,7 +174,7 @@ def __init__(self) -> None: class WorkerData(WorkerDataProtocol): def __init__(self) -> None: - self.workers: dict[int, PoolWorker] = {} + self.workers: OrderedDict[int, PoolWorker] = OrderedDict() self.management_lock = asyncio.Lock() def __iter__(self) -> Iterator[PoolWorker]: @@ -194,6 +195,9 @@ def get_by_id(self, worker_id: int) -> PoolWorker: def remove_by_id(self, worker_id: int) -> None: del self.workers[worker_id] + def move_to_end(self, worker_id: int) -> None: + self.workers.move_to_end(worker_id) + class WorkerPool(WorkerPoolProtocol): def __init__( @@ -559,6 +563,7 @@ async def process_finished(self, worker: PoolWorker, message: dict) -> None: else: self.finished_count += 1 worker.mark_finished_task() + self.workers.move_to_end(worker.worker_id) if not self.queuer.queued_messages and all(worker.current_task is None for worker in self.workers): self.events.work_cleared.set() diff --git a/tests/unit/service/test_worker_order.py b/tests/unit/service/test_worker_order.py new file mode 100644 index 00000000..f8928978 --- /dev/null +++ b/tests/unit/service/test_worker_order.py @@ -0,0 +1,69 @@ +import asyncio +from typing import AsyncIterator + +import pytest +import pytest_asyncio + +from dispatcherd.protocols import DispatcherMain +from dispatcherd.testing.asyncio import adispatcher_service + + +@pytest.fixture(scope='session') +def order_config(): + return { + "version": 2, + "service": { + "pool_kwargs": {"min_workers": 2, "max_workers": 2}, + "main_kwargs": {"node_id": "order-test"}, + }, + } + + +@pytest_asyncio.fixture +async def aorder_dispatcher(order_config) -> AsyncIterator[DispatcherMain]: + async with adispatcher_service(order_config) as dispatcher: + yield dispatcher + + +@pytest.mark.asyncio +async def test_workers_reorder_and_dispatch_longest_idle(aorder_dispatcher): + pool = aorder_dispatcher.pool + assert list(pool.workers.workers.keys()) == [0, 1] + + pool.events.work_cleared.clear() + await aorder_dispatcher.process_message({ + "task": "tests.data.methods.sleep_function", + "kwargs": {"seconds": 0.1}, + "uuid": "t1", + }) + await aorder_dispatcher.process_message({ + "task": "tests.data.methods.sleep_function", + "kwargs": {"seconds": 0.05}, + "uuid": "t2", + }) + await asyncio.wait_for(pool.events.work_cleared.wait(), timeout=1) + + assert list(pool.workers.workers.keys()) == [1, 0] + + pool.events.work_cleared.clear() + await aorder_dispatcher.process_message({ + "task": "tests.data.methods.sleep_function", + "kwargs": {"seconds": 0.01}, + "uuid": "t3", + }) + await asyncio.sleep(0.01) + assert pool.workers.get_by_id(1).current_task["uuid"] == "t3" + assert pool.workers.get_by_id(0).current_task is None + await asyncio.wait_for(pool.events.work_cleared.wait(), timeout=1) + + pool.events.work_cleared.clear() + await aorder_dispatcher.process_message({ + "task": "tests.data.methods.sleep_function", + "kwargs": {"seconds": 0.01}, + "uuid": "t4", + }) + await asyncio.sleep(0.01) + assert pool.workers.get_by_id(0).current_task["uuid"] == "t4" + await asyncio.wait_for(pool.events.work_cleared.wait(), timeout=1) + + assert list(pool.workers.workers.keys()) == [1, 0] From 9ba2f4f2998ba99362cacb7a77b255ffe6756498 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 3 Jun 2025 16:40:03 -0400 Subject: [PATCH 2/4] Run linters --- dispatcherd/service/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dispatcherd/service/pool.py b/dispatcherd/service/pool.py index e43e4887..3cec7cec 100644 --- a/dispatcherd/service/pool.py +++ b/dispatcherd/service/pool.py @@ -4,8 +4,8 @@ import os import signal import time -from typing import Any, Iterator, Literal, Optional from collections import OrderedDict +from typing import Any, Iterator, Literal, Optional from ..processors.blocker import Blocker from ..processors.queuer import Queuer From 717b2692a70eb5695ad1a46b86c5d685134bacd1 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 20 Oct 2025 17:22:02 -0400 Subject: [PATCH 3/4] Handle KeyError --- dispatcherd/service/pool.py | 5 +++- tests/unit/service/test_worker_order.py | 37 +++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/dispatcherd/service/pool.py b/dispatcherd/service/pool.py index 3cec7cec..542b6164 100644 --- a/dispatcherd/service/pool.py +++ b/dispatcherd/service/pool.py @@ -196,7 +196,10 @@ def remove_by_id(self, worker_id: int) -> None: del self.workers[worker_id] def move_to_end(self, worker_id: int) -> None: - self.workers.move_to_end(worker_id) + try: + self.workers.move_to_end(worker_id) + except KeyError: + logger.warning(f'Attempted to move worker_id={worker_id} to end, but worker was already removed from workers dict') class WorkerPool(WorkerPoolProtocol): diff --git a/tests/unit/service/test_worker_order.py b/tests/unit/service/test_worker_order.py index f8928978..86f3fcd8 100644 --- a/tests/unit/service/test_worker_order.py +++ b/tests/unit/service/test_worker_order.py @@ -67,3 +67,40 @@ async def test_workers_reorder_and_dispatch_longest_idle(aorder_dispatcher): await asyncio.wait_for(pool.events.work_cleared.wait(), timeout=1) assert list(pool.workers.workers.keys()) == [1, 0] + + +@pytest.mark.asyncio +async def test_process_finished_with_removed_worker(aorder_dispatcher, caplog): + """Test that process_finished handles KeyError gracefully when worker has been removed + + This simulates the race condition where a worker finishes a task, but has been + removed from self.workers before process_finished is called. + """ + import logging + pool = aorder_dispatcher.pool + + # Get a reference to a worker before removing it + worker = pool.workers.get_by_id(0) + assert worker is not None + + # Simulate the worker having a task + worker.current_task = {"uuid": "test-uuid", "task": "test.task"} + worker.finished_count = 0 + + # Remove the worker from the workers dict (simulating concurrent removal) + pool.workers.remove_by_id(0) + assert 0 not in pool.workers + + # Now call process_finished with the removed worker - should not raise KeyError + message = {"uuid": "test-uuid", "result": "success"} + + with caplog.at_level(logging.WARNING): + await pool.process_finished(worker, message) + + # Verify the warning was logged + assert any("Attempted to move worker_id=0 to end, but worker was already removed" in record.message + for record in caplog.records) + + # Verify the worker was marked as finished despite being removed + assert worker.current_task is None + assert worker.finished_count == 1 From 8b8afecf2bd0d45b5f7f4a2684efa88aa72abe04 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 20 Oct 2025 17:45:29 -0400 Subject: [PATCH 4/4] Avoid caplog, not async --- tests/unit/service/test_worker_order.py | 42 ++++++++++++------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/tests/unit/service/test_worker_order.py b/tests/unit/service/test_worker_order.py index 86f3fcd8..95058a75 100644 --- a/tests/unit/service/test_worker_order.py +++ b/tests/unit/service/test_worker_order.py @@ -70,37 +70,35 @@ async def test_workers_reorder_and_dispatch_longest_idle(aorder_dispatcher): @pytest.mark.asyncio -async def test_process_finished_with_removed_worker(aorder_dispatcher, caplog): +async def test_process_finished_with_removed_worker(): """Test that process_finished handles KeyError gracefully when worker has been removed This simulates the race condition where a worker finishes a task, but has been removed from self.workers before process_finished is called. """ - import logging - pool = aorder_dispatcher.pool + from unittest.mock import MagicMock, patch + from dispatcherd.service.pool import WorkerData, PoolWorker - # Get a reference to a worker before removing it - worker = pool.workers.get_by_id(0) - assert worker is not None + # Create a minimal WorkerData instance + worker_data = WorkerData() - # Simulate the worker having a task + # Create a mock worker + mock_process = MagicMock() + worker = PoolWorker(worker_id=0, process=mock_process) worker.current_task = {"uuid": "test-uuid", "task": "test.task"} worker.finished_count = 0 - # Remove the worker from the workers dict (simulating concurrent removal) - pool.workers.remove_by_id(0) - assert 0 not in pool.workers - - # Now call process_finished with the removed worker - should not raise KeyError - message = {"uuid": "test-uuid", "result": "success"} - - with caplog.at_level(logging.WARNING): - await pool.process_finished(worker, message) + # Add worker then remove it (simulating the race condition) + worker_data.add_worker(worker) + worker_data.remove_by_id(0) + assert 0 not in worker_data - # Verify the warning was logged - assert any("Attempted to move worker_id=0 to end, but worker was already removed" in record.message - for record in caplog.records) + # Mock the logger to verify the warning is logged + with patch('dispatcherd.service.pool.logger') as mock_logger: + # Call move_to_end on the removed worker - should not raise KeyError + worker_data.move_to_end(0) - # Verify the worker was marked as finished despite being removed - assert worker.current_task is None - assert worker.finished_count == 1 + # Verify the warning was logged + mock_logger.warning.assert_called_once() + warning_call = mock_logger.warning.call_args[0][0] + assert "Attempted to move worker_id=0 to end, but worker was already removed" in warning_call