Skip to content

Commit 65a956a

Browse files
committed
Merge pull request #187 from pitr-ch/actor
Fix #185: allow Util::Pool to be asked
2 parents beba284 + c61f25a commit 65a956a

File tree

3 files changed

+20
-3
lines changed

3 files changed

+20
-3
lines changed

lib/concurrent/actor/utils/balancer.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ def on_message(message)
2525
when :subscribed?
2626
@receivers.include? envelope.sender
2727
else
28-
@buffer << message
28+
@buffer << envelope
2929
distribute
30+
Behaviour::MESSAGE_PROCESSED
3031
end
3132
end
3233

3334
def distribute
3435
while !@receivers.empty? && !@buffer.empty?
35-
@receivers.shift << @buffer.shift
36+
redirect @receivers.shift, @buffer.shift
3637
end
3738
end
3839
end

lib/concurrent/actor/utils/pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def initialize(size, &worker_initializer)
3434
end
3535

3636
def on_message(message)
37-
@balancer << message
37+
redirect @balancer
3838
end
3939
end
4040

spec/concurrent/actor_spec.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,22 @@ def on_message(message)
377377

378378
end
379379

380+
describe 'pool' do
381+
it 'supports asks' do
382+
worker = Class.new Concurrent::Actor::Utils::AbstractWorker do
383+
def work(message)
384+
5 + message
385+
end
386+
end
387+
388+
pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |balancer, index|
389+
worker.spawn name: "worker-#{index}", supervise: true, args: [balancer]
390+
end
391+
392+
expect(pool.ask!(5)).to eq 10
393+
terminate_actors pool
394+
end
395+
end
380396

381397
end
382398
end

0 commit comments

Comments
 (0)