Skip to content

Commit 2336b6f

Browse files
committed
Merge pull request #49 from jdantonio/channels
First channel implementation
2 parents 0592010 + e78e7d7 commit 2336b6f

14 files changed

+950
-0
lines changed

lib/concurrent.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@
3232
require 'concurrent/tvar'
3333
require 'concurrent/utilities'
3434

35+
require 'concurrent/channel/probe'
36+
require 'concurrent/channel/channel'
37+
require 'concurrent/channel/unbuffered_channel'
38+
require 'concurrent/channel/buffered_channel'
39+
require 'concurrent/channel/ring_buffer'
40+
require 'concurrent/channel/blocking_ring_buffer'
41+
3542
require 'concurrent/actor_context'
3643
require 'concurrent/simple_actor_ref'
3744

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
module Concurrent
2+
class BlockingRingBuffer
3+
4+
def initialize(capacity)
5+
@buffer = RingBuffer.new(capacity)
6+
@first = @last = 0
7+
@count = 0
8+
@mutex = Mutex.new
9+
@condition = Condition.new
10+
end
11+
12+
def capacity
13+
@mutex.synchronize { @buffer.capacity }
14+
end
15+
16+
def count
17+
@mutex.synchronize { @buffer.count }
18+
end
19+
20+
def full?
21+
@mutex.synchronize { @buffer.full? }
22+
end
23+
24+
def empty?
25+
@mutex.synchronize { @buffer.empty? }
26+
end
27+
28+
def put(value)
29+
@mutex.synchronize do
30+
wait_while_full
31+
@buffer.offer(value)
32+
@condition.signal
33+
end
34+
end
35+
36+
def take
37+
@mutex.synchronize do
38+
wait_while_empty
39+
result = @buffer.poll
40+
@condition.signal
41+
result
42+
end
43+
end
44+
45+
def peek
46+
@mutex.synchronize { @buffer.peek }
47+
end
48+
49+
private
50+
51+
def wait_while_full
52+
@condition.wait(@mutex) while @buffer.full?
53+
end
54+
55+
def wait_while_empty
56+
@condition.wait(@mutex) while @buffer.empty?
57+
end
58+
59+
end
60+
end
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
require_relative 'waitable_list'
2+
3+
module Concurrent
4+
class BufferedChannel
5+
6+
def initialize(size)
7+
@mutex = Mutex.new
8+
@condition = Condition.new
9+
@buffer_condition = Condition.new
10+
11+
@probe_set = WaitableList.new
12+
@buffer = RingBuffer.new(size)
13+
end
14+
15+
def probe_set_size
16+
@probe_set.size
17+
end
18+
19+
def buffer_queue_size
20+
@mutex.synchronize { @buffer.count }
21+
end
22+
23+
def push(value)
24+
until set_probe_or_push_into_buffer(value)
25+
end
26+
end
27+
28+
def pop
29+
probe = Probe.new
30+
select(probe)
31+
probe.value
32+
end
33+
34+
def select(probe)
35+
@mutex.synchronize do
36+
37+
if @buffer.empty?
38+
@probe_set.put(probe)
39+
true
40+
else
41+
shift_buffer if probe.set_unless_assigned peek_buffer
42+
end
43+
44+
end
45+
end
46+
47+
def remove_probe(probe)
48+
@probe_set.delete(probe)
49+
end
50+
51+
private
52+
53+
def push_into_buffer(value)
54+
@buffer_condition.wait(@mutex) while @buffer.full?
55+
@buffer.offer value
56+
@buffer_condition.broadcast
57+
end
58+
59+
def peek_buffer
60+
@buffer_condition.wait(@mutex) while @buffer.empty?
61+
@buffer.peek
62+
end
63+
64+
def shift_buffer
65+
@buffer_condition.wait(@mutex) while @buffer.empty?
66+
result = @buffer.poll
67+
@buffer_condition.broadcast
68+
result
69+
end
70+
71+
def set_probe_or_push_into_buffer(value)
72+
@mutex.synchronize do
73+
if @probe_set.empty?
74+
push_into_buffer(value)
75+
true
76+
else
77+
@probe_set.take.set_unless_assigned(value)
78+
end
79+
end
80+
end
81+
82+
end
83+
end

lib/concurrent/channel/channel.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module Concurrent
2+
class Channel
3+
def self.select(*channels)
4+
probe = Probe.new
5+
channels.each { |channel| channel.select(probe) }
6+
result = probe.value
7+
channels.each { |channel| channel.remove_probe(probe) }
8+
result
9+
end
10+
end
11+
end

lib/concurrent/channel/probe.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
module Concurrent
2+
class Probe < IVar
3+
4+
def initialize(value = NO_VALUE, opts = {})
5+
super(value, opts)
6+
end
7+
8+
def set_unless_assigned(value)
9+
mutex.synchronize do
10+
return false if [:fulfilled, :rejected].include? @state
11+
12+
set_state(true, value, nil)
13+
event.set
14+
true
15+
end
16+
17+
end
18+
end
19+
end

lib/concurrent/channel/ring_buffer.rb

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
module Concurrent
2+
3+
# not thread safe buffer
4+
class RingBuffer
5+
6+
def initialize(capacity)
7+
@buffer = Array.new(capacity)
8+
@first = @last = 0
9+
@count = 0
10+
end
11+
12+
def capacity
13+
@buffer.size
14+
end
15+
16+
def count
17+
@count
18+
end
19+
20+
def empty?
21+
@count == 0
22+
end
23+
24+
def full?
25+
@count == capacity
26+
end
27+
28+
# @param [Object] value
29+
# @return [Boolean] true if value has been inserted, false otherwise
30+
def offer(value)
31+
return false if full?
32+
33+
@buffer[@last] = value
34+
@last = (@last + 1) % @buffer.size
35+
@count += 1
36+
true
37+
end
38+
39+
# @return [Object] the first available value and removes it from the buffer. If buffer is empty returns nil
40+
def poll
41+
result = @buffer[@first]
42+
@buffer[@first] = nil
43+
@first = (@first + 1) % @buffer.size
44+
@count -= 1
45+
result
46+
end
47+
48+
# @return [Object] the first available value and without removing it from the buffer. If buffer is empty returns nil
49+
def peek
50+
@buffer[@first]
51+
end
52+
53+
end
54+
end
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
require_relative 'waitable_list'
2+
3+
module Concurrent
4+
class UnbufferedChannel
5+
6+
def initialize
7+
@probe_set = WaitableList.new
8+
end
9+
10+
def probe_set_size
11+
@probe_set.size
12+
end
13+
14+
def push(value)
15+
until @probe_set.take.set_unless_assigned(value)
16+
end
17+
end
18+
19+
def pop
20+
probe = Probe.new
21+
select(probe)
22+
probe.value
23+
end
24+
25+
def select(probe)
26+
@probe_set.put(probe)
27+
end
28+
29+
def remove_probe(probe)
30+
@probe_set.delete(probe)
31+
end
32+
33+
end
34+
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module Concurrent
2+
class WaitableList
3+
4+
def initialize
5+
@mutex = Mutex.new
6+
@condition = Condition.new
7+
8+
@list = []
9+
end
10+
11+
def size
12+
@mutex.synchronize { @list.size }
13+
end
14+
15+
def empty?
16+
@mutex.synchronize { @list.empty? }
17+
end
18+
19+
def put(value)
20+
@mutex.synchronize do
21+
@list << value
22+
@condition.signal
23+
end
24+
end
25+
26+
def delete(value)
27+
@mutex.synchronize { @list.delete(value) }
28+
end
29+
30+
def take
31+
@mutex.synchronize do
32+
@condition.wait(@mutex) while @list.empty?
33+
@list.shift
34+
end
35+
end
36+
37+
end
38+
end

0 commit comments

Comments
 (0)