Skip to content

Commit 115309f

Browse files
committed
SimpleActorRef now uses SingleThreadExecutor.
1 parent 60deb03 commit 115309f

File tree

3 files changed

+25
-134
lines changed

3 files changed

+25
-134
lines changed
Lines changed: 19 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
require 'thread'
2-
31
require 'concurrent/actor/actor_ref'
42
require 'concurrent/atomic/event'
3+
require 'concurrent/executor/single_thread_executor'
54
require 'concurrent/ivar'
65

76
module Concurrent
@@ -12,15 +11,14 @@ class SimpleActorRef
1211
def initialize(actor, opts = {})
1312
@actor = actor
1413
@mutex = Mutex.new
15-
@queue = Queue.new
16-
@thread = nil
14+
@executor = SingleThreadExecutor.new
1715
@stop_event = Event.new
18-
@abort_on_exception = opts.fetch(:abort_on_exception, true)
1916
@reset_on_error = opts.fetch(:reset_on_error, true)
2017
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
2118
self.observers = CopyOnNotifyObserverSet.new
2219

2320
@actor.define_singleton_method(:shutdown, &method(:set_stop_event))
21+
@actor.on_start
2422
end
2523

2624
def running?
@@ -33,11 +31,8 @@ def shutdown?
3331

3432
def post(*msg, &block)
3533
raise ArgumentError.new('message cannot be empty') if msg.empty?
36-
@mutex.synchronize do
37-
supervise unless shutdown?
38-
end
3934
ivar = IVar.new
40-
@queue.push(Message.new(msg, ivar, block))
35+
@executor.post(Message.new(msg, ivar, block), &method(:process_message))
4136
ivar
4237
end
4338

@@ -56,10 +51,8 @@ def post!(timeout, *msg)
5651
def shutdown
5752
@mutex.synchronize do
5853
return if shutdown?
59-
if @thread && @thread.alive?
60-
@thread.kill
61-
@actor.on_shutdown
62-
end
54+
@executor.shutdown
55+
@actor.on_shutdown
6356
@stop_event.set
6457
end
6558
end
@@ -76,49 +69,26 @@ def set_stop_event
7669
@stop_event.set
7770
end
7871

79-
def supervise
80-
if @thread.nil?
81-
@actor.on_start
82-
@thread = new_worker_thread
83-
elsif ! @thread.alive?
84-
@actor.on_reset
85-
@thread = new_worker_thread
86-
end
87-
end
88-
89-
def new_worker_thread
90-
Thread.new do
91-
Thread.current.abort_on_exception = @abort_on_exception
92-
run_message_loop
93-
end
94-
end
72+
def process_message(message)
73+
result = ex = nil
9574

96-
def run_message_loop
97-
loop do
98-
message = @queue.pop
99-
result = ex = nil
75+
begin
76+
result = @actor.receive(*message.payload)
77+
rescue @exception_class => ex
78+
@actor.on_error(Time.now, message.payload, ex)
79+
@actor.on_reset if @reset_on_error
80+
ensure
81+
now = Time.now
82+
message.ivar.complete(ex.nil?, result, ex)
10083

10184
begin
102-
result = @actor.receive(*message.payload)
85+
message.callback.call(now, result, ex) if message.callback
10386
rescue @exception_class => ex
104-
@actor.on_error(Time.now, message.payload, ex)
105-
@actor.on_reset if @reset_on_error
106-
ensure
107-
now = Time.now
108-
message.ivar.complete(ex.nil?, result, ex)
109-
110-
begin
111-
message.callback.call(now, result, ex) if message.callback
112-
rescue @exception_class => ex
113-
# suppress
114-
end
115-
116-
observers.notify_observers(now, message.payload, result, ex)
87+
# suppress
11788
end
11889

119-
break if @stop_event.set?
90+
observers.notify_observers(now, message.payload, result, ex)
12091
end
121-
@actor.on_shutdown
12292
end
12393
end
12494
end

spec/concurrent/actor/actor_ref_shared.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ def receive(*msg)
9999
end
100100

101101
it 'returns self' do
102-
pending('intermittently failing on Travis CI -- SimpleActorRef')
103-
(subject << [1,2,3,4]).should eq subject
102+
expected = subject
103+
result = subject << [1,2,3,4]
104+
result.should eq expected
104105
end
105106
end
106107

spec/concurrent/actor/simple_actor_ref_spec.rb

Lines changed: 3 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -36,91 +36,11 @@ module Concurrent
3636
described_class.should_receive(:new).once.with(anything, opts)
3737
shared_actor_test_class.spawn(opts)
3838
end
39-
end
40-
41-
context 'supervision' do
42-
43-
it 'does not start a new thread on construction' do
44-
Thread.should_not_receive(:new).with(any_args)
45-
subject = shared_actor_test_class.spawn
46-
end
47-
48-
it 'starts a new thread on the first post' do
49-
thread = Thread.new{ nil }
50-
Thread.should_receive(:new).once.with(no_args).and_return(thread)
51-
subject << :foo
52-
end
53-
54-
it 'does not start a new thread after the first post' do
55-
subject << :foo
56-
sleep(0.1)
57-
expected = Thread.list.length
58-
5.times{ subject << :foo }
59-
Thread.list.length.should eq expected
60-
end
61-
62-
it 'starts a new thread when the prior thread has died' do
63-
subject << :foo
64-
sleep(0.1)
65-
66-
subject << :terminate
67-
sleep(0.1)
68-
69-
thread = Thread.new{ nil }
70-
Thread.should_receive(:new).once.with(no_args).and_return(thread)
71-
subject << :foo
72-
end
73-
74-
it 'does not reset the thread after shutdown' do
75-
thread = Thread.new{ nil }
76-
Thread.should_receive(:new).once.with(no_args).and_return(thread)
77-
subject << :foo
78-
sleep(0.1)
79-
80-
subject.shutdown
81-
sleep(0.1)
82-
83-
subject << :foo
84-
end
8539

86-
it 'calls #on_start when the thread is first started' do
87-
actor = subject.instance_variable_get(:@actor)
40+
it 'calls #on_start on the actor' do
41+
actor = double(:shared_actor_test_class)
8842
actor.should_receive(:on_start).once.with(no_args)
89-
subject << :foo
90-
end
91-
92-
it 'calls #on_reset when the thread is started after the first time' do
93-
actor = subject.instance_variable_get(:@actor)
94-
actor.should_receive(:on_reset).once.with(no_args)
95-
subject << :terminate
96-
sleep(0.1)
97-
subject << :foo
98-
end
99-
end
100-
101-
context 'abort_on_exception' do
102-
103-
after(:each) do
104-
@ref.shutdown if @ref
105-
end
106-
107-
it 'gets set on the actor thread' do
108-
@ref = shared_actor_test_class.spawn(abort_on_exception: true)
109-
@ref << :foo
110-
sleep(0.1)
111-
@ref.instance_variable_get(:@thread).abort_on_exception.should be_true
112-
113-
@ref = shared_actor_test_class.spawn(abort_on_exception: false)
114-
@ref << :foo
115-
sleep(0.1)
116-
@ref.instance_variable_get(:@thread).abort_on_exception.should be_false
117-
end
118-
119-
it 'defaults to true' do
120-
@ref = shared_actor_test_class.spawn
121-
@ref << :foo
122-
sleep(0.1)
123-
@ref.instance_variable_get(:@thread).abort_on_exception.should be_true
43+
SimpleActorRef.new(actor)
12444
end
12545
end
12646

0 commit comments

Comments
 (0)