Skip to content

Commit 2e514fc

Browse files
Do not assume one-worker-per-task.
1 parent b4f4ae6 commit 2e514fc

File tree

1 file changed

+56
-68
lines changed

1 file changed

+56
-68
lines changed

Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import os
55
import sys
66
import time
7-
from threading import Thread
87
import unittest
98
from concurrent.futures.interpreter import BrokenInterpreterPool
109
from concurrent import interpreters
@@ -355,93 +354,82 @@ def test_saturation(self):
355354
executor.shutdown(wait=True)
356355

357356
def test_blocking(self):
357+
# There is no guarantee that a worker will be created for every
358+
# submitted task. That's because there's a race between:
359+
#
360+
# * a new worker thread, created when task A was just submitted,
361+
# becoming non-idle when it picks up task A
362+
# * after task B is added to the queue, a new worker thread
363+
# is started only if there are no idle workers
364+
# (the check in ThreadPoolExecutor._adjust_thread_count())
365+
#
366+
# That means we must not block waiting for *all* tasks to report
367+
# "ready" before we unblock the known-ready workers.
358368
ready = queues.create()
359369
blocker = queues.create()
360370

361371
def run(taskid, ready, blocker):
362-
print(f'{taskid}: starting', flush=True)
372+
# There can't be any globals here.
363373
ready.put_nowait(taskid)
364-
print(f'{taskid}: ready', flush=True)
365-
# blocker.get(timeout=20) # blocking
366374
blocker.get() # blocking
367-
print(f'{taskid}: done', flush=True)
368375

369376
numtasks = 10
370377
futures = []
371-
executor = self.executor_type()
372-
try:
378+
with self.executor_type() as executor:
373379
# Request the jobs.
374380
for i in range(numtasks):
375381
fut = executor.submit(run, i, ready, blocker)
376382
futures.append(fut)
377-
# assert len(executor._threads) == numtasks, len(executor._threads)
378-
379-
try:
380-
# Wait for them all to be ready.
381-
pending = numtasks
382-
def wait_for_ready():
383-
nonlocal pending
383+
pending = numtasks
384+
while pending > 0:
385+
# Wait for any to be ready.
386+
done = 0
387+
for _ in range(pending):
384388
try:
385-
ready.get(timeout=10) # blocking
389+
ready.get(timeout=1) # blocking
386390
except interpreters.QueueEmpty:
387391
pass
388392
else:
389-
pending -= 1
390-
threads = [Thread(target=wait_for_ready)
391-
for _ in range(pending)]
392-
for t in threads:
393-
t.start()
394-
for t in threads:
395-
t.join()
396-
if pending:
397-
if pending < numtasks:
398-
# At least one was ready, so wait longer.
399-
for _ in range(pending):
400-
ready.get() # blocking
401-
else:
402-
# Something is probably wrong. Bail out.
403-
group = []
404-
for fut in futures:
405-
try:
406-
fut.result(timeout=0)
407-
except TimeoutError:
408-
# Still running.
409-
try:
410-
ready.get_nowait()
411-
except interpreters.QueueEmpty as exc:
412-
# It's hung.
413-
group.append(exc)
414-
else:
415-
pending -= 1
416-
except Exception as exc:
417-
group.append(exc)
418-
if group:
419-
raise ExceptionGroup('futures', group)
420-
assert not pending, pending
421-
# for _ in range(numtasks):
422-
# ready.get() # blocking
423-
finally:
393+
done += 1
394+
pending -= done
424395
# Unblock the workers.
425-
for i in range(numtasks):
396+
for _ in range(done):
426397
blocker.put_nowait(None)
427398

428-
# Make sure they finished.
429-
group = []
430-
def wait_for_done(fut):
431-
try:
432-
fut.result(timeout=10)
433-
except Exception as exc:
434-
group.append(exc)
435-
threads = [Thread(target=wait_for_done, args=(fut,))
436-
for fut in futures]
437-
for t in threads:
438-
t.start()
439-
for t in threads:
440-
t.join()
441-
if group:
442-
raise ExceptionGroup('futures', group)
443-
finally:
444-
executor.shutdown(wait=False)
399+
def test_blocking_with_limited_workers(self):
400+
# This is essentially the same as test_blocking,
401+
# but we explicitly force a limited number of workers,
402+
# instead of it happening implicitly sometimes due to a race.
403+
ready = queues.create()
404+
blocker = queues.create()
405+
406+
def run(taskid, ready, blocker):
407+
# There can't be any globals here.
408+
ready.put_nowait(taskid)
409+
blocker.get() # blocking
410+
411+
numtasks = 10
412+
futures = []
413+
with self.executor_type(4) as executor:
414+
# Request the jobs.
415+
for i in range(numtasks):
416+
fut = executor.submit(run, i, ready, blocker)
417+
futures.append(fut)
418+
pending = numtasks
419+
while pending > 0:
420+
# Wait for any to be ready.
421+
done = 0
422+
for _ in range(pending):
423+
try:
424+
ready.get(timeout=1) # blocking
425+
except interpreters.QueueEmpty:
426+
pass
427+
else:
428+
done += 1
429+
pending -= done
430+
# Unblock the workers.
431+
for _ in range(done):
432+
blocker.put_nowait(None)
445433

446434
@support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
447435
def test_idle_thread_reuse(self):

0 commit comments

Comments
 (0)