Skip to content

Commit 0c9732e

Browse files
committed
Run main tests with forkserver
1 parent 32dd102 commit 0c9732e

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

dispatcher/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from dispatcher.producers.brokered import BrokeredProducer
1111
from dispatcher.producers.scheduled import ScheduledProducer
1212
from dispatcher.utils import MODULE_METHOD_DELIMITER
13+
from dispatcher.process import ForkServerManager
1314

1415
logger = logging.getLogger(__name__)
1516

@@ -79,7 +80,7 @@ def __init__(self) -> None:
7980

8081

8182
class DispatcherMain:
82-
def __init__(self, config: dict):
83+
def __init__(self, config: dict, manager_cls: type = ForkServerManager):
8384
self.delayed_messages: list[SimpleNamespace] = []
8485
self.received_count = 0
8586
self.control_count = 0
@@ -88,7 +89,7 @@ def __init__(self, config: dict):
8889
# Lock for file descriptor mgmnt - hold lock when forking or connecting, to avoid DNS hangs
8990
# psycopg is well-behaved IFF you do not connect while forking, compare to AWX __clean_on_fork__
9091
self.fd_lock = asyncio.Lock()
91-
self.pool = WorkerPool(config.get('pool', {}).get('max_workers', 3), self.fd_lock)
92+
self.pool = WorkerPool(config.get('pool', {}).get('max_workers', 3), fd_lock=self.fd_lock, manager_cls=manager_cls)
9293

9394
# Initialize all the producers, this should not start anything, just establishes objects
9495
self.producers: list[Union[ScheduledProducer, BrokeredProducer]] = []

dispatcher/pool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ def __init__(self) -> None:
9494

9595

9696
class WorkerPool:
97-
def __init__(self, num_workers: int, fd_lock: Optional[asyncio.Lock] = None):
97+
def __init__(self, num_workers: int, fd_lock: Optional[asyncio.Lock] = None, manager_cls: type = ForkServerManager):
9898
self.num_workers = num_workers
9999
self.workers: dict[int, PoolWorker] = {}
100100
self.next_worker_id = 0
101-
self.process_manager = ForkServerManager()
101+
self.process_manager = manager_cls()
102102
self.queued_messages: list[dict] = [] # TODO: use deque, invent new kinds of logging anxiety
103103
self.read_results_task: Optional[Task] = None
104104
self.start_worker_task: Optional[Task] = None

tests/conftest.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from dispatcher.control import Control
1313

1414
from dispatcher.brokers.pg_notify import apublish_message, aget_connection, get_connection
15+
from dispatcher.process import ProcessManager, ForkServerManager
1516

1617

1718
# List of channels to listen on
@@ -28,10 +29,15 @@ def pg_dispatcher() -> DispatcherMain:
2829
return DispatcherMain(BASIC_CONFIG)
2930

3031

31-
@pytest_asyncio.fixture(loop_scope="function", scope="function")
32+
@pytest_asyncio.fixture(
33+
loop_scope="function",
34+
scope="function",
35+
params=[ProcessManager, ForkServerManager],
36+
ids=["fork", "forkserver"],
37+
)
3238
async def apg_dispatcher(request) -> AsyncIterator[DispatcherMain]:
3339
try:
34-
dispatcher = DispatcherMain(BASIC_CONFIG)
40+
dispatcher = DispatcherMain(BASIC_CONFIG, manager_cls=request.param)
3541

3642
await dispatcher.connect_signals()
3743
await dispatcher.start_working()

0 commit comments

Comments
 (0)