Skip to content

Commit 9daf923

Browse files
committed
Actor: Fix restart loop when Pool is used
1 parent 4558131 commit 9daf923

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

lib/concurrent/actor/utils/pool.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,17 @@ def initialize(size, &worker_initializer)
3737
end
3838

3939
def on_message(message)
40+
command, *rest = message
41+
return if [:restarted, :reset, :resumed, :terminated].include? command # ignore events from supervised actors
42+
4043
envelope_to_redirect = if envelope.future
4144
envelope
4245
else
43-
Envelope.new(envelope.message, Concurrent.future, envelope.sender, envelope.sender)
46+
Envelope.new(envelope.message, Concurrent.future, envelope.sender, envelope.address)
4447
end
45-
envelope.future.on_completion! &lambda { |balancer, success, value, reason| balancer << :subscribe }.curry[@balancer]
48+
envelope_to_redirect.future.on_completion!(&lambda do |balancer, success, value, reason|
49+
balancer << :subscribe
50+
end.curry[@balancer])
4651
redirect @balancer, envelope_to_redirect
4752
end
4853
end

spec/concurrent/actor_spec.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,15 @@ def on_message(message)
317317
it 'supports asks' do
318318
pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |index|
319319
Concurrent::Actor::Utils::AdHoc.spawn name: "worker-#{index}", supervised: true do
320-
lambda { |message| 5 + message }
320+
lambda do |message|
321+
fail if message == :fail
322+
5 + message
323+
end
321324
end
322325
end
323326

327+
expect(pool.ask!(5)).to eq 10
328+
expect(pool.ask(:fail).reason).to be_kind_of RuntimeError
324329
expect(pool.ask!(5)).to eq 10
325330
terminate_actors pool
326331
end

0 commit comments

Comments
 (0)