@@ -35,7 +35,6 @@
 import queue  # pylint: disable=unused-import
 import threading
 
-import gin
 from absl import logging
 # pylint: disable=unused-import
 from compiler_opt.distributed.worker import Worker
@@ -61,7 +60,7 @@ class TaskResult:
 
 
 def _run_impl(in_q: 'queue.Queue[Task]', out_q: 'queue.Queue[TaskResult]',
-              worker_class: 'type[Worker]', threads: int, *args, **kwargs):
+              worker_class: 'type[Worker]', *args, **kwargs):
   """Worker process entrypoint."""
   # Note: the out_q is typed as taking only TaskResult objects, not
   # Optional[TaskResult], despite that being the type it is used on the Stub
@@ -73,7 +72,7 @@ def _run_impl(in_q: 'queue.Queue[Task]', out_q: 'queue.Queue[TaskResult]',
   # process near-immediately. `threads` only controls how many threads are
   # spawned at a time which execute given tasks. In the typical clang-spawning
   # jobs, this effectively limits the number of clang instances spawned.
-  pool = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
+  pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
   obj = worker_class(*args, **kwargs)
 
   def make_ondone(msgid):
@@ -109,7 +108,7 @@ def _run(*args, **kwargs):
     raise e
 
 
-def _make_stub(cls: 'type[Worker]', pool_threads: int, *args, **kwargs):
+def _make_stub(cls: 'type[Worker]', *args, **kwargs):
 
   class _Stub():
     """Client stub to a worker hosted by a process."""
@@ -120,13 +119,15 @@ def __init__(self):
           multiprocessing.get_context().Queue()
 
       # this is the process hosting one worker instance.
+      # we set aside 1 thread to coordinate running jobs, and the main thread
+      # to handle high priority requests. The expectation is that the user
+      # achieves concurrency through multiprocessing, not multithreading.
       self._process = multiprocessing.Process(
           target=functools.partial(
              _run,
              worker_class=cls,
              in_q=self._send,
              out_q=self._receive,
-             threads=pool_threads,
              *args,
              **kwargs))
      # lock for the msgid -> reply future map. The map will be set to None
@@ -217,25 +218,16 @@ def __dir__(self):
   return _Stub()
 
 
-@gin.configurable
 class LocalWorkerPool(AbstractContextManager):
   """A pool of workers hosted on the local machines, each in its own process."""
 
-  def __init__(self,
-               worker_class: 'type[Worker]',
-               count: Optional[int],
-               *args,
-               pool_threads: int = 1,
+  def __init__(self, worker_class: 'type[Worker]', count: Optional[int], *args,
                **kwargs):
     if not count:
       count = multiprocessing.cpu_count()
     self._stubs = [
-        _make_stub(worker_class, pool_threads, *args, **kwargs)
-        for _ in range(count // pool_threads)
+        _make_stub(worker_class, *args, **kwargs) for _ in range(count)
     ]
-    # Make sure there's always `count` worker threads, not a rounded `count`
-    if (remainder := count % pool_threads) != 0:
-      self._stubs.append(_make_stub(worker_class, remainder, *args, **kwargs))
 
   def __enter__(self):
     return self._stubs
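For reference, a minimal usage sketch of the pool after this change. The EchoWorker class, its echo method, and the module path in the second import are illustrative assumptions, not part of the diff; the sketch also assumes stub method calls return futures (per the "reply future map" mentioned above), with parallelism coming from the number of worker processes rather than threads within a process.

# Hypothetical usage sketch; EchoWorker and the local_worker_manager import
# path below are assumptions for illustration, not part of this change.
from compiler_opt.distributed.worker import Worker
from compiler_opt.distributed.local.local_worker_manager import LocalWorkerPool


class EchoWorker(Worker):
  """Toy worker; each instance is hosted in its own process."""

  def echo(self, msg: str) -> str:
    return f'echo: {msg}'


if __name__ == '__main__':
  # One single-threaded worker per process; concurrency now comes from
  # `count` processes, since `pool_threads` no longer exists.
  with LocalWorkerPool(worker_class=EchoWorker, count=4) as stubs:
    # Assumes each stub call returns a future resolved by the worker process.
    futures = [stub.echo(f'task {i}') for i, stub in enumerate(stubs)]
    print([f.result() for f in futures])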