Skip to content

Commit 469eaa3

Browse files
committed
Improve actor events
Events can be private and public, so far only difference is that Linking will pass to linked actors only public messages. Adding private :restarting and :resetting events which are send before the actor restarts or resets allowing to add callbacks to cleanup current child actors.
1 parent 3c18951 commit 469eaa3

File tree

9 files changed

+81
-44
lines changed

9 files changed

+81
-44
lines changed

lib/concurrent/actor/behaviour/abstract.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ def pass(envelope)
2525

2626
# override to add extra behaviour
2727
# @note super needs to be called not to break the chain
28-
def on_event(event)
29-
subsequent.on_event event if subsequent
28+
def on_event(public, event)
29+
subsequent.on_event public, event if subsequent
3030
end
3131

3232
# broadcasts event to all behaviours and context
3333
# @see #on_event
3434
# @see AbstractContext#on_event
35-
def broadcast(event)
36-
core.broadcast(event)
35+
def broadcast(public, event)
36+
core.broadcast(public, event)
3737
end
3838

3939
def reject_envelope(envelope)

lib/concurrent/actor/behaviour/buffer.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ def process_envelope
4040
core.schedule_execution { process_envelopes? }
4141
end
4242

43-
def on_event(event)
43+
def on_event(public, event)
4444
case event
4545
when :terminated, :restarted
4646
@buffer.each { |envelope| reject_envelope envelope }
4747
@buffer.clear
4848
end
49-
super event
49+
super public, event
5050
end
5151
end
5252
end

lib/concurrent/actor/behaviour/executes_context.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ def on_envelope(envelope)
77
context.on_envelope envelope
88
end
99

10-
def on_event(event)
10+
def on_event(public, event)
1111
context.on_event(event)
12-
core.log Logging::DEBUG, "event: #{event.inspect}"
13-
super event
12+
super public, event
1413
end
1514
end
1615
end

lib/concurrent/actor/behaviour/linking.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ module Actor
33
module Behaviour
44

55
# Links the actor to other actors and sends actor's events to them,
6-
# like: `:terminated`, `:paused`, errors, etc.
6+
# like: `:terminated`, `:paused`, `:resumed`, errors, etc.
7+
# Linked actor needs to handle those messages.
78
#
89
# listener = AdHoc.spawn name: :listener do
910
# lambda do |message|
@@ -65,10 +66,10 @@ def unlink(ref)
6566
true
6667
end
6768

68-
def on_event(event)
69-
@linked.each { |a| a << event }
69+
def on_event(public, event)
70+
@linked.each { |a| a << event } if public
7071
@linked.clear if event == :terminated
71-
super event
72+
super public, event
7273
end
7374
end
7475
end

lib/concurrent/actor/behaviour/pausing.rb

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,41 +35,77 @@ def on_envelope(envelope)
3535
end
3636

3737
def pause!(error = nil)
38-
@paused = true
39-
broadcast(error || :paused)
38+
do_pause
39+
broadcast true, error || :paused
4040
true
4141
end
4242

43-
def resume!(broadcast = true)
44-
@paused = false
45-
broadcast(:resumed) if broadcast
43+
def resume!
44+
do_resume
45+
broadcast(true, :resumed)
4646
true
4747
end
4848

49-
def reset!(broadcast = true)
50-
core.allocate_context
51-
core.build_context
52-
resume!(false)
53-
broadcast(:reset) if broadcast
49+
def reset!
50+
broadcast(false, :resetting)
51+
do_reset
52+
broadcast(true, :reset)
5453
true
5554
end
5655

5756
def restart!
58-
reset! false
59-
broadcast(:restarted)
57+
broadcast(false, :restarting)
58+
do_restart
59+
broadcast(true, :restarted)
6060
true
6161
end
6262

63-
def on_event(event)
64-
case event
65-
when :terminated, :restarted
66-
@buffer.each { |envelope| reject_envelope envelope }
67-
@buffer.clear
68-
when :resumed, :reset
69-
@buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } }
70-
@buffer.clear
71-
end
72-
super event
63+
def on_event(public, event)
64+
reject_buffer if event == :terminated
65+
super public, event
66+
end
67+
68+
private
69+
70+
def do_pause
71+
@paused = true
72+
nil
73+
end
74+
75+
def do_resume
76+
@paused = false
77+
reschedule_buffer
78+
nil
79+
end
80+
81+
def do_reset
82+
rebuild_context
83+
do_resume
84+
reschedule_buffer
85+
nil
86+
end
87+
88+
def do_restart
89+
rebuild_context
90+
reject_buffer
91+
do_resume
92+
nil
93+
end
94+
95+
def rebuild_context
96+
core.allocate_context
97+
core.build_context
98+
nil
99+
end
100+
101+
def reschedule_buffer
102+
@buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } }
103+
@buffer.clear
104+
end
105+
106+
def reject_buffer
107+
@buffer.each { |envelope| reject_envelope envelope }
108+
@buffer.clear
73109
end
74110
end
75111
end

lib/concurrent/actor/behaviour/supervised.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ def un_supervise(ref)
6868
end
6969
end
7070

71-
def on_event(event)
71+
def on_event(public, event)
7272
@supervisor = nil if event == :terminated
73-
super event
73+
super public, event
7474
end
7575
end
7676
end

lib/concurrent/actor/behaviour/terminates_children.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ module Actor
33
module Behaviour
44
# Terminates all children when the actor terminates.
55
class TerminatesChildren < Abstract
6-
def on_event(event)
6+
def on_event(public, event)
77
# TODO set event in Termination after all children are terminated, requires new non-blocking join on Future
88
children.map { |ch| ch << :terminate! } if event == :terminated
9-
super event
9+
super public, event
1010
end
1111
end
1212
end

lib/concurrent/actor/behaviour/termination.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def on_envelope(envelope)
2828
terminated?
2929
when :terminate!
3030
terminate!
31-
when :terminated_event
31+
when :terminated_event # TODO rename to :termination_event
3232
terminated
3333
else
3434
if terminated?
@@ -45,7 +45,7 @@ def on_envelope(envelope)
4545
def terminate!
4646
return true if terminated?
4747
terminated.set
48-
broadcast(:terminated) # TODO do not end up in Dead Letter Router
48+
broadcast(true, :terminated) # TODO do not end up in Dead Letter Router
4949
parent << :remove_child if parent
5050
true
5151
end

lib/concurrent/actor/core.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def initialize(opts = {}, &block)
8787
build_context
8888

8989
messages.each do |message|
90-
log DEBUG, "preprocessing #{message} from #{parent}"
90+
log DEBUG, "preprocessing #{message.inspect} from #{parent}"
9191
process_envelope Envelope.new(message, nil, parent, reference)
9292
end
9393

@@ -176,8 +176,9 @@ def schedule_execution
176176
nil
177177
end
178178

179-
def broadcast(event)
180-
@first_behaviour.on_event(event)
179+
def broadcast(public, event)
180+
log Logging::DEBUG, "event: #{event.inspect} (#{public ? 'public' : 'private'})"
181+
@first_behaviour.on_event(public, event)
181182
end
182183

183184
# @param [Class] behaviour_class

0 commit comments

Comments
 (0)