Skip to content

Commit c834db2

Browse files
authored
Tidy up local_worker_manager (#88)
- Add a get_context() - Fix a signature - Update module docstring
1 parent 54d3e5d commit c834db2

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

compiler_opt/distributed/local/local_worker_manager.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
handle a number of concurrent requests. The client is given a stub object that
2020
exposes the same methods as the worker, just that they return Futures.
2121
22-
There is a pair of queues between a stub and its corresponding process/worker.
23-
One queue is used to place tasks (method calls), the other to receive results.
24-
Tasks and results are correlated by a monotonically incrementing counter
25-
maintained by the stub.
22+
There is a bidirectional pipe between a stub and its corresponding
23+
process/worker. One direction is used to place tasks (method calls), the other
24+
to place results. Tasks and results are correlated by a monotonically
25+
incrementing counter maintained by the stub.
2626
2727
The worker process dequeues tasks promptly and either re-enqueues them to a
2828
local thread pool, or, if the task is 'urgent', it executes it promptly.
@@ -31,15 +31,14 @@
3131
import dataclasses
3232
import functools
3333
import multiprocessing
34-
import multiprocessing.connection
35-
import queue # pylint: disable=unused-import
3634
import threading
3735

3836
from absl import logging
3937
# pylint: disable=unused-import
4038
from compiler_opt.distributed.worker import Worker
4139

4240
from contextlib import AbstractContextManager
41+
from multiprocessing import connection
4342
from typing import Any, Callable, Dict, Optional
4443

4544

@@ -59,8 +58,8 @@ class TaskResult:
5958
value: Any
6059

6160

62-
def _run_impl(pipe: multiprocessing.connection.Connection,
63-
worker_class: 'type[Worker]', *args, **kwargs):
61+
def _run_impl(pipe: connection.Connection, worker_class: 'type[Worker]', *args,
62+
**kwargs):
6463
"""Worker process entrypoint."""
6564

6665
# A setting of 1 does not inhibit the while loop below from running since
@@ -113,7 +112,7 @@ def _run(*args, **kwargs):
113112

114113
def _make_stub(cls: 'type[Worker]', *args, **kwargs):
115114

116-
class _Stub():
115+
class _Stub:
117116
"""Client stub to a worker hosted by a process."""
118117

119118
def __init__(self):
@@ -125,7 +124,7 @@ def __init__(self):
125124
# we set aside 1 thread to coordinate running jobs, and the main thread
126125
# to handle high priority requests. The expectation is that the user
127126
# achieves concurrency through multiprocessing, not multithreading.
128-
self._process = multiprocessing.Process(
127+
self._process = multiprocessing.get_context().Process(
129128
target=functools.partial(
130129
_run, worker_class=cls, pipe=child_pipe, *args, **kwargs))
131130
# lock for the msgid -> reply future map. The map will be set to None
@@ -235,7 +234,7 @@ def __init__(self, worker_class: 'type[Worker]', count: Optional[int], *args,
235234
def __enter__(self):
236235
return self._stubs
237236

238-
def __exit__(self, *args, **kwargs):
237+
def __exit__(self, *args):
239238
# first, trigger killing the worker process and exiting of the msg pump,
240239
# which will also clear out any pending futures.
241240
for s in self._stubs:

0 commit comments

Comments
 (0)