Skip to content

Commit 301719a

Browse files
committed
Add Pool
1 parent f409578 commit 301719a

File tree

4 files changed

+104
-3
lines changed

4 files changed

+104
-3
lines changed

lib/concurrent/actor/utils.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
module Concurrent
22
module Actor
33
module Utils
4-
require 'concurrent/actor/utils/broadcast'
54
require 'concurrent/actor/utils/ad_hoc'
5+
require 'concurrent/actor/utils/broadcast'
6+
require 'concurrent/actor/utils/balancer'
7+
require 'concurrent/actor/utils/pool'
68
end
79
end
810
end
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
module Concurrent
2+
module Actor
3+
module Utils
4+
5+
# Distributes messages between subscribed actors. Each actor'll get only one message then
6+
# it's unsubscribed. The actor needs to resubscribe when it's ready to receive next message.
7+
# @see Pool
8+
class Balancer < RestartingContext
9+
10+
def initialize
11+
@receivers = []
12+
@buffer = []
13+
end
14+
15+
def on_message(message)
16+
case message
17+
when :subscribe
18+
@receivers << envelope.sender
19+
distribute
20+
true
21+
when :unsubscribe
22+
@receivers.delete envelope.sender
23+
true
24+
when :subscribed?
25+
@receivers.include? envelope.sender
26+
else
27+
@buffer << message
28+
distribute
29+
end
30+
end
31+
32+
def distribute
33+
while !@receivers.empty? && !@buffer.empty?
34+
@receivers.shift << @buffer.shift
35+
end
36+
end
37+
end
38+
end
39+
end
40+
end

lib/concurrent/actor/utils/broadcast.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module Utils
77
# Allows to build pub/sub easily.
88
# @example news
99
# news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news
10-
10+
#
1111
# 2.times do |i|
1212
# Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
1313
# news_channel << :subscribe
@@ -17,7 +17,7 @@ module Utils
1717
#
1818
# news_channel << 'Ruby rocks!'
1919
# # prints: 'Ruby rocks!' twice
20-
class Broadcast < Context
20+
class Broadcast < RestartingContext
2121

2222
def initialize
2323
@receivers = Set.new

lib/concurrent/actor/utils/pool.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
require 'concurrent/actor/utils/balancer'
2+
3+
module Concurrent
4+
module Actor
5+
module Utils
6+
7+
# Allows to create a pool of workers and distribute work between them
8+
# @param [Integer] size number of workers
9+
# @yield [balancer, index] a block spawning an worker instance. called +size+ times.
10+
# The worker should be descendant of AbstractWorker and supervised, see example.
11+
# @yieldparam [Balancer] balancer to pass to the worker
12+
# @yieldparam [Integer] index of the worker, usually used in its name
13+
# @yieldreturn [Reference] the reference of newly created worker
14+
# @example
15+
# class Worker < Concurrent::Actor::Utils::AbstractWorker
16+
# def work(message)
17+
# p message * 5
18+
# end
19+
# end
20+
#
21+
# pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |balancer, index|
22+
# Worker.spawn name: "worker-#{index}", supervise: true, args: [balancer]
23+
# end
24+
#
25+
# pool << 'asd' << 2
26+
# # prints:
27+
# # "asdasdasdasdasd"
28+
# # 10
29+
class Pool < RestartingContext
30+
def initialize(size, &worker_initializer)
31+
@balancer = Balancer.spawn name: :balancer, supervise: true
32+
@workers = Array.new(size, &worker_initializer.curry[@balancer])
33+
@workers.each { |w| Type! w, Reference }
34+
end
35+
36+
def on_message(message)
37+
@balancer << message
38+
end
39+
end
40+
41+
class AbstractWorker < RestartingContext
42+
def initialize(balancer)
43+
@balancer = balancer
44+
@balancer << :subscribe
45+
end
46+
47+
def on_message(message)
48+
work message
49+
ensure
50+
@balancer << :subscribe
51+
end
52+
53+
def work(message)
54+
raise NotImplementedError
55+
end
56+
end
57+
end
58+
end
59+
end

0 commit comments

Comments
 (0)