|
6 | 6 |
|
7 | 7 | after(:each) do
|
8 | 8 | subject.kill
|
9 |
| - sleep(0.1) |
| 9 | + subject.wait_for_termination(0.1) |
10 | 10 | end
|
11 | 11 |
|
12 | 12 | context '#initialize defaults' do
|
|
92 | 92 | end
|
93 | 93 |
|
94 | 94 | it 'returns the set value when running' do
|
95 |
| - 5.times{ subject.post{ sleep(0.1) } } |
96 |
| - sleep(0.1) |
| 95 | + trigger = Concurrent::Event.new |
| 96 | + 5.times{ subject.post{ trigger.wait } } |
97 | 97 | expect(subject.max_queue).to eq expected_max
|
| 98 | + trigger.set |
98 | 99 | end
|
99 | 100 |
|
100 | 101 | it 'returns the set value after stopping' do
|
101 |
| - 5.times{ subject.post{ sleep(0.1) } } |
102 |
| - sleep(0.1) |
| 102 | + 5.times{ subject.post{ nil } } |
103 | 103 | subject.shutdown
|
104 | 104 | subject.wait_for_termination(1)
|
105 | 105 | expect(subject.max_queue).to eq expected_max
|
|
123 | 123 | end
|
124 | 124 |
|
125 | 125 | it 'returns zero when there are no enqueued tasks' do
|
126 |
| - 5.times{ subject.post{ nil } } |
127 |
| - sleep(0.1) |
| 126 | + latch = Concurrent::CountDownLatch.new(5) |
| 127 | + 5.times{ subject.post{ latch.count_down } } |
| 128 | + latch.wait(0.1) |
128 | 129 | expect(subject.queue_length).to eq 0
|
129 | 130 | end
|
130 | 131 |
|
131 | 132 | it 'returns the size of the queue when tasks are enqueued' do
|
132 |
| - 100.times{ subject.post{ sleep(0.5) } } |
133 |
| - sleep(0.1) |
| 133 | + trigger = Concurrent::Event.new |
| 134 | + 20.times{ subject.post{ trigger.wait } } |
134 | 135 | expect(subject.queue_length).to be > 0
|
| 136 | + trigger.set |
135 | 137 | end
|
136 | 138 |
|
137 | 139 | it 'returns zero when stopped' do
|
138 |
| - 100.times{ subject.post{ sleep(0.5) } } |
139 |
| - sleep(0.1) |
| 140 | + trigger = Concurrent::Event.new |
| 141 | + 20.times{ subject.post{ trigger.wait } } |
140 | 142 | subject.shutdown
|
| 143 | + trigger.set |
141 | 144 | subject.wait_for_termination(1)
|
142 | 145 | expect(subject.queue_length).to eq 0
|
143 | 146 | end
|
144 | 147 |
|
145 | 148 | it 'can never be greater than :max_queue' do
|
146 |
| - 100.times{ subject.post{ sleep(0.5) } } |
147 |
| - sleep(0.1) |
| 149 | + trigger = Concurrent::Event.new |
| 150 | + 20.times{ subject.post{ trigger.wait } } |
148 | 151 | expect(subject.queue_length).to be <= expected_max
|
| 152 | + trigger.set |
149 | 153 | end
|
150 | 154 | end
|
151 | 155 |
|
|
165 | 169 |
|
166 | 170 | it 'returns :max_length when stopped' do
|
167 | 171 | 100.times{ subject.post{ nil } }
|
168 |
| - sleep(0.1) |
169 | 172 | subject.shutdown
|
170 | 173 | subject.wait_for_termination(1)
|
171 | 174 | expect(subject.remaining_capacity).to eq expected_max
|
|
192 | 195 | end
|
193 | 196 |
|
194 | 197 | specify '#post raises an error when the queue is at capacity' do
|
| 198 | + trigger = Concurrent::Event.new |
195 | 199 | expect {
|
196 |
| - 100.times{ subject.post{ sleep(1) } } |
| 200 | + 20.times{ subject.post{ trigger.wait } } |
197 | 201 | }.to raise_error(Concurrent::RejectedExecutionError)
|
| 202 | + trigger.set |
198 | 203 | end
|
199 | 204 |
|
200 | 205 | specify '#<< raises an error when the queue is at capacity' do
|
| 206 | + trigger = Concurrent::Event.new |
201 | 207 | expect {
|
202 |
| - 100.times{ subject << proc { sleep(1) } } |
| 208 | + 20.times{ subject << proc { trigger.wait } } |
203 | 209 | }.to raise_error(Concurrent::RejectedExecutionError)
|
| 210 | + trigger.set |
204 | 211 | end
|
205 | 212 |
|
206 | 213 | specify '#post raises an error when the executor is shutting down' do
|
| 214 | + trigger = Concurrent::Event.new |
207 | 215 | expect {
|
208 |
| - subject.shutdown; subject.post{ sleep(1) } |
| 216 | + subject.shutdown; subject.post{ trigger.wait } |
209 | 217 | }.to raise_error(Concurrent::RejectedExecutionError)
|
| 218 | + trigger.set |
210 | 219 | end
|
211 | 220 |
|
212 | 221 | specify '#<< raises an error when the executor is shutting down' do
|
| 222 | + trigger = Concurrent::Event.new |
213 | 223 | expect {
|
214 |
| - subject.shutdown; subject << proc { sleep(1) } |
| 224 | + subject.shutdown; subject << proc { trigger.wait } |
215 | 225 | }.to raise_error(Concurrent::RejectedExecutionError)
|
| 226 | + trigger.set |
216 | 227 | end
|
217 | 228 |
|
218 | 229 | specify 'a #post task is never executed when the queue is at capacity' do
|
|
239 | 250 | subject.post{ all_tasks_posted.wait; initial_executed.increment; }
|
240 | 251 | end
|
241 | 252 |
|
242 |
| - # Inject 100 more tasks, which should throw an exception |
243 |
| - 100.times do |
| 253 | + # Inject 20 more tasks, which should throw an exception |
| 254 | + 20.times do |
244 | 255 | expect {
|
245 | 256 | subject.post { subsequent_executed.increment; }
|
246 | 257 | }.to raise_error(Concurrent::RejectedExecutionError)
|
|
286 | 297 | subject << proc { all_tasks_posted.wait; initial_executed.increment; }
|
287 | 298 | end
|
288 | 299 |
|
289 |
| - # Inject 100 more tasks, which should throw an exeption |
290 |
| - 100.times do |
| 300 | + # Inject 20 more tasks, which should throw an exeption |
| 301 | + 20.times do |
291 | 302 | expect {
|
292 | 303 | subject << proc { subsequent_executed.increment; }
|
293 | 304 | }.to raise_error(Concurrent::RejectedExecutionError)
|
|
346 | 357 | subject.post{ all_tasks_posted.wait; initial_executed.increment; }
|
347 | 358 | end
|
348 | 359 |
|
349 |
| - # Inject 100 more tasks, which should be dropped without an exception |
350 |
| - 100.times do |
| 360 | + # Inject 20 more tasks, which should be dropped without an exception |
| 361 | + 20.times do |
351 | 362 | subject.post{ subsequent_executed.increment; }
|
352 | 363 | end
|
353 | 364 |
|
|
391 | 402 | subject << proc { all_tasks_posted.wait; initial_executed.increment; }
|
392 | 403 | end
|
393 | 404 |
|
394 |
| - # Inject 100 more tasks, which should be dropped without an exception |
395 |
| - 100.times do |
| 405 | + # Inject 20 more tasks, which should be dropped without an exception |
| 406 | + 20.times do |
396 | 407 | subject << proc { subsequent_executed.increment; }
|
397 | 408 | end
|
398 | 409 |
|
|
456 | 467 | end
|
457 | 468 |
|
458 | 469 | specify '#post does not create any new threads when the queue is at capacity' do
|
| 470 | + trigger = Concurrent::Event.new |
459 | 471 | initial = Thread.list.length
|
460 |
| - 5.times{ subject.post{ sleep(0.1) } } |
461 |
| - expect(Thread.list.length).to be < initial + 5 |
| 472 | + |
| 473 | + # Post several tasks to the executor. Has to be a new thread, |
| 474 | + # because it will start blocking once the queue fills up. |
| 475 | + Thread.new do |
| 476 | + 5.times{ subject.post{ trigger.wait } } |
| 477 | + end |
| 478 | + |
| 479 | + expect(Thread.list.length).to be < initial + 1 + 5 |
| 480 | + |
| 481 | + # Let the executor tasks complete. |
| 482 | + trigger.set |
462 | 483 | end
|
463 | 484 |
|
464 | 485 | specify '#<< executes the task on the current thread when the queue is at capacity' do
|
| 486 | + trigger = Concurrent::Event.new |
465 | 487 | latch = Concurrent::CountDownLatch.new(5)
|
466 |
| - subject.post{ sleep(1) } |
| 488 | + subject.post{ trigger.wait } |
467 | 489 | 5.times{|i| subject << proc { latch.count_down } }
|
468 | 490 | latch.wait(0.1)
|
| 491 | + trigger.set |
469 | 492 | end
|
470 | 493 |
|
471 | 494 | specify '#post executes the task on the current thread when the queue is at capacity' do
|
| 495 | + trigger = Concurrent::Event.new |
472 | 496 | latch = Concurrent::CountDownLatch.new(5)
|
473 |
| - subject.post{ sleep(1) } |
| 497 | + subject.post{ trigger.wait } |
474 | 498 | 5.times{|i| subject.post{ latch.count_down } }
|
475 | 499 | latch.wait(0.1)
|
| 500 | + trigger.set |
476 | 501 | end
|
477 | 502 |
|
478 | 503 | specify '#post executes the task on the current thread when the executor is shutting down' do
|
|
0 commit comments