|
216 | 216 | end
|
217 | 217 |
|
218 | 218 | specify 'a #post task is never executed when the queue is at capacity' do
|
219 |
| - executed = Concurrent::AtomicFixnum.new(0) |
220 |
| - 10.times do |
221 |
| - begin |
222 |
| - subject.post{ executed.increment; sleep(0.1) } |
223 |
| - rescue |
224 |
| - end |
| 219 | + all_tasks_posted = Concurrent::Event.new |
| 220 | + |
| 221 | + latch = Concurrent::CountDownLatch.new(max_threads) |
| 222 | + |
| 223 | + initial_executed = Concurrent::AtomicFixnum.new(0) |
| 224 | + subsequent_executed = Concurrent::AtomicFixnum.new(0) |
| 225 | + |
| 226 | + # Fill up all the threads (with a task that won't complete until |
| 227 | + # all tasks are posted) |
| 228 | + max_threads.times do |
| 229 | + subject.post{ latch.count_down; all_tasks_posted.wait ; initial_executed.increment;} |
225 | 230 | end
|
226 |
| - sleep(0.2) |
227 |
| - expect(executed.value).to be < 10 |
| 231 | + |
| 232 | + # Wait for all those tasks to be taken off the queue onto a |
| 233 | + # worker thread and start executing |
| 234 | + latch.wait |
| 235 | + |
| 236 | + # Fill up the queue (with a task that won't complete until |
| 237 | + # all tasks are posted) |
| 238 | + max_queue.times do |
| 239 | + subject.post{ all_tasks_posted.wait; initial_executed.increment; } |
| 240 | + end |
| 241 | + |
| 242 | + # Inject 100 more tasks, which should throw an exception |
| 243 | + 100.times do |
| 244 | + expect { |
| 245 | + subject.post { subsequent_executed.increment; } |
| 246 | + }.to raise_error(Concurrent::RejectedExecutionError) |
| 247 | + end |
| 248 | + |
| 249 | + # Trigger the event, so that the tasks in the threads and on |
| 250 | + # the queue can run to completion |
| 251 | + all_tasks_posted.set |
| 252 | + |
| 253 | + # Wait for all tasks to finish |
| 254 | + subject.shutdown |
| 255 | + subject.wait_for_termination |
| 256 | + |
| 257 | + # The tasks should have run until all the threads and the |
| 258 | + # queue filled up... |
| 259 | + expect(initial_executed.value).to be (max_threads + max_queue) |
| 260 | + |
| 261 | + # ..but been dropped after that |
| 262 | + expect(subsequent_executed.value).to be 0 |
228 | 263 | end
|
229 | 264 |
|
230 | 265 | specify 'a #<< task is never executed when the queue is at capacity' do
|
231 |
| - executed = Concurrent::AtomicFixnum.new(0) |
232 |
| - 10.times do |
233 |
| - begin |
234 |
| - subject << proc { executed.increment; sleep(0.1) } |
235 |
| - rescue |
236 |
| - end |
| 266 | + all_tasks_posted = Concurrent::Event.new |
| 267 | + |
| 268 | + latch = Concurrent::CountDownLatch.new(max_threads) |
| 269 | + |
| 270 | + initial_executed = Concurrent::AtomicFixnum.new(0) |
| 271 | + subsequent_executed = Concurrent::AtomicFixnum.new(0) |
| 272 | + |
| 273 | + # Fill up all the threads (with a task that won't complete until |
| 274 | + # all tasks are posted) |
| 275 | + max_threads.times do |
| 276 | + subject << proc { latch.count_down; all_tasks_posted.wait ; initial_executed.increment;} |
| 277 | + end |
| 278 | + |
| 279 | + # Wait for all those tasks to be taken off the queue onto a |
| 280 | + # worker thread and start executing |
| 281 | + latch.wait |
| 282 | + |
| 283 | + # Fill up the queue (with a task that won't complete until |
| 284 | + # all tasks are posted) |
| 285 | + max_queue.times do |
| 286 | + subject << proc { all_tasks_posted.wait; initial_executed.increment; } |
237 | 287 | end
|
238 |
| - sleep(0.2) |
239 |
| - expect(executed.value).to be < 10 |
| 288 | + |
| 289 | + # Inject 100 more tasks, which should throw an exeption |
| 290 | + 100.times do |
| 291 | + expect { |
| 292 | + subject << proc { subsequent_executed.increment; } |
| 293 | + }.to raise_error(Concurrent::RejectedExecutionError) |
| 294 | + end |
| 295 | + |
| 296 | + # Trigger the event, so that the tasks in the threads and on |
| 297 | + # the queue can run to completion |
| 298 | + all_tasks_posted.set |
| 299 | + |
| 300 | + # Wait for all tasks to finish |
| 301 | + subject.shutdown |
| 302 | + subject.wait_for_termination |
| 303 | + |
| 304 | + # The tasks should have run until all the threads and the |
| 305 | + # queue filled up... |
| 306 | + expect(initial_executed.value).to be (max_threads + max_queue) |
| 307 | + |
| 308 | + # ..but been rejected after that |
| 309 | + expect(subsequent_executed.value).to be 0 |
240 | 310 | end
|
241 | 311 | end
|
242 |
| - |
| 312 | + |
243 | 313 | context ':discard' do
|
244 | 314 |
|
245 | 315 | subject do
|
|
367 | 437 | end
|
368 | 438 |
|
369 | 439 | specify '#post returns false when the executor is shutting down' do
|
370 |
| - executed = Concurrent::AtomicFixnum.new(0) |
371 | 440 | subject.shutdown
|
372 |
| - ret = subject.post{ executed.increment } |
| 441 | + ret = subject.post{ nil } |
373 | 442 | expect(ret).to be false
|
374 | 443 | end
|
375 | 444 | end
|
|
0 commit comments