Skip to content

Commit f6d7b11

Browse files
committed
implement limited WorkerPool size
1 parent 116637a commit f6d7b11

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

execnet/gateway_base.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,29 @@ def run(self):
227227
self.running = False
228228

229229

230+
231+
if sys.version_info >= (3, 7):
232+
from contextlib import nullcontext
233+
else:
234+
class nullcontext(object):
235+
"""Context manager that does no additional processing.
236+
Used as a stand-in for a normal context manager, when a particular
237+
block of code is only sometimes used with a normal context manager:
238+
cm = optional_cm if condition else nullcontext()
239+
with cm:
240+
# Perform operation, using optional_cm if condition is True
241+
"""
242+
243+
def __init__(self, enter_result=None):
244+
self.enter_result = enter_result
245+
246+
def __enter__(self):
247+
return self.enter_result
248+
249+
def __exit__(self, *excinfo):
250+
pass
251+
252+
230253
class WorkerPool(object):
231254
""" A WorkerPool allows to spawn function executions
232255
to threads, returning a reply object on which you
@@ -238,13 +261,14 @@ class WorkerPool(object):
238261
when the pool received a trigger_shutdown().
239262
"""
240263

241-
def __init__(self, execmodel, hasprimary=False):
264+
def __init__(self, execmodel, hasprimary=False, size=None):
242265
""" by default allow unlimited number of spawns. """
243266
self.execmodel = execmodel
244267
self._running_lock = self.execmodel.Lock()
245268
self._running = set()
246269
self._shuttingdown = False
247270
self._waitall_events = []
271+
self._semaphore = nullcontext() if size is None else self.execmodel.Semaphore(size)
248272
if hasprimary:
249273
if self.execmodel.backend != "thread":
250274
raise ValueError("hasprimary=True requires thread model")
@@ -307,7 +331,7 @@ def spawn(self, func, *args, **kwargs):
307331
of the given func(*args, **kwargs).
308332
"""
309333
reply = Reply((func, args, kwargs), self.execmodel)
310-
with self._running_lock:
334+
with self._semaphore, self._running_lock:
311335
if self._shuttingdown:
312336
raise ValueError("pool is shutting down")
313337
self._running.add(reply)

testing/test_threadpool.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ def test_waitfinish_on_reply(pool):
7474
pytest.raises(ZeroDivisionError, reply.get)
7575

7676

77-
@pytest.mark.xfail(reason="WorkerPool does not implement limited size")
7877
def test_limited_size(execmodel):
7978
pool = WorkerPool(execmodel, size=1)
8079
q = execmodel.queue.Queue()

0 commit comments

Comments
 (0)