35
35
import queue # pylint: disable=unused-import
36
36
import threading
37
37
38
+ import gin
38
39
from absl import logging
39
40
# pylint: disable=unused-import
40
41
from compiler_opt .distributed .worker import Worker
@@ -60,12 +61,19 @@ class TaskResult:
60
61
61
62
62
63
def _run_impl (in_q : 'queue.Queue[Task]' , out_q : 'queue.Queue[TaskResult]' ,
63
- worker_class : 'type[Worker]' , * args , ** kwargs ):
64
+ worker_class : 'type[Worker]' , threads : int , * args , ** kwargs ):
64
65
"""Worker process entrypoint."""
65
66
# Note: the out_q is typed as taking only TaskResult objects, not
66
67
# Optional[TaskResult], despite that being the type it is used on the Stub
67
68
# side. This is because the `None` value is only injected by the Stub itself.
68
- pool = concurrent .futures .ThreadPoolExecutor ()
69
+
70
+ # `threads` is defaulted to 1 in LocalWorkerPool's constructor.
71
+ # A setting of 1 does not inhibit the while loop below from running since
72
+ # that runs on the main thread of the process. Urgent tasks will still
73
+ # process near-immediately. `threads` only controls how many threads are
74
+ # spawned at a time which execute given tasks. In the typical clang-spawning
75
+ # jobs, this effectively limits the number of clang instances spawned.
76
+ pool = concurrent .futures .ThreadPoolExecutor (max_workers = threads )
69
77
obj = worker_class (* args , ** kwargs )
70
78
71
79
def make_ondone (msgid ):
@@ -101,7 +109,7 @@ def _run(*args, **kwargs):
101
109
raise e
102
110
103
111
104
- def _make_stub (cls : 'type[Worker]' , * args , ** kwargs ):
112
+ def _make_stub (cls : 'type[Worker]' , pool_threads : int , * args , ** kwargs ):
105
113
106
114
class _Stub ():
107
115
"""Client stub to a worker hosted by a process."""
@@ -118,6 +126,7 @@ def __init__(self):
118
126
worker_class = cls ,
119
127
in_q = self ._send ,
120
128
out_q = self ._receive ,
129
+ threads = pool_threads ,
121
130
* args ,
122
131
** kwargs ))
123
132
# lock for the msgid -> reply future map. The map will be set to None
@@ -208,16 +217,25 @@ def __dir__(self):
208
217
return _Stub ()
209
218
210
219
220
+ @gin .configurable
211
221
class LocalWorkerPool (AbstractContextManager ):
212
222
"""A pool of workers hosted on the local machines, each in its own process."""
213
223
214
- def __init__ (self , worker_class : 'type[Worker]' , count : Optional [int ], * args ,
224
+ def __init__ (self ,
225
+ worker_class : 'type[Worker]' ,
226
+ count : Optional [int ],
227
+ * args ,
228
+ pool_threads : int = 1 ,
215
229
** kwargs ):
216
230
if not count :
217
231
count = multiprocessing .cpu_count ()
218
232
self ._stubs = [
219
- _make_stub (worker_class , * args , ** kwargs ) for _ in range (count )
233
+ _make_stub (worker_class , pool_threads , * args , ** kwargs )
234
+ for _ in range (count // pool_threads )
220
235
]
236
+ # Make sure there's always `count` worker threads, not a rounded `count`
237
+ if (remainder := count % pool_threads ) != 0 :
238
+ self ._stubs .append (_make_stub (worker_class , remainder , * args , ** kwargs ))
221
239
222
240
def __enter__ (self ):
223
241
return self ._stubs
0 commit comments