11import sys
22import threading
3- import time
43import unittest
54from concurrent import futures
65from test import support
6+ from test .support import threading_helper
77
88from .util import (
99 CANCELLED_FUTURE , CANCELLED_AND_NOTIFIED_FUTURE , EXCEPTION_FUTURE ,
1616def mul (x , y ):
1717 return x * y
1818
19- def sleep_and_raise ( t ):
20- time . sleep ( t )
19+ def wait_and_raise ( e ):
20+ e . wait ( )
2121 raise Exception ('this is an exception' )
2222
2323
2424class WaitTests :
2525 def test_20369 (self ):
2626 # See https://bugs.python.org/issue20369
27- future = self .executor .submit (time . sleep , 1.5 )
27+ future = self .executor .submit (mul , 1 , 2 )
2828 done , not_done = futures .wait ([future , future ],
2929 return_when = futures .ALL_COMPLETED )
3030 self .assertEqual ({future }, done )
3131 self .assertEqual (set (), not_done )
3232
3333
3434 def test_first_completed (self ):
35+ event = self .create_event ()
3536 future1 = self .executor .submit (mul , 21 , 2 )
36- future2 = self .executor .submit (time . sleep , 1.5 )
37+ future2 = self .executor .submit (event . wait )
3738
38- done , not_done = futures .wait (
39- [CANCELLED_FUTURE , future1 , future2 ],
40- return_when = futures .FIRST_COMPLETED )
39+ try :
40+ done , not_done = futures .wait (
41+ [CANCELLED_FUTURE , future1 , future2 ],
42+ return_when = futures .FIRST_COMPLETED )
4143
42- self .assertEqual (set ([future1 ]), done )
43- self .assertEqual (set ([CANCELLED_FUTURE , future2 ]), not_done )
44+ self .assertEqual (set ([future1 ]), done )
45+ self .assertEqual (set ([CANCELLED_FUTURE , future2 ]), not_done )
46+ finally :
47+ event .set ()
48+ future2 .result () # wait for job to finish
4449
4550 def test_first_completed_some_already_completed (self ):
46- future1 = self .executor .submit (time .sleep , 1.5 )
47-
48- finished , pending = futures .wait (
49- [CANCELLED_AND_NOTIFIED_FUTURE , SUCCESSFUL_FUTURE , future1 ],
50- return_when = futures .FIRST_COMPLETED )
51+ event = self .create_event ()
52+ future1 = self .executor .submit (event .wait )
5153
52- self .assertEqual (
53- set ([CANCELLED_AND_NOTIFIED_FUTURE , SUCCESSFUL_FUTURE ]),
54- finished )
55- self .assertEqual (set ([future1 ]), pending )
54+ try :
55+ finished , pending = futures .wait (
56+ [CANCELLED_AND_NOTIFIED_FUTURE , SUCCESSFUL_FUTURE , future1 ],
57+ return_when = futures .FIRST_COMPLETED )
58+
59+ self .assertEqual (
60+ set ([CANCELLED_AND_NOTIFIED_FUTURE , SUCCESSFUL_FUTURE ]),
61+ finished )
62+ self .assertEqual (set ([future1 ]), pending )
63+ finally :
64+ event .set ()
65+ future1 .result () # wait for job to finish
5666
57- @support .requires_resource ('walltime' )
5867 def test_first_exception (self ):
59- future1 = self .executor .submit (mul , 2 , 21 )
60- future2 = self .executor .submit (sleep_and_raise , 1.5 )
61- future3 = self .executor .submit (time .sleep , 3 )
68+ event1 = self .create_event ()
69+ event2 = self .create_event ()
70+ try :
71+ future1 = self .executor .submit (mul , 2 , 21 )
72+ future2 = self .executor .submit (wait_and_raise , event1 )
73+ future3 = self .executor .submit (event2 .wait )
6274
63- finished , pending = futures .wait (
64- [future1 , future2 , future3 ],
65- return_when = futures .FIRST_EXCEPTION )
75+ # Ensure that future1 is completed before future2 finishes
76+ def wait_for_future1 ():
77+ future1 .result ()
78+ event1 .set ()
79+
80+ t = threading .Thread (target = wait_for_future1 )
81+ t .start ()
82+
83+ finished , pending = futures .wait (
84+ [future1 , future2 , future3 ],
85+ return_when = futures .FIRST_EXCEPTION )
6686
67- self .assertEqual (set ([future1 , future2 ]), finished )
68- self .assertEqual (set ([future3 ]), pending )
87+ self .assertEqual (set ([future1 , future2 ]), finished )
88+ self .assertEqual (set ([future3 ]), pending )
89+
90+ threading_helper .join_thread (t )
91+ finally :
92+ event1 .set ()
93+ event2 .set ()
94+ future3 .result () # wait for job to finish
6995
7096 def test_first_exception_some_already_complete (self ):
97+ event = self .create_event ()
7198 future1 = self .executor .submit (divmod , 21 , 0 )
72- future2 = self .executor .submit (time .sleep , 1.5 )
73-
74- finished , pending = futures .wait (
75- [SUCCESSFUL_FUTURE ,
76- CANCELLED_FUTURE ,
77- CANCELLED_AND_NOTIFIED_FUTURE ,
78- future1 , future2 ],
79- return_when = futures .FIRST_EXCEPTION )
99+ future2 = self .executor .submit (event .wait )
80100
81- self .assertEqual (set ([SUCCESSFUL_FUTURE ,
82- CANCELLED_AND_NOTIFIED_FUTURE ,
83- future1 ]), finished )
84- self .assertEqual (set ([CANCELLED_FUTURE , future2 ]), pending )
101+ try :
102+ finished , pending = futures .wait (
103+ [SUCCESSFUL_FUTURE ,
104+ CANCELLED_FUTURE ,
105+ CANCELLED_AND_NOTIFIED_FUTURE ,
106+ future1 , future2 ],
107+ return_when = futures .FIRST_EXCEPTION )
108+
109+ self .assertEqual (set ([SUCCESSFUL_FUTURE ,
110+ CANCELLED_AND_NOTIFIED_FUTURE ,
111+ future1 ]), finished )
112+ self .assertEqual (set ([CANCELLED_FUTURE , future2 ]), pending )
113+ finally :
114+ event .set ()
115+ future2 .result () # wait for job to finish
85116
86117 def test_first_exception_one_already_failed (self ):
87- future1 = self .executor .submit (time .sleep , 2 )
118+ event = self .create_event ()
119+ future1 = self .executor .submit (event .wait )
88120
89- finished , pending = futures .wait (
90- [EXCEPTION_FUTURE , future1 ],
91- return_when = futures .FIRST_EXCEPTION )
121+ try :
122+ finished , pending = futures .wait (
123+ [EXCEPTION_FUTURE , future1 ],
124+ return_when = futures .FIRST_EXCEPTION )
92125
93- self .assertEqual (set ([EXCEPTION_FUTURE ]), finished )
94- self .assertEqual (set ([future1 ]), pending )
126+ self .assertEqual (set ([EXCEPTION_FUTURE ]), finished )
127+ self .assertEqual (set ([future1 ]), pending )
128+ finally :
129+ event .set ()
130+ future1 .result () # wait for job to finish
95131
96132 def test_all_completed (self ):
97133 future1 = self .executor .submit (divmod , 2 , 0 )
@@ -114,23 +150,27 @@ def test_all_completed(self):
114150
115151 def test_timeout (self ):
116152 short_timeout = 0.050
117- long_timeout = short_timeout * 10
118153
119- future = self .executor .submit (time .sleep , long_timeout )
154+ event = self .create_event ()
155+ future = self .executor .submit (event .wait )
120156
121- finished , pending = futures .wait (
122- [CANCELLED_AND_NOTIFIED_FUTURE ,
123- EXCEPTION_FUTURE ,
124- SUCCESSFUL_FUTURE ,
125- future ],
126- timeout = short_timeout ,
127- return_when = futures .ALL_COMPLETED )
128-
129- self .assertEqual (set ([CANCELLED_AND_NOTIFIED_FUTURE ,
130- EXCEPTION_FUTURE ,
131- SUCCESSFUL_FUTURE ]),
132- finished )
133- self .assertEqual (set ([future ]), pending )
157+ try :
158+ finished , pending = futures .wait (
159+ [CANCELLED_AND_NOTIFIED_FUTURE ,
160+ EXCEPTION_FUTURE ,
161+ SUCCESSFUL_FUTURE ,
162+ future ],
163+ timeout = short_timeout ,
164+ return_when = futures .ALL_COMPLETED )
165+
166+ self .assertEqual (set ([CANCELLED_AND_NOTIFIED_FUTURE ,
167+ EXCEPTION_FUTURE ,
168+ SUCCESSFUL_FUTURE ]),
169+ finished )
170+ self .assertEqual (set ([future ]), pending )
171+ finally :
172+ event .set ()
173+ future .result () # wait for job to finish
134174
135175
136176class ThreadPoolWaitTests (ThreadPoolMixin , WaitTests , BaseTestCase ):
0 commit comments