Skip to content

Commit 5509cdb

Browse files
committed
PerThreadExecutor is now an executor service.
1 parent c067cab commit 5509cdb

File tree

2 files changed

+70
-19
lines changed

2 files changed

+70
-19
lines changed
Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,78 @@
1+
require 'concurrent/atomic/event'
2+
require 'concurrent/executor/executor'
3+
14
module Concurrent
25

36
class PerThreadExecutor
4-
include Executor
7+
include SerialExecutor
8+
9+
def initialize
10+
@running = Concurrent::AtomicBoolean.new(true)
11+
@stopped = Concurrent::Event.new
12+
@count = Concurrent::AtomicFixnum.new(0)
13+
end
514

615
def self.post(*args)
716
raise ArgumentError.new('no block given') unless block_given?
817
Thread.new(*args) do
918
Thread.current.abort_on_exception = false
1019
yield(*args)
1120
end
12-
return true
21+
true
1322
end
1423

1524
def post(*args, &task)
16-
return PerThreadExecutor.post(*args, &task)
25+
raise ArgumentError.new('no block given') unless block_given?
26+
return false unless running?
27+
@count.increment
28+
Thread.new(*args) do
29+
Thread.current.abort_on_exception = false
30+
begin
31+
yield(*args)
32+
ensure
33+
@count.decrement
34+
@stopped.set if @running.false? && @count.value == 0
35+
end
36+
end
1737
end
1838

1939
def <<(task)
20-
PerThreadExecutor.post(&task)
21-
return self
40+
post(&task)
41+
self
42+
end
43+
44+
# @!macro executor_method_running_question
45+
def running?
46+
@running.true?
47+
end
48+
49+
# @!macro executor_method_shuttingdown_question
50+
def shuttingdown?
51+
@running.false? && ! @stopped.set?
52+
end
53+
54+
# @!macro executor_method_shutdown_question
55+
def shutdown?
56+
@stopped.set?
57+
end
58+
59+
# @!macro executor_method_shutdown
60+
def shutdown
61+
@running.make_false
62+
@stopped.set if @count.value == 0
63+
true
64+
end
65+
66+
# @!macro executor_method_kill
67+
def kill
68+
@running.make_false
69+
@stopped.set
70+
true
71+
end
72+
73+
# @!macro executor_method_wait_for_termination
74+
def wait_for_termination(timeout = nil)
75+
@stopped.wait(timeout)
2276
end
2377
end
2478
end

spec/concurrent/executor/per_thread_executor_spec.rb

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
require 'spec_helper'
2-
require_relative 'global_thread_pool_shared'
2+
require_relative 'thread_pool_shared'
33

44
module Concurrent
55

66
describe PerThreadExecutor do
77

88
subject { PerThreadExecutor.new }
99

10-
it_should_behave_like :global_thread_pool
10+
it_should_behave_like :executor_service
1111

1212
context '#post' do
1313

@@ -19,10 +19,9 @@ module Concurrent
1919
end
2020

2121
it 'executes a call without arguments' do
22-
@expected = false
23-
subject.post{ @expected = true }
24-
sleep(0.1)
25-
@expected.should be_true
22+
latch = CountDownLatch.new(1)
23+
subject.post{ latch.count_down }
24+
latch.wait(1).should be_true
2625
end
2726

2827
it 'creates a new thread for a call with arguments' do
@@ -33,17 +32,15 @@ module Concurrent
3332
end
3433

3534
it 'executes a call with one argument' do
36-
@expected = 0
37-
subject.post(1){|one| @expected = one }
38-
sleep(0.1)
39-
@expected.should == 1
35+
latch = CountDownLatch.new(3)
36+
subject.post(3){|count| count.times{ latch.count_down } }
37+
latch.wait(1).should be_true
4038
end
4139

4240
it 'executes a call with multiple arguments' do
43-
@expected = nil
44-
subject.post(1,2,3,4,5){|*args| @expected = args }
45-
sleep(0.1)
46-
@expected.should eq [1,2,3,4,5]
41+
latch = CountDownLatch.new(10)
42+
subject.post(1,2,3,4){|*count| count.reduce(:+).times{ latch.count_down } }
43+
latch.wait(1).should be_true
4744
end
4845

4946
it 'aliases #<<' do

0 commit comments

Comments
 (0)