3
3
import heapq
4
4
5
5
from enum import Enum
6
- from threading import Thread , Barrier , Lock
6
+ from threading import Barrier , Lock
7
7
from random import shuffle , randint
8
8
9
9
from test .support import threading_helper
10
+ from test .support .threading_helper import run_concurrently
10
11
from test import test_heapq
11
12
12
13
@@ -28,8 +29,8 @@ def test_racing_heapify(self):
28
29
heap = list (range (OBJECT_COUNT ))
29
30
shuffle (heap )
30
31
31
- self . run_concurrently (
32
- worker_func = heapq .heapify , args = (heap ,), nthreads = NTHREADS
32
+ run_concurrently (
33
+ worker_func = heapq .heapify , nthreads = NTHREADS , args = (heap ,)
33
34
)
34
35
self .test_heapq .check_invariant (heap )
35
36
@@ -40,8 +41,8 @@ def heappush_func(heap):
40
41
for item in reversed (range (OBJECT_COUNT )):
41
42
heapq .heappush (heap , item )
42
43
43
- self . run_concurrently (
44
- worker_func = heappush_func , args = (heap ,), nthreads = NTHREADS
44
+ run_concurrently (
45
+ worker_func = heappush_func , nthreads = NTHREADS , args = (heap ,)
45
46
)
46
47
self .test_heapq .check_invariant (heap )
47
48
@@ -61,10 +62,10 @@ def heappop_func(heap, pop_count):
61
62
# Each local list should be sorted
62
63
self .assertTrue (self .is_sorted_ascending (local_list ))
63
64
64
- self . run_concurrently (
65
+ run_concurrently (
65
66
worker_func = heappop_func ,
66
- args = (heap , per_thread_pop_count ),
67
67
nthreads = NTHREADS ,
68
+ args = (heap , per_thread_pop_count ),
68
69
)
69
70
self .assertEqual (len (heap ), 0 )
70
71
@@ -77,10 +78,10 @@ def heappushpop_func(heap, pushpop_items):
77
78
popped_item = heapq .heappushpop (heap , item )
78
79
self .assertTrue (popped_item <= item )
79
80
80
- self . run_concurrently (
81
+ run_concurrently (
81
82
worker_func = heappushpop_func ,
82
- args = (heap , pushpop_items ),
83
83
nthreads = NTHREADS ,
84
+ args = (heap , pushpop_items ),
84
85
)
85
86
self .assertEqual (len (heap ), OBJECT_COUNT )
86
87
self .test_heapq .check_invariant (heap )
@@ -93,10 +94,10 @@ def heapreplace_func(heap, replace_items):
93
94
for item in replace_items :
94
95
heapq .heapreplace (heap , item )
95
96
96
- self . run_concurrently (
97
+ run_concurrently (
97
98
worker_func = heapreplace_func ,
98
- args = (heap , replace_items ),
99
99
nthreads = NTHREADS ,
100
+ args = (heap , replace_items ),
100
101
)
101
102
self .assertEqual (len (heap ), OBJECT_COUNT )
102
103
self .test_heapq .check_invariant (heap )
@@ -105,8 +106,8 @@ def test_racing_heapify_max(self):
105
106
max_heap = list (range (OBJECT_COUNT ))
106
107
shuffle (max_heap )
107
108
108
- self . run_concurrently (
109
- worker_func = heapq .heapify_max , args = (max_heap ,), nthreads = NTHREADS
109
+ run_concurrently (
110
+ worker_func = heapq .heapify_max , nthreads = NTHREADS , args = (max_heap ,)
110
111
)
111
112
self .test_heapq .check_max_invariant (max_heap )
112
113
@@ -117,8 +118,8 @@ def heappush_max_func(max_heap):
117
118
for item in range (OBJECT_COUNT ):
118
119
heapq .heappush_max (max_heap , item )
119
120
120
- self . run_concurrently (
121
- worker_func = heappush_max_func , args = (max_heap ,), nthreads = NTHREADS
121
+ run_concurrently (
122
+ worker_func = heappush_max_func , nthreads = NTHREADS , args = (max_heap ,)
122
123
)
123
124
self .test_heapq .check_max_invariant (max_heap )
124
125
@@ -138,10 +139,10 @@ def heappop_max_func(max_heap, pop_count):
138
139
# Each local list should be sorted
139
140
self .assertTrue (self .is_sorted_descending (local_list ))
140
141
141
- self . run_concurrently (
142
+ run_concurrently (
142
143
worker_func = heappop_max_func ,
143
- args = (max_heap , per_thread_pop_count ),
144
144
nthreads = NTHREADS ,
145
+ args = (max_heap , per_thread_pop_count ),
145
146
)
146
147
self .assertEqual (len (max_heap ), 0 )
147
148
@@ -154,10 +155,10 @@ def heappushpop_max_func(max_heap, pushpop_items):
154
155
popped_item = heapq .heappushpop_max (max_heap , item )
155
156
self .assertTrue (popped_item >= item )
156
157
157
- self . run_concurrently (
158
+ run_concurrently (
158
159
worker_func = heappushpop_max_func ,
159
- args = (max_heap , pushpop_items ),
160
160
nthreads = NTHREADS ,
161
+ args = (max_heap , pushpop_items ),
161
162
)
162
163
self .assertEqual (len (max_heap ), OBJECT_COUNT )
163
164
self .test_heapq .check_max_invariant (max_heap )
@@ -170,10 +171,10 @@ def heapreplace_max_func(max_heap, replace_items):
170
171
for item in replace_items :
171
172
heapq .heapreplace_max (max_heap , item )
172
173
173
- self . run_concurrently (
174
+ run_concurrently (
174
175
worker_func = heapreplace_max_func ,
175
- args = (max_heap , replace_items ),
176
176
nthreads = NTHREADS ,
177
+ args = (max_heap , replace_items ),
177
178
)
178
179
self .assertEqual (len (max_heap ), OBJECT_COUNT )
179
180
self .test_heapq .check_max_invariant (max_heap )
@@ -203,7 +204,7 @@ def worker():
203
204
except IndexError :
204
205
pass
205
206
206
- self . run_concurrently (worker , () , n_threads * 2 )
207
+ run_concurrently (worker , n_threads * 2 )
207
208
208
209
@staticmethod
209
210
def is_sorted_ascending (lst ):
@@ -241,27 +242,6 @@ def create_random_list(a, b, size):
241
242
"""
242
243
return [randint (- a , b ) for _ in range (size )]
243
244
244
- def run_concurrently (self , worker_func , args , nthreads ):
245
- """
246
- Run the worker function concurrently in multiple threads.
247
- """
248
- barrier = Barrier (nthreads )
249
-
250
- def wrapper_func (* args ):
251
- # Wait for all threads to reach this point before proceeding.
252
- barrier .wait ()
253
- worker_func (* args )
254
-
255
- with threading_helper .catch_threading_exception () as cm :
256
- workers = (
257
- Thread (target = wrapper_func , args = args ) for _ in range (nthreads )
258
- )
259
- with threading_helper .start_threads (workers ):
260
- pass
261
-
262
- # Worker threads should not raise any exceptions
263
- self .assertIsNone (cm .exc_value )
264
-
265
245
266
246
if __name__ == "__main__" :
267
247
unittest .main ()
0 commit comments