19
19
handle a number of concurrent requests. The client is given a stub object that
20
20
exposes the same methods as the worker, just that they return Futures.
21
21
22
- There is a pair of queues between a stub and its corresponding process/worker.
23
- One queue is used to place tasks (method calls), the other to receive results.
24
- Tasks and results are correlated by a monotonically incrementing counter
25
- maintained by the stub.
22
+ There is a bidirectional pipe between a stub and its corresponding
23
+ process/worker. One direction is used to place tasks (method calls), the other
24
+ to place results. Tasks and results are correlated by a monotonically
25
+ incrementing counter maintained by the stub.
26
26
27
27
The worker process dequeues tasks promptly and either re-enqueues them to a
28
28
local thread pool, or, if the task is 'urgent', it executes it promptly.
31
31
import dataclasses
32
32
import functools
33
33
import multiprocessing
34
- import multiprocessing .connection
35
- import queue # pylint: disable=unused-import
36
34
import threading
37
35
38
36
from absl import logging
39
37
# pylint: disable=unused-import
40
38
from compiler_opt .distributed .worker import Worker
41
39
42
40
from contextlib import AbstractContextManager
41
+ from multiprocessing import connection
43
42
from typing import Any , Callable , Dict , Optional
44
43
45
44
@@ -59,8 +58,8 @@ class TaskResult:
59
58
value : Any
60
59
61
60
62
- def _run_impl (pipe : multiprocessing . connection .Connection ,
63
- worker_class : 'type[Worker]' , * args , ** kwargs ):
61
+ def _run_impl (pipe : connection .Connection , worker_class : 'type[Worker]' , * args ,
62
+ ** kwargs ):
64
63
"""Worker process entrypoint."""
65
64
66
65
# A setting of 1 does not inhibit the while loop below from running since
@@ -113,7 +112,7 @@ def _run(*args, **kwargs):
113
112
114
113
def _make_stub (cls : 'type[Worker]' , * args , ** kwargs ):
115
114
116
- class _Stub () :
115
+ class _Stub :
117
116
"""Client stub to a worker hosted by a process."""
118
117
119
118
def __init__ (self ):
@@ -125,7 +124,7 @@ def __init__(self):
125
124
# we set aside 1 thread to coordinate running jobs, and the main thread
126
125
# to handle high priority requests. The expectation is that the user
127
126
# achieves concurrency through multiprocessing, not multithreading.
128
- self ._process = multiprocessing .Process (
127
+ self ._process = multiprocessing .get_context (). Process (
129
128
target = functools .partial (
130
129
_run , worker_class = cls , pipe = child_pipe , * args , ** kwargs ))
131
130
# lock for the msgid -> reply future map. The map will be set to None
@@ -235,7 +234,7 @@ def __init__(self, worker_class: 'type[Worker]', count: Optional[int], *args,
235
234
def __enter__ (self ):
236
235
return self ._stubs
237
236
238
- def __exit__ (self , * args , ** kwargs ):
237
+ def __exit__ (self , * args ):
239
238
# first, trigger killing the worker process and exiting of the msg pump,
240
239
# which will also clear out any pending futures.
241
240
for s in self ._stubs :
0 commit comments