Skip to content

Commit eff5d73

Browse files
committed
gh-128364: Fix flaky test_concurrent_futures.test_wait tests
Use events instead of relying on `time.sleep()`. The tests are also now about four times faster.
1 parent 5221d9c commit eff5d73

File tree

2 files changed

+67
-13
lines changed

2 files changed

+67
-13
lines changed

Lib/test/test_concurrent_futures/test_wait.py

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import sys
22
import threading
3-
import time
43
import unittest
54
from concurrent import futures
65
from test import support
@@ -16,24 +15,25 @@
1615
def mul(x, y):
1716
return x * y
1817

19-
def sleep_and_raise(t):
20-
time.sleep(t)
18+
def wait_and_raise(e):
19+
e.wait()
2120
raise Exception('this is an exception')
2221

2322

2423
class WaitTests:
2524
def test_20369(self):
2625
# See https://bugs.python.org/issue20369
27-
future = self.executor.submit(time.sleep, 1.5)
26+
future = self.executor.submit(mul, 1, 2)
2827
done, not_done = futures.wait([future, future],
2928
return_when=futures.ALL_COMPLETED)
3029
self.assertEqual({future}, done)
3130
self.assertEqual(set(), not_done)
3231

3332

3433
def test_first_completed(self):
34+
event = self.create_event()
3535
future1 = self.executor.submit(mul, 21, 2)
36-
future2 = self.executor.submit(time.sleep, 1.5)
36+
future2 = self.executor.submit(event.wait)
3737

3838
done, not_done = futures.wait(
3939
[CANCELLED_FUTURE, future1, future2],
@@ -42,8 +42,12 @@ def test_first_completed(self):
4242
self.assertEqual(set([future1]), done)
4343
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
4444

45+
event.set()
46+
future2.result() # wait for job to finish
47+
4548
def test_first_completed_some_already_completed(self):
46-
future1 = self.executor.submit(time.sleep, 1.5)
49+
event = self.create_event()
50+
future1 = self.executor.submit(event.wait)
4751

4852
finished, pending = futures.wait(
4953
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
@@ -54,11 +58,24 @@ def test_first_completed_some_already_completed(self):
5458
finished)
5559
self.assertEqual(set([future1]), pending)
5660

57-
@support.requires_resource('walltime')
61+
event.set()
62+
future1.result() # wait for job to finish
63+
5864
def test_first_exception(self):
65+
event1 = self.create_event()
66+
event2 = self.create_event()
67+
5968
future1 = self.executor.submit(mul, 2, 21)
60-
future2 = self.executor.submit(sleep_and_raise, 1.5)
61-
future3 = self.executor.submit(time.sleep, 3)
69+
future2 = self.executor.submit(wait_and_raise, event1)
70+
future3 = self.executor.submit(event2.wait)
71+
72+
# Ensure that future1 is completed before future2 finishes
73+
def wait_for_future1():
74+
future1.result()
75+
event1.set()
76+
77+
t = threading.Thread(target=wait_for_future1)
78+
t.start()
6279

6380
finished, pending = futures.wait(
6481
[future1, future2, future3],
@@ -67,9 +84,14 @@ def test_first_exception(self):
6784
self.assertEqual(set([future1, future2]), finished)
6885
self.assertEqual(set([future3]), pending)
6986

87+
t.join()
88+
event2.set()
89+
future3.result() # wait for job to finish
90+
7091
def test_first_exception_some_already_complete(self):
92+
event = self.create_event()
7193
future1 = self.executor.submit(divmod, 21, 0)
72-
future2 = self.executor.submit(time.sleep, 1.5)
94+
future2 = self.executor.submit(event.wait)
7395

7496
finished, pending = futures.wait(
7597
[SUCCESSFUL_FUTURE,
@@ -83,8 +105,12 @@ def test_first_exception_some_already_complete(self):
83105
future1]), finished)
84106
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
85107

108+
event.set()
109+
future2.result() # wait for job to finish
110+
86111
def test_first_exception_one_already_failed(self):
87-
future1 = self.executor.submit(time.sleep, 2)
112+
event = self.create_event()
113+
future1 = self.executor.submit(event.wait)
88114

89115
finished, pending = futures.wait(
90116
[EXCEPTION_FUTURE, future1],
@@ -93,6 +119,9 @@ def test_first_exception_one_already_failed(self):
93119
self.assertEqual(set([EXCEPTION_FUTURE]), finished)
94120
self.assertEqual(set([future1]), pending)
95121

122+
event.set()
123+
future1.result() # wait for job to finish
124+
96125
def test_all_completed(self):
97126
future1 = self.executor.submit(divmod, 2, 0)
98127
future2 = self.executor.submit(mul, 2, 21)
@@ -114,9 +143,9 @@ def test_all_completed(self):
114143

115144
def test_timeout(self):
116145
short_timeout = 0.050
117-
long_timeout = short_timeout * 10
118146

119-
future = self.executor.submit(time.sleep, long_timeout)
147+
event = self.create_event()
148+
future = self.executor.submit(event.wait)
120149

121150
finished, pending = futures.wait(
122151
[CANCELLED_AND_NOTIFIED_FUTURE,
@@ -132,6 +161,10 @@ def test_timeout(self):
132161
finished)
133162
self.assertEqual(set([future]), pending)
134163

164+
# Set the event to allow the future to complete
165+
event.set()
166+
future.result() # wait for job to finish
167+
135168

136169
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
137170

Lib/test/test_concurrent_futures/util.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import multiprocessing
22
import sys
3+
import threading
34
import time
45
import unittest
56
from concurrent import futures
@@ -50,14 +51,19 @@ def setUp(self):
5051
max_workers=self.worker_count,
5152
mp_context=self.get_context(),
5253
**self.executor_kwargs)
54+
self.manager = multiprocessing.Manager()
5355
else:
5456
self.executor = self.executor_type(
5557
max_workers=self.worker_count,
5658
**self.executor_kwargs)
59+
self.manager = None
5760

5861
def tearDown(self):
5962
self.executor.shutdown(wait=True)
6063
self.executor = None
64+
if self.manager is not None:
65+
self.manager.shutdown()
66+
self.manager = None
6167

6268
dt = time.monotonic() - self.t1
6369
if support.verbose:
@@ -73,11 +79,17 @@ def get_context(self):
7379
class ThreadPoolMixin(ExecutorMixin):
7480
executor_type = futures.ThreadPoolExecutor
7581

82+
def create_event(self):
83+
return threading.Event()
84+
7685

7786
@support.skip_if_sanitizer("gh-129824: data races in InterpreterPool tests", thread=True)
7887
class InterpreterPoolMixin(ExecutorMixin):
7988
executor_type = futures.InterpreterPoolExecutor
8089

90+
def create_event(self):
91+
self.skipTest("InterpreterPoolExecutor doesn't support events")
92+
8193

8294
class ProcessPoolForkMixin(ExecutorMixin):
8395
executor_type = futures.ProcessPoolExecutor
@@ -94,6 +106,9 @@ def get_context(self):
94106
self.skipTest("TSAN doesn't support threads after fork")
95107
return super().get_context()
96108

109+
def create_event(self):
110+
return self.manager.Event()
111+
97112

98113
class ProcessPoolSpawnMixin(ExecutorMixin):
99114
executor_type = futures.ProcessPoolExecutor
@@ -106,6 +121,9 @@ def get_context(self):
106121
self.skipTest("ProcessPoolExecutor unavailable on this system")
107122
return super().get_context()
108123

124+
def create_event(self):
125+
return self.manager.Event()
126+
109127

110128
class ProcessPoolForkserverMixin(ExecutorMixin):
111129
executor_type = futures.ProcessPoolExecutor
@@ -122,6 +140,9 @@ def get_context(self):
122140
self.skipTest("TSAN doesn't support threads after fork")
123141
return super().get_context()
124142

143+
def create_event(self):
144+
return self.manager.Event()
145+
125146

126147
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
127148
executor_mixins=(ThreadPoolMixin,

0 commit comments

Comments
 (0)