Skip to content

Commit 477439f

Browse files
committed
Complete worker settings initialization
1 parent 2edf0e4 commit 477439f

File tree

4 files changed

+8
-7
lines changed

4 files changed

+8
-7
lines changed

dispatcher/factories.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def from_settings(settings: LazySettings = global_settings) -> DispatcherMain:
5353
between the service, publisher, and any other interacting processes.
5454
"""
5555
producers = producers_from_settings(settings=settings)
56-
return DispatcherMain(settings.service, producers)
56+
return DispatcherMain(settings.service, producers, settings=settings)
5757

5858

5959
# ---- Publisher objects ----

dispatcher/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from dispatcher.pool import WorkerPool
99
from dispatcher.producers import BaseProducer
10+
from dispatcher.config import settings as global_settings, LazySettings
1011

1112
logger = logging.getLogger(__name__)
1213

@@ -76,7 +77,7 @@ def __init__(self) -> None:
7677

7778

7879
class DispatcherMain:
79-
def __init__(self, service_config: dict, producers: Iterable[BaseProducer]):
80+
def __init__(self, service_config: dict, producers: Iterable[BaseProducer], settings: LazySettings = global_settings):
8081
self.delayed_messages: list[SimpleNamespace] = []
8182
self.received_count = 0
8283
self.control_count = 0
@@ -85,7 +86,7 @@ def __init__(self, service_config: dict, producers: Iterable[BaseProducer]):
8586
# Lock for file descriptor mgmnt - hold lock when forking or connecting, to avoid DNS hangs
8687
# psycopg is well-behaved IFF you do not connect while forking, compare to AWX __clean_on_fork__
8788
self.fd_lock = asyncio.Lock()
88-
self.pool = WorkerPool(fd_lock=self.fd_lock, **service_config)
89+
self.pool = WorkerPool(fd_lock=self.fd_lock, settings=settings, **service_config)
8990

9091
# Set all the producers, this should still not start anything, just establishes objects
9192
self.producers = producers

dispatcher/pool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@
1212

1313

1414
class PoolWorker:
15-
def __init__(self, worker_id: int, process: ProcessProxy, settings: LazySettings = global_settings) -> None:
15+
def __init__(self, worker_id: int, process: ProcessProxy) -> None:
1616
self.worker_id = worker_id
1717
self.process = process
1818
self.current_task: Optional[dict] = None
1919
self.started_at: Optional[int] = None
2020
self.is_active_cancel: bool = False
21-
self.settings_stash: dict = settings.serialize()
2221

2322
# Tracking information for worker
2423
self.finished_count = 0
@@ -96,9 +95,10 @@ def __init__(self) -> None:
9695

9796

9897
class WorkerPool:
99-
def __init__(self, max_workers: int, fd_lock: Optional[asyncio.Lock] = None):
98+
def __init__(self, max_workers: int, fd_lock: Optional[asyncio.Lock] = None, settings: LazySettings = global_settings):
10099
self.max_workers = max_workers
101100
self.workers: dict[int, PoolWorker] = {}
101+
self.settings_stash: dict = settings.serialize() # These are passed to the workers to initialize dispatcher settings
102102
self.next_worker_id = 0
103103
self.process_manager = ProcessManager()
104104
self.queued_messages: list[dict] = [] # TODO: use deque, invent new kinds of logging anxiety

tests/integration/test_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def test_run_decorated_function(apg_dispatcher, test_settings):
4545
@pytest.mark.asyncio
4646
async def test_submit_with_global_settings(apg_dispatcher, test_settings):
4747
clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
48-
with temporary_settings(test_settings):
48+
with temporary_settings(test_settings.serialize()):
4949
test_methods.print_hello.delay() # settings are inferred from global context
5050
await asyncio.wait_for(clearing_task, timeout=3)
5151

0 commit comments

Comments
 (0)