Skip to content

Commit 7dd0e49

Browse files
committed
Extend reworking of queue overflow tests
1 parent 445146a commit 7dd0e49

File tree

1 file changed

+61
-18
lines changed

1 file changed

+61
-18
lines changed

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -253,38 +253,37 @@
253253
end
254254

255255
specify 'a #post task is never executed when the queue is at capacity' do
256-
lock = Mutex.new
257-
lock.lock
256+
all_tasks_posted = Concurrent::Event.new
258257

259258
latch = Concurrent::CountDownLatch.new(max_threads)
260259

261260
initial_executed = Concurrent::AtomicFixnum.new(0)
262261
subsequent_executed = Concurrent::AtomicFixnum.new(0)
263262

264-
# Fill up all the threads (with a task that won't run until
265-
# lock.unlock is called)
263+
# Fill up all the threads (with a task that won't complete until
264+
# all tasks are posted)
266265
max_threads.times do
267-
subject.post{ latch.count_down; lock.lock; initial_executed.increment; lock.unlock }
266+
subject.post{ latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
268267
end
269268

270269
# Wait for all those tasks to be taken off the queue onto a
271270
# worker thread and start executing
272271
latch.wait
273272

274-
# Fill up the queue (with a task that won't run until
275-
# lock.unlock is called)
273+
# Fill up the queue (with a task that won't complete until
274+
# all tasks are posted)
276275
max_queue.times do
277-
subject.post{ lock.lock; initial_executed.increment; lock.unlock }
276+
subject.post{ all_tasks_posted.wait; initial_executed.increment; }
278277
end
279278

280279
# Inject 100 more tasks, which should be dropped without an exception
281280
100.times do
282281
subject.post{ subsequent_executed.increment; }
283282
end
284283

285-
# Unlock the lock, so that the tasks in the threads and on
284+
# Trigger the event, so that the tasks in the threads and on
286285
# the queue can run to completion
287-
lock.unlock
286+
all_tasks_posted.set
288287

289288
# Wait for all tasks to finish
290289
subject.shutdown
@@ -299,27 +298,71 @@
299298
end
300299

301300
specify 'a #<< task is never executed when the queue is at capacity' do
302-
executed = Concurrent::AtomicFixnum.new(0)
303-
1000.times do
304-
subject << proc { sleep; executed.increment }
301+
all_tasks_posted = Concurrent::Event.new
302+
303+
latch = Concurrent::CountDownLatch.new(max_threads)
304+
305+
initial_executed = Concurrent::AtomicFixnum.new(0)
306+
subsequent_executed = Concurrent::AtomicFixnum.new(0)
307+
308+
# Fill up all the threads (with a task that won't complete until
309+
# all tasks are posted)
310+
max_threads.times do
311+
subject << proc { latch.count_down; all_tasks_posted.wait ; initial_executed.increment;}
305312
end
306-
sleep(0.1)
307-
expect(executed.value).to be 0
313+
314+
# Wait for all those tasks to be taken off the queue onto a
315+
# worker thread and start executing
316+
latch.wait
317+
318+
# Fill up the queue (with a task that won't complete until
319+
# all tasks are posted)
320+
max_queue.times do
321+
subject << proc { all_tasks_posted.wait; initial_executed.increment; }
322+
end
323+
324+
# Inject 100 more tasks, which should be dropped without an exception
325+
100.times do
326+
subject << proc { subsequent_executed.increment; }
327+
end
328+
329+
# Trigger the event, so that the tasks in the threads and on
330+
# the queue can run to completion
331+
all_tasks_posted.set
332+
333+
# Wait for all tasks to finish
334+
subject.shutdown
335+
subject.wait_for_termination
336+
337+
# The tasks should have run until all the threads and the
338+
# queue filled up...
339+
expect(initial_executed.value).to be (max_threads + max_queue)
340+
341+
# ..but been dropped after that
342+
expect(subsequent_executed.value).to be 0
308343
end
309344

310345
specify 'a #post task is never executed when the executor is shutting down' do
311346
executed = Concurrent::AtomicFixnum.new(0)
347+
312348
subject.shutdown
313-
subject.post{ sleep; executed.increment }
314-
sleep(0.1)
349+
subject.post{ executed.increment }
350+
351+
# Wait for all tasks to finish
352+
subject.wait_for_termination
353+
315354
expect(executed.value).to be 0
316355
end
317356

318357
specify 'a #<< task is never executed when the executor is shutting down' do
319358
executed = Concurrent::AtomicFixnum.new(0)
359+
320360
subject.shutdown
321361
subject << proc { executed.increment }
322-
sleep(0.1)
362+
363+
# Wait for all tasks to finish
364+
subject.wait_for_termination
365+
323366
expect(executed.value).to be 0
324367
end
325368

0 commit comments

Comments
 (0)