Skip to content

Commit 8339106

Browse files
committed
Rename Supervising to Supervised, Add Supervising
allows to set handler: :reset!, :resume!, :terminate! and strategies: :one_for_one, :one_for_all
1 parent 2a8e408 commit 8339106

File tree

9 files changed

+113
-65
lines changed

9 files changed

+113
-65
lines changed

lib/concurrent/actor/behaviour.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module Behaviour
2727
require 'concurrent/actor/behaviour/pausing'
2828
require 'concurrent/actor/behaviour/removes_child'
2929
require 'concurrent/actor/behaviour/sets_results'
30+
require 'concurrent/actor/behaviour/supervised'
3031
require 'concurrent/actor/behaviour/supervising'
3132
require 'concurrent/actor/behaviour/termination'
3233
require 'concurrent/actor/behaviour/terminates_children'
@@ -38,7 +39,8 @@ def self.basic_behaviour_definition
3839

3940
def self.restarting_behaviour_definition
4041
[*base,
41-
*supervising,
42+
*supervised,
43+
[Behaviour::Supervising, [:reset!, :one_for_one]],
4244
*user_messages(:pause)]
4345
end
4446

@@ -51,8 +53,8 @@ def self.base
5153
[Linking, []]]
5254
end
5355

54-
def self.supervising
55-
[[Supervising, []],
56+
def self.supervised
57+
[[Supervised, []],
5658
[Pausing, []]]
5759
end
5860

lib/concurrent/actor/behaviour/pausing.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,16 @@ def pause!(error = nil)
4040
true
4141
end
4242

43-
def resume!
43+
def resume!(broadcast = true)
4444
@buffer.each { |envelope| core.schedule_execution { pass envelope } }
4545
@buffer.clear
4646
@paused = false
47-
broadcast(:resumed)
47+
broadcast(:resumed) if broadcast
4848
true
4949
end
5050

5151
def from_supervisor?(envelope)
52-
if behaviour!(Supervising).supervisor == envelope.sender
52+
if behaviour!(Supervised).supervisor == envelope.sender
5353
yield
5454
else
5555
false
@@ -59,8 +59,8 @@ def from_supervisor?(envelope)
5959
def reset!
6060
core.allocate_context
6161
core.build_context
62+
resume!(false)
6263
broadcast(:reset)
63-
resume!
6464
true
6565
end
6666

lib/concurrent/actor/behaviour/sets_results.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ def on_envelope(envelope)
2323
terminate!
2424
when :pause
2525
behaviour!(Pausing).pause!(error)
26+
when :just_log
27+
# nothing
2628
else
2729
raise
2830
end
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
module Concurrent
2+
module Actor
3+
module Behaviour
4+
5+
# Sets nad holds the supervisor of the actor if any. There is only one or none supervisor
6+
# for each actor. Each supervisor is automatically linked.
7+
class Supervised < Abstract
8+
attr_reader :supervisor
9+
10+
def initialize(core, subsequent)
11+
super core, subsequent
12+
@supervisor = nil
13+
end
14+
15+
def on_envelope(envelope)
16+
case envelope.message
17+
when :supervise
18+
supervise envelope.sender
19+
when :supervisor
20+
supervisor
21+
when :un_supervise
22+
un_supervise envelope.sender
23+
else
24+
pass envelope
25+
end
26+
end
27+
28+
def supervise(ref)
29+
@supervisor = ref
30+
behaviour!(Linking).link ref
31+
true
32+
end
33+
34+
def un_supervise(ref)
35+
if @supervisor == ref
36+
behaviour!(Linking).unlink ref
37+
@supervisor = nil
38+
true
39+
else
40+
false
41+
end
42+
end
43+
44+
def on_event(event)
45+
@supervisor = nil if event == :terminated
46+
super event
47+
end
48+
end
49+
end
50+
end
51+
end

lib/concurrent/actor/behaviour/supervising.rb

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,33 @@
11
module Concurrent
22
module Actor
33
module Behaviour
4-
5-
# Sets nad holds the supervisor of the actor if any. There is only one or none supervisor
6-
# for each actor. Each supervisor is automatically linked.
74
class Supervising < Abstract
8-
attr_reader :supervisor
9-
10-
def initialize(core, subsequent)
5+
def initialize(core, subsequent, handle, strategy)
116
super core, subsequent
12-
@supervisor = nil
7+
@handle = Match! handle, :terminate!, :resume!, :reset!, :restart!
8+
@strategy = case @handle
9+
when :terminate!
10+
Match! strategy, nil
11+
when :resume!
12+
Match! strategy, :one_for_one
13+
when :reset!, :restart!
14+
Match! strategy, :one_for_one, :one_for_all
15+
end
1316
end
1417

1518
def on_envelope(envelope)
1619
case envelope.message
17-
when :supervise
18-
supervise envelope.sender
19-
when :supervisor
20-
supervisor
21-
when :un_supervise
22-
un_supervise envelope.sender
20+
when Exception, :paused
21+
receivers = if @strategy == :one_for_all
22+
children
23+
else
24+
[envelope.sender]
25+
end
26+
receivers.each { |ch| ch << @handle }
2327
else
2428
pass envelope
2529
end
2630
end
27-
28-
def supervise(ref)
29-
@supervisor = ref
30-
behaviour!(Linking).link ref
31-
true
32-
end
33-
34-
def un_supervise(ref)
35-
if @supervisor == ref
36-
behaviour!(Linking).unlink ref
37-
@supervisor = nil
38-
true
39-
else
40-
false
41-
end
42-
end
43-
44-
def on_event(event)
45-
@supervisor = nil if event == :terminated
46-
super event
47-
end
4831
end
4932
end
5033
end

lib/concurrent/actor/core.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def initialize(opts = {}, &block)
5050
@serialized_execution = SerializedExecution.new
5151
@executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor
5252
@children = Set.new
53-
@context_class = Child! opts.fetch(:class), Context
53+
@context_class = Child! opts.fetch(:class), AbstractContext
5454
allocate_context
5555
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
5656
@name = (Type! opts.fetch(:name), String, Symbol).to_s

lib/concurrent/actor/default_dead_letter_handler.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module Concurrent
22
module Actor
3-
class DefaultDeadLetterHandler < Context
3+
class DefaultDeadLetterHandler < RestartingContext
44
def on_message(dead_letter)
55
log Logging::INFO, "got dead letter #{dead_letter.inspect}"
66
end

lib/concurrent/actor/root.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
module Concurrent
22
module Actor
33
# implements the root actor
4-
class Root < Context
4+
class Root < AbstractContext
55

66
def initialize
7-
@dead_letter_router = Core.new(parent: reference,
8-
class: DefaultDeadLetterHandler,
9-
name: :default_dead_letter_handler).reference
7+
# noinspection RubyArgCount
8+
@dead_letter_router = Core.new(parent: reference,
9+
class: DefaultDeadLetterHandler,
10+
supervise: true,
11+
name: :default_dead_letter_handler).reference
1012
end
1113

1214
# to allow spawning of new actors, spawn needs to be called inside the parent Actor
@@ -21,6 +23,12 @@ def on_message(message)
2123
def dead_letter_routing
2224
@dead_letter_router
2325
end
26+
27+
def behaviour_definition
28+
[*Behaviour.base,
29+
[Behaviour::Supervising, [:reset!, :one_for_one]],
30+
*Behaviour.user_messages(:just_log)]
31+
end
2432
end
2533
end
2634
end

spec/concurrent/actor_spec.rb

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,16 @@ def on_message(message)
277277

278278
describe 'pausing' do
279279
it 'pauses on error' do
280-
queue = Queue.new
281-
test = AdHoc.spawn :tester do
280+
queue = Queue.new
281+
resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, args|
282+
if Behaviour::Supervising == c
283+
[c, [:resume!, :one_for_one]]
284+
else
285+
[c, args]
286+
end
287+
end
288+
289+
test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do
282290
actor = AdHoc.spawn name: :pausing,
283291
behaviour_definition: Behaviour.restarting_behaviour_definition do
284292
queue << :init
@@ -288,50 +296,44 @@ def on_message(message)
288296
actor << :supervise
289297
queue << actor.ask!(:supervisor)
290298
actor << nil
299+
queue << actor.ask(:add)
291300

292301
-> m do
293302
queue << m
294-
if UnknownMessage === m
295-
ivar = envelope.sender.ask(:add)
296-
envelope.sender << :resume!
297-
queue << ivar.value!
298-
end
299303
end
300304
end
301305

302306
expect(queue.pop).to eq :init
303307
expect(queue.pop).to eq test
304-
expect(queue.pop).to be_kind_of(UnknownMessage)
305-
expect(queue.pop).to eq 1
308+
expect(queue.pop.value).to eq 1
306309
expect(queue.pop).to eq :resumed
307310
terminate_actors test
308311

309-
test = AdHoc.spawn :tester do
312+
test = AdHoc.spawn name: :tester,
313+
behaviour_definition: Behaviour.restarting_behaviour_definition do
310314
actor = AdHoc.spawn name: :pausing,
311315
supervise: true,
312316
behaviour_definition: Behaviour.restarting_behaviour_definition do
313317
queue << :init
314-
-> m { m == :add ? 1 : pass }
318+
-> m { m == :object_id ? self.object_id : pass }
315319
end
316320

317321
queue << actor.ask!(:supervisor)
322+
queue << actor.ask!(:object_id)
318323
actor << nil
324+
queue << actor.ask(:object_id)
319325

320326
-> m do
321327
queue << m
322-
if UnknownMessage === m
323-
ivar = envelope.sender.ask(:add)
324-
envelope.sender << :reset!
325-
queue << ivar.value!
326-
end
327328
end
328329
end
329330

330331
expect(queue.pop).to eq :init
331332
expect(queue.pop).to eq test
332-
expect(queue.pop).to be_kind_of(UnknownMessage)
333+
first_id = queue.pop
334+
second_id = queue.pop.value
335+
expect(first_id).not_to eq second_id # context already reset
333336
expect(queue.pop).to eq :init # rebuilds context
334-
expect(queue.pop).to eq 1
335337
expect(queue.pop).to eq :reset
336338
terminate_actors test
337339
end

0 commit comments

Comments
 (0)