Skip to content

Commit aaeef49

Browse files
committed
Merge pull request #211 from rkday/queue_overflow_uts
Further reworking of queue overflow tests
2 parents c3662b9 + 45106b4 commit aaeef49

File tree

1 file changed

+149
-37
lines changed

1 file changed

+149
-37
lines changed

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 149 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -216,30 +216,100 @@
216216
end
217217

218218
specify 'a #post task is never executed when the queue is at capacity' do
219-
executed = Concurrent::AtomicFixnum.new(0)
220-
10.times do
221-
begin
222-
subject.post{ executed.increment; sleep(0.1) }
223-
rescue
224-
end
219+
all_tasks_posted = Concurrent::Event.new
220+
221+
latch = Concurrent::CountDownLatch.new(max_threads)
222+
223+
initial_executed = Concurrent::AtomicFixnum.new(0)
224+
subsequent_executed = Concurrent::AtomicFixnum.new(0)
225+
226+
# Fill up all the threads (with a task that won't complete until
227+
# all tasks are posted)
228+
max_threads.times do
229+
subject.post{ latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
225230
end
226-
sleep(0.2)
227-
expect(executed.value).to be < 10
231+
232+
# Wait for all those tasks to be taken off the queue onto a
233+
# worker thread and start executing
234+
latch.wait
235+
236+
# Fill up the queue (with a task that won't complete until
237+
# all tasks are posted)
238+
max_queue.times do
239+
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
240+
end
241+
242+
# Inject 100 more tasks, which should throw an exception
243+
100.times do
244+
expect {
245+
subject.post { subsequent_executed.increment; }
246+
}.to raise_error(Concurrent::RejectedExecutionError)
247+
end
248+
249+
# Trigger the event, so that the tasks in the threads and on
250+
# the queue can run to completion
251+
all_tasks_posted.set
252+
253+
# Wait for all tasks to finish
254+
subject.shutdown
255+
subject.wait_for_termination
256+
257+
# The tasks should have run until all the threads and the
258+
# queue filled up...
259+
expect(initial_executed.value).to be (max_threads + max_queue)
260+
261+
# ..but been dropped after that
262+
expect(subsequent_executed.value).to be 0
228263
end
229264

230265
specify 'a #<< task is never executed when the queue is at capacity' do
231-
executed = Concurrent::AtomicFixnum.new(0)
232-
10.times do
233-
begin
234-
subject << proc { executed.increment; sleep(0.1) }
235-
rescue
236-
end
266+
all_tasks_posted = Concurrent::Event.new
267+
268+
latch = Concurrent::CountDownLatch.new(max_threads)
269+
270+
initial_executed = Concurrent::AtomicFixnum.new(0)
271+
subsequent_executed = Concurrent::AtomicFixnum.new(0)
272+
273+
# Fill up all the threads (with a task that won't complete until
274+
# all tasks are posted)
275+
max_threads.times do
276+
subject << proc { latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
277+
end
278+
279+
# Wait for all those tasks to be taken off the queue onto a
280+
# worker thread and start executing
281+
latch.wait
282+
283+
# Fill up the queue (with a task that won't complete until
284+
# all tasks are posted)
285+
max_queue.times do
286+
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
237287
end
238-
sleep(0.2)
239-
expect(executed.value).to be < 10
288+
289+
# Inject 100 more tasks, which should throw an exeption
290+
100.times do
291+
expect {
292+
subject << proc { subsequent_executed.increment; }
293+
}.to raise_error(Concurrent::RejectedExecutionError)
294+
end
295+
296+
# Trigger the event, so that the tasks in the threads and on
297+
# the queue can run to completion
298+
all_tasks_posted.set
299+
300+
# Wait for all tasks to finish
301+
subject.shutdown
302+
subject.wait_for_termination
303+
304+
# The tasks should have run until all the threads and the
305+
# queue filled up...
306+
expect(initial_executed.value).to be (max_threads + max_queue)
307+
308+
# ..but been rejected after that
309+
expect(subsequent_executed.value).to be 0
240310
end
241311
end
242-
312+
243313
context ':discard' do
244314

245315
subject do
@@ -253,38 +323,37 @@
253323
end
254324

255325
specify 'a #post task is never executed when the queue is at capacity' do
256-
lock = Mutex.new
257-
lock.lock
326+
all_tasks_posted = Concurrent::Event.new
258327

259328
latch = Concurrent::CountDownLatch.new(max_threads)
260329

261330
initial_executed = Concurrent::AtomicFixnum.new(0)
262331
subsequent_executed = Concurrent::AtomicFixnum.new(0)
263332

264-
# Fill up all the threads (with a task that won't run until
265-
# lock.unlock is called)
333+
# Fill up all the threads (with a task that won't complete until
334+
# all tasks are posted)
266335
max_threads.times do
267-
subject.post{ latch.count_down; lock.lock; initial_executed.increment; lock.unlock }
336+
subject.post{ latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
268337
end
269338

270339
# Wait for all those tasks to be taken off the queue onto a
271340
# worker thread and start executing
272341
latch.wait
273342

274-
# Fill up the queue (with a task that won't run until
275-
# lock.unlock is called)
343+
# Fill up the queue (with a task that won't complete until
344+
# all tasks are posted)
276345
max_queue.times do
277-
subject.post{ lock.lock; initial_executed.increment; lock.unlock }
346+
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
278347
end
279348

280349
# Inject 100 more tasks, which should be dropped without an exception
281350
100.times do
282351
subject.post{ subsequent_executed.increment; }
283352
end
284353

285-
# Unlock the lock, so that the tasks in the threads and on
354+
# Trigger the event, so that the tasks in the threads and on
286355
# the queue can run to completion
287-
lock.unlock
356+
all_tasks_posted.set
288357

289358
# Wait for all tasks to finish
290359
subject.shutdown
@@ -299,34 +368,77 @@
299368
end
300369

301370
specify 'a #<< task is never executed when the queue is at capacity' do
302-
executed = Concurrent::AtomicFixnum.new(0)
303-
1000.times do
304-
subject << proc { sleep; executed.increment }
371+
all_tasks_posted = Concurrent::Event.new
372+
373+
latch = Concurrent::CountDownLatch.new(max_threads)
374+
375+
initial_executed = Concurrent::AtomicFixnum.new(0)
376+
subsequent_executed = Concurrent::AtomicFixnum.new(0)
377+
378+
# Fill up all the threads (with a task that won't complete until
379+
# all tasks are posted)
380+
max_threads.times do
381+
subject << proc { latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
305382
end
306-
sleep(0.1)
307-
expect(executed.value).to be 0
383+
384+
# Wait for all those tasks to be taken off the queue onto a
385+
# worker thread and start executing
386+
latch.wait
387+
388+
# Fill up the queue (with a task that won't complete until
389+
# all tasks are posted)
390+
max_queue.times do
391+
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
392+
end
393+
394+
# Inject 100 more tasks, which should be dropped without an exception
395+
100.times do
396+
subject << proc { subsequent_executed.increment; }
397+
end
398+
399+
# Trigger the event, so that the tasks in the threads and on
400+
# the queue can run to completion
401+
all_tasks_posted.set
402+
403+
# Wait for all tasks to finish
404+
subject.shutdown
405+
subject.wait_for_termination
406+
407+
# The tasks should have run until all the threads and the
408+
# queue filled up...
409+
expect(initial_executed.value).to be (max_threads + max_queue)
410+
411+
# ..but been dropped after that
412+
expect(subsequent_executed.value).to be 0
308413
end
309414

310415
specify 'a #post task is never executed when the executor is shutting down' do
311416
executed = Concurrent::AtomicFixnum.new(0)
417+
312418
subject.shutdown
313-
subject.post{ sleep; executed.increment }
314-
sleep(0.1)
419+
subject.post{ executed.increment }
420+
421+
# Wait for all tasks to finish
422+
subject.wait_for_termination
423+
315424
expect(executed.value).to be 0
316425
end
317426

318427
specify 'a #<< task is never executed when the executor is shutting down' do
319428
executed = Concurrent::AtomicFixnum.new(0)
429+
320430
subject.shutdown
321431
subject << proc { executed.increment }
322-
sleep(0.1)
432+
433+
# Wait for all tasks to finish
434+
subject.wait_for_termination
435+
323436
expect(executed.value).to be 0
324437
end
325438

326439
specify '#post returns false when the executor is shutting down' do
327-
executed = Concurrent::AtomicFixnum.new(0)
328440
subject.shutdown
329-
ret = subject.post{ executed.increment }
441+
ret = subject.post{ nil }
330442
expect(ret).to be false
331443
end
332444
end

0 commit comments

Comments
 (0)