File tree Expand file tree Collapse file tree 3 files changed +20
-3
lines changed
lib/concurrent/actor/utils Expand file tree Collapse file tree 3 files changed +20
-3
lines changed Original file line number Diff line number Diff line change @@ -25,14 +25,15 @@ def on_message(message)
25
25
when :subscribed?
26
26
@receivers . include? envelope . sender
27
27
else
28
- @buffer << message
28
+ @buffer << envelope
29
29
distribute
30
+ Behaviour ::MESSAGE_PROCESSED
30
31
end
31
32
end
32
33
33
34
def distribute
34
35
while !@receivers . empty? && !@buffer . empty?
35
- @receivers . shift << @buffer . shift
36
+ redirect @receivers . shift , @buffer . shift
36
37
end
37
38
end
38
39
end
Original file line number Diff line number Diff line change @@ -34,7 +34,7 @@ def initialize(size, &worker_initializer)
34
34
end
35
35
36
36
def on_message ( message )
37
- @balancer << message
37
+ redirect @balancer
38
38
end
39
39
end
40
40
Original file line number Diff line number Diff line change @@ -377,6 +377,22 @@ def on_message(message)
377
377
378
378
end
379
379
380
+ describe 'pool' do
381
+ it 'supports asks' do
382
+ worker = Class . new Concurrent ::Actor ::Utils ::AbstractWorker do
383
+ def work ( message )
384
+ 5 + message
385
+ end
386
+ end
387
+
388
+ pool = Concurrent ::Actor ::Utils ::Pool . spawn! 'pool' , 5 do |balancer , index |
389
+ worker . spawn name : "worker-#{ index } " , supervise : true , args : [ balancer ]
390
+ end
391
+
392
+ expect ( pool . ask! ( 5 ) ) . to eq 10
393
+ terminate_actors pool
394
+ end
395
+ end
380
396
381
397
end
382
398
end
You can’t perform that action at this time.
0 commit comments