Skip to content

Commit e944e27

Browse files
committed
added buffered channel
1 parent 3adcaa0 commit e944e27

File tree

3 files changed

+242
-0
lines changed

3 files changed

+242
-0
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
require 'concurrent/channel/probe'
3636
require 'concurrent/channel/channel'
3737
require 'concurrent/channel/unbuffered_channel'
38+
require 'concurrent/channel/buffered_channel'
3839

3940
require 'concurrent/cached_thread_pool'
4041
require 'concurrent/fixed_thread_pool'
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
module Concurrent
2+
class BufferedChannel
3+
4+
def initialize(size)
5+
@mutex = Mutex.new
6+
@condition = Condition.new
7+
@buffer_condition = Condition.new
8+
9+
@probe_set = []
10+
@buffer = []
11+
@size = size
12+
end
13+
14+
def probe_set_size
15+
@mutex.synchronize { @probe_set.size }
16+
end
17+
18+
def buffer_queue_size
19+
@mutex.synchronize { @buffer.size }
20+
end
21+
22+
def push(value)
23+
until set_probe_or_push_into_buffer(value)
24+
end
25+
end
26+
27+
def pop
28+
probe = Probe.new
29+
select(probe)
30+
probe.value
31+
end
32+
33+
def select(probe)
34+
@mutex.synchronize do
35+
36+
if @buffer.empty?
37+
@probe_set << probe
38+
true
39+
else
40+
shift_buffer if probe.set_unless_assigned peek_buffer
41+
end
42+
43+
end
44+
end
45+
46+
def remove_probe(probe)
47+
@mutex.synchronize { @probe_set.delete(probe) }
48+
end
49+
50+
private
51+
52+
def buffer_full?
53+
@buffer.size == @size
54+
end
55+
56+
def buffer_empty?
57+
@buffer.empty?
58+
end
59+
60+
def push_into_buffer(value)
61+
@buffer_condition.wait(@mutex) while buffer_full?
62+
@buffer << value
63+
@buffer_condition.broadcast
64+
end
65+
66+
def peek_buffer
67+
@buffer_condition.wait(@mutex) while buffer_empty?
68+
@buffer.first
69+
end
70+
71+
def shift_buffer
72+
@buffer_condition.wait(@mutex) while buffer_empty?
73+
result = @buffer.shift
74+
@buffer_condition.broadcast
75+
result
76+
end
77+
78+
def set_probe_or_push_into_buffer(value)
79+
@mutex.synchronize do
80+
if @probe_set.empty?
81+
push_into_buffer(value)
82+
true
83+
else
84+
@probe_set.shift.set_unless_assigned(value)
85+
end
86+
end
87+
end
88+
89+
end
90+
end
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe BufferedChannel do
6+
7+
let(:size) { 2 }
8+
let(:channel) { BufferedChannel.new(size) }
9+
let(:probe) { Probe.new }
10+
11+
context 'without timeout' do
12+
13+
describe '#push' do
14+
it 'adds elements to buffer' do
15+
channel.buffer_queue_size.should be 0
16+
17+
channel.push('a')
18+
channel.push('a')
19+
20+
channel.buffer_queue_size.should be 2
21+
end
22+
23+
it 'should block when buffer is full' do
24+
channel.push 1
25+
channel.push 2
26+
27+
t = Thread.new { channel.push 3 }
28+
sleep(0.05)
29+
t.status.should eq 'sleep'
30+
end
31+
32+
it 'restarts thread when buffer is no more full' do
33+
channel.push 'hi'
34+
channel.push 'foo'
35+
36+
result = nil
37+
38+
Thread.new { channel.push 'bar'; result = 42 }
39+
40+
sleep(0.1)
41+
42+
channel.pop
43+
44+
sleep(0.1)
45+
46+
result.should eq 42
47+
end
48+
49+
it 'should assign value to a probe if probe set is not empty' do
50+
channel.select(probe)
51+
Thread.new { sleep(0.1); channel.push 3 }
52+
probe.value.should eq 3
53+
end
54+
end
55+
56+
describe '#pop' do
57+
it 'should block if buffer is empty' do
58+
t = Thread.new { channel.pop }
59+
sleep(0.05)
60+
t.status.should eq 'sleep'
61+
end
62+
63+
it 'returns value if buffer is not empty' do
64+
channel.push 1
65+
result = channel.pop
66+
67+
result.should eq 1
68+
end
69+
70+
it 'removes the first value from the buffer' do
71+
channel.push 'a'
72+
channel.push 'b'
73+
74+
channel.pop.should eq 'a'
75+
channel.buffer_queue_size.should eq 1
76+
end
77+
end
78+
79+
end
80+
81+
describe 'select' do
82+
83+
it 'does not block' do
84+
t = Thread.new { channel.select(probe) }
85+
86+
sleep(0.05)
87+
88+
t.status.should eq false
89+
end
90+
91+
it 'gets notified by writer thread' do
92+
channel.select(probe)
93+
94+
Thread.new { channel.push 82 }
95+
96+
probe.value.should eq 82
97+
end
98+
99+
end
100+
101+
context 'already set probes' do
102+
context 'empty buffer' do
103+
it 'discards already set probes' do
104+
probe.set('set value')
105+
106+
channel.select(probe)
107+
108+
channel.push 27
109+
110+
channel.buffer_queue_size.should eq 1
111+
channel.probe_set_size.should eq 0
112+
end
113+
end
114+
115+
context 'empty probe set' do
116+
it 'discards set probe' do
117+
probe.set('set value')
118+
119+
channel.push 82
120+
121+
channel.select(probe)
122+
123+
channel.buffer_queue_size.should eq 1
124+
125+
channel.pop.should eq 82
126+
127+
end
128+
end
129+
end
130+
131+
describe 'probe set' do
132+
133+
it 'has size zero after creation' do
134+
channel.probe_set_size.should eq 0
135+
end
136+
137+
it 'increases size after a select' do
138+
channel.select(probe)
139+
channel.probe_set_size.should eq 1
140+
end
141+
142+
it 'decreases size after a removal' do
143+
channel.select(probe)
144+
channel.remove_probe(probe)
145+
channel.probe_set_size.should eq 0
146+
end
147+
148+
end
149+
150+
end
151+
end

0 commit comments

Comments
 (0)