Skip to content

Commit ced9823

Browse files
committed
actor.ask(:terminate!) is only completed after all its children terminate
1 parent cea34a8 commit ced9823

File tree

6 files changed

+36
-45
lines changed

6 files changed

+36
-45
lines changed

lib/concurrent/actor/behaviour.rb

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ module Actor
3737
#
3838
# > {include:Actor::Behaviour::Termination}
3939
#
40-
# - {Behaviour::TerminatesChildren}:
41-
#
42-
# > {include:Actor::Behaviour::TerminatesChildren}
43-
#
4440
# - {Behaviour::RemovesChild}:
4541
#
4642
# > {include:Actor::Behaviour::RemovesChild}
@@ -66,14 +62,12 @@ module Behaviour
6662
require 'concurrent/actor/behaviour/sets_results'
6763
require 'concurrent/actor/behaviour/supervising'
6864
require 'concurrent/actor/behaviour/termination'
69-
require 'concurrent/actor/behaviour/terminates_children'
7065

7166
# Array of behaviours and their construction parameters.
7267
#
7368
# [[Behaviour::SetResults, :terminate!],
7469
# [Behaviour::RemovesChild],
7570
# [Behaviour::Termination],
76-
# [Behaviour::TerminatesChildren],
7771
# [Behaviour::Linking],
7872
# [Behaviour::Awaits],
7973
# [Behaviour::ExecutesContext],
@@ -91,7 +85,6 @@ def self.basic_behaviour_definition
9185
# [[Behaviour::SetResults, :pause!],
9286
# [Behaviour::RemovesChild],
9387
# [Behaviour::Termination],
94-
# [Behaviour::TerminatesChildren],
9588
# [Behaviour::Linking],
9689
# [Behaviour::Pausing],
9790
# [Behaviour::Supervising, :reset!, :one_for_one],
@@ -113,8 +106,7 @@ def self.base(on_error)
113106
[[SetResults, on_error],
114107
# has to be before Termination to be able to remove children from terminated actor
115108
RemovesChild,
116-
Termination,
117-
TerminatesChildren]
109+
Termination]
118110
end
119111

120112
# @see '' its source code

lib/concurrent/actor/behaviour/terminates_children.rb

Lines changed: 0 additions & 14 deletions
This file was deleted.

lib/concurrent/actor/behaviour/termination.rb

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@ module Concurrent
22
module Actor
33
module Behaviour
44

5-
# Handles actor termination.
5+
# Handles actor termination. Waits until all its children are terminated,
6+
# can be configured on behaviour initialization.
67
# @note Actor rejects envelopes when terminated.
78
# @note TODO missing example
89
class Termination < Abstract
910

1011
# @!attribute [r] terminated
1112
# @return [Edge::Event] event which will become set when actor is terminated.
12-
# @!attribute [r] reason
13-
attr_reader :terminated, :reason
13+
attr_reader :terminated
1414

15-
def initialize(core, subsequent, core_options, trapping = false)
15+
def initialize(core, subsequent, core_options, trapping = false, terminate_children = true)
1616
super core, subsequent, core_options
17-
@terminated = Concurrent.event
18-
@public_terminated = @terminated.hide_completable
19-
@reason = nil
20-
@trapping = trapping
17+
@terminated = Concurrent.future
18+
@public_terminated = @terminated.hide_completable
19+
@trapping = trapping
20+
@terminate_children = terminate_children
2121
end
2222

2323
# @note Actor rejects envelopes when terminated.
@@ -43,7 +43,7 @@ def on_envelope(envelope)
4343
if trapping? && reason != :kill
4444
pass envelope
4545
else
46-
terminate! reason
46+
terminate! reason, envelope
4747
end
4848
when :termination_event
4949
@public_terminated
@@ -59,14 +59,23 @@ def on_envelope(envelope)
5959

6060
# Terminates the actor. Any Envelope received after termination is rejected.
6161
# Terminates all its children, does not wait until they are terminated.
62-
def terminate!(reason = :normal)
63-
# TODO return after all children are terminated
62+
def terminate!(reason = nil, envelope = nil)
6463
return true if terminated?
65-
@reason = reason
66-
terminated.complete
64+
65+
self_termination = Concurrent.completed_future(reason.nil?, reason.nil? || nil, reason)
66+
all_terminations = if @terminate_children
67+
Concurrent.zip(*children.map { |ch| ch.ask(:terminate!) }, self_termination)
68+
else
69+
self_termination
70+
end
71+
72+
all_terminations.chain_completable(@terminated)
73+
all_terminations.chain_completable(envelope.future) if envelope && envelope.future
74+
6775
broadcast(true, [:terminated, reason]) # TODO do not end up in Dead Letter Router
6876
parent << :remove_child if parent
69-
true
77+
78+
MESSAGE_PROCESSED
7079
end
7180
end
7281
end

lib/concurrent/actor/internal_delegations.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ def terminated?
1818
behaviour!(Behaviour::Termination).terminated?
1919
end
2020

21-
# @see Termination#reason
22-
def reason
23-
behaviour!(Behaviour::Termination).reason
24-
end
21+
# # @see Termination#reason
22+
# def reason
23+
# behaviour!(Behaviour::Termination).reason
24+
# end
2525

2626
# delegates to core.log
2727
# @see Logging#log

lib/concurrent/actor/utils/ad_hoc.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ def on_message(message)
1818
# # this block has to return proc defining #on_message behaviour
1919
# -> message { where.tell message }
2020
# end
21-
# @note TODO remove in favor of the module
2221
class AdHoc < Context
2322
include AsAdHoc
2423
end

spec/concurrent/actor_spec.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ def on_message(message)
153153
expect(subject.ask!(:terminated?)).to be_falsey
154154
subject.ask(:terminate!).wait
155155
expect(subject.ask!(:terminated?)).to be_truthy
156-
child.ask!(:termination_event).wait
157156
expect(child.ask!(:terminated?)).to be_truthy
158157

159158
terminate_actors subject, child
@@ -315,18 +314,24 @@ def on_message(message)
315314

316315
describe 'pool' do
317316
it 'supports asks' do
318-
pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |index|
319-
Concurrent::Actor::Utils::AdHoc.spawn name: "worker-#{index}", supervised: true do
317+
children = Queue.new
318+
pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |index|
319+
worker = Concurrent::Actor::Utils::AdHoc.spawn name: "worker-#{index}", supervised: true do
320320
lambda do |message|
321321
fail if message == :fail
322322
5 + message
323323
end
324324
end
325+
children.push worker
326+
worker
325327
end
326328

327329
10.times { expect(pool.ask!(5)).to eq 10 }
328330
expect(pool.ask(:fail).reason).to be_kind_of RuntimeError
329331
expect(pool.ask!(5)).to eq 10
332+
expect(pool.ask!(:terminate!)).to be_truthy
333+
5.times { expect(children.pop.ask!(:terminated?)).to be_truthy }
334+
330335
terminate_actors pool
331336
end
332337
end

0 commit comments

Comments
 (0)