Skip to content

Commit ff3aca7

Browse files
committed
Added observation and better exception handling to SimpleActorRef.
1 parent cd97610 commit ff3aca7

File tree

5 files changed

+204
-8
lines changed

5 files changed

+204
-8
lines changed

lib/concurrent/actor_context.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class << base
2020

2121
def spawn(opts = {})
2222
args = opts.fetch(:args, [])
23-
Concurrent::SimpleActorRef.new(self.new(*args))
23+
Concurrent::SimpleActorRef.new(self.new(*args), opts)
2424
end
2525
end
2626
end

lib/concurrent/actor_ref.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require 'concurrent/copy_on_notify_observer_set'
12
require 'concurrent/utilities'
23

34
module Concurrent
@@ -24,5 +25,23 @@ def <<(message)
2425
post(*message)
2526
self
2627
end
28+
29+
def add_observer(*args)
30+
@observers.add_observer(*args)
31+
end
32+
33+
def delete_observer(*args)
34+
@observers.delete_observer(*args)
35+
end
36+
37+
def delete_observers
38+
@observers.delete_observers
39+
end
40+
41+
protected
42+
43+
def observers
44+
@observers ||= CopyOnNotifyObserverSet.new
45+
end
2746
end
2847
end

lib/concurrent/simple_actor_ref.rb

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ module Concurrent
88
class SimpleActorRef
99
include ActorRef
1010

11-
def initialize(actor)
11+
def initialize(actor, opts = {})
1212
@actor = actor
1313
@mutex = Mutex.new
1414
@queue = Queue.new
1515
@thread = nil
1616
@stopped = false
17+
@abort_on_exception = opts.fetch(:abort_on_exception, true)
18+
@reset_on_error = opts.fetch(:reset_on_error, true)
19+
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
20+
@observers = CopyOnNotifyObserverSet.new
1721
end
1822

1923
def running?
@@ -72,7 +76,7 @@ def supervise
7276

7377
def new_worker_thread
7478
Thread.new do
75-
Thread.current.abort_on_exception = true
79+
Thread.current.abort_on_exception = @abort_on_exception
7680
run_message_loop
7781
end
7882
end
@@ -84,11 +88,13 @@ def run_message_loop
8488

8589
begin
8690
result = @actor.receive(*message.payload)
87-
rescue => ex
88-
# suppress
91+
rescue @exception_class => ex
92+
@actor.on_reset if @reset_on_error
8993
ensure
94+
now = Time.now
9095
message.ivar.complete(ex.nil?, result, ex)
91-
message.callback.call(Time.now, result, ex) if message.callback
96+
message.callback.call(now, result, ex) if message.callback
97+
observers.notify_observers(now, message.payload, result, ex)
9298
end
9399
end
94100
end

spec/concurrent/actor_ref_shared.rb

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,19 @@
33
def shared_actor_test_class
44
Class.new do
55
include Concurrent::ActorContext
6+
67
attr_reader :argv
8+
79
def initialize(*args)
810
@argv = args
911
end
12+
1013
def receive(*msg)
1114
case msg.first
1215
when :poison
1316
raise StandardError
17+
when :bullet
18+
raise Exception
1419
when :terminate
1520
Thread.current.kill
1621
when :sleep
@@ -163,4 +168,57 @@ def receive(*msg)
163168
}.to raise_error(StandardError)
164169
end
165170
end
171+
172+
context 'observation' do
173+
174+
let(:observer_class) do
175+
Class.new do
176+
attr_reader :time, :msg, :value, :reason
177+
def update(time, msg, value, reason)
178+
@msg = msg
179+
@time = time
180+
@value = value
181+
@reason = reason
182+
end
183+
end
184+
end
185+
186+
it 'notifies observers' do
187+
o1 = observer_class.new
188+
o2 = observer_class.new
189+
190+
subject.add_observer(o1)
191+
subject.add_observer(o2)
192+
193+
subject << :foo
194+
sleep(0.1)
195+
196+
o1.value.should eq :foo
197+
o1.reason.should be_nil
198+
199+
o2.value.should eq :foo
200+
o2.reason.should be_nil
201+
end
202+
203+
it 'does not notify removed observers' do
204+
o1 = observer_class.new
205+
o2 = observer_class.new
206+
207+
subject.add_observer(o1)
208+
subject.add_observer(o2)
209+
210+
subject << :foo
211+
sleep(0.1)
212+
213+
subject.delete_observer(o1)
214+
subject << :bar
215+
sleep(0.1)
216+
o1.value.should_not eq :bar
217+
218+
subject.delete_observers
219+
subject << :baz
220+
sleep(0.1)
221+
o1.value.should_not eq :baz
222+
end
223+
end
166224
end

spec/concurrent/simple_actor_ref_spec.rb

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,18 @@ module Concurrent
2424
actor.argv.should be_empty
2525
end
2626

27-
it 'passes all :args option to the constructor' do
27+
it 'passes all :args option to the actor constructor' do
2828
subject = shared_actor_test_class.spawn(args: [1, 2, 3, 4])
2929
actor = subject.instance_variable_get(:@actor)
3030
actor.argv.should eq [1, 2, 3, 4]
3131
end
32+
33+
it 'passes the options hash to the ActorRef constructor' do
34+
subject # prevent the after(:all) block from breaking this test
35+
opts = {foo: :bar, hello: :world}
36+
described_class.should_receive(:new).once.with(anything, opts)
37+
shared_actor_test_class.spawn(opts)
38+
end
3239
end
3340

3441
context 'supervision' do
@@ -82,7 +89,7 @@ module Concurrent
8289
subject << :foo
8390
end
8491

85-
it 'calls #on_reset when the thread is reseted' do
92+
it 'calls #on_reset when the thread is started after the first time' do
8693
actor = subject.instance_variable_get(:@actor)
8794
actor.should_receive(:on_reset).once.with(no_args)
8895
subject << :terminate
@@ -91,6 +98,112 @@ module Concurrent
9198
end
9299
end
93100

101+
context 'abort_on_exception' do
102+
103+
after(:each) do
104+
@ref.shutdown if @ref
105+
end
106+
107+
it 'gets set on the actor thread' do
108+
@ref = shared_actor_test_class.spawn(abort_on_exception: true)
109+
@ref << :foo
110+
sleep(0.1)
111+
@ref.instance_variable_get(:@thread).abort_on_exception.should be_true
112+
113+
@ref = shared_actor_test_class.spawn(abort_on_exception: false)
114+
@ref << :foo
115+
sleep(0.1)
116+
@ref.instance_variable_get(:@thread).abort_on_exception.should be_false
117+
end
118+
119+
it 'defaults to true' do
120+
@ref = shared_actor_test_class.spawn
121+
@ref << :foo
122+
sleep(0.1)
123+
@ref.instance_variable_get(:@thread).abort_on_exception.should be_true
124+
end
125+
end
126+
127+
context 'reset_on_error' do
128+
129+
after(:each) do
130+
@ref.shutdown if @ref
131+
end
132+
133+
it 'causes #on_reset to be called on exception when true' do
134+
@ref = shared_actor_test_class.spawn(reset_on_error: true)
135+
actor = @ref.instance_variable_get(:@actor)
136+
actor.should_receive(:on_reset).once.with(no_args)
137+
@ref << :poison
138+
sleep(0.1)
139+
end
140+
141+
it 'prevents #on_reset form being called on exception when false' do
142+
@ref = shared_actor_test_class.spawn(reset_on_error: false)
143+
actor = @ref.instance_variable_get(:@actor)
144+
actor.should_not_receive(:on_reset).with(any_args)
145+
@ref << :poison
146+
sleep(0.1)
147+
end
148+
149+
it 'defaults to true' do
150+
@ref = shared_actor_test_class.spawn
151+
actor = @ref.instance_variable_get(:@actor)
152+
actor.should_receive(:on_reset).once.with(no_args)
153+
@ref << :poison
154+
sleep(0.1)
155+
end
156+
end
157+
158+
context 'rescue_exception' do
159+
160+
after(:each) do
161+
@ref.shutdown if @ref
162+
end
163+
164+
it 'rescues Exception in the actor thread when true' do
165+
@ref = shared_actor_test_class.spawn(
166+
abort_on_exception: false,
167+
rescue_exception: true
168+
)
169+
170+
ivar = @ref.post(:poison)
171+
sleep(0.1)
172+
ivar.reason.should be_a StandardError
173+
174+
ivar = @ref.post(:bullet)
175+
sleep(0.1)
176+
ivar.reason.should be_a Exception
177+
end
178+
179+
it 'rescues StandardError in the actor thread when false' do
180+
@ref = shared_actor_test_class.spawn(
181+
abort_on_exception: false,
182+
rescue_exception: false
183+
)
184+
185+
ivar = @ref.post(:poison)
186+
sleep(0.1)
187+
ivar.reason.should be_a StandardError
188+
189+
ivar = @ref.post(:bullet)
190+
sleep(0.1)
191+
ivar.reason.should be_nil
192+
end
193+
194+
it 'defaults to false' do
195+
@ref = shared_actor_test_class.spawn(abort_on_exception: false)
196+
197+
ivar = @ref.post(:poison)
198+
sleep(0.1)
199+
ivar.reason.should be_a StandardError
200+
201+
ivar = @ref.post(:bullet)
202+
sleep(0.1)
203+
ivar.reason.should be_nil
204+
end
205+
end
206+
94207
context '#shutdown' do
95208

96209
it 'calls #on_shutdown when shutdown' do

0 commit comments

Comments
 (0)