Skip to content

Commit b9a2a3c

Browse files
committed
first RingBuffer implementation
1 parent 1c06a3c commit b9a2a3c

File tree

3 files changed

+143
-0
lines changed

3 files changed

+143
-0
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
require 'concurrent/channel/channel'
3737
require 'concurrent/channel/unbuffered_channel'
3838
require 'concurrent/channel/buffered_channel'
39+
require 'concurrent/channel/ring_buffer'
3940

4041
require 'concurrent/cached_thread_pool'
4142
require 'concurrent/fixed_thread_pool'

lib/concurrent/channel/ring_buffer.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module Concurrent
2+
class RingBuffer
3+
4+
def initialize(capacity)
5+
@buffer = Array.new(capacity)
6+
@first = @last = 0
7+
@count = 0
8+
@mutex = Mutex.new
9+
@condition = Condition.new
10+
end
11+
12+
def capacity
13+
@mutex.synchronize { @buffer.size }
14+
end
15+
16+
def count
17+
@mutex.synchronize { @count }
18+
end
19+
20+
def put(value)
21+
@mutex.synchronize do
22+
wait_while_full
23+
@buffer[@last] = value
24+
@last += 1
25+
@count += 1
26+
@condition.signal
27+
end
28+
end
29+
30+
def take
31+
@mutex.synchronize do
32+
wait_while_empty
33+
result = @buffer[@first]
34+
@buffer[@first] = nil
35+
@first += 1
36+
@count -= 1
37+
@condition.signal
38+
result
39+
end
40+
end
41+
42+
private
43+
44+
def wait_while_full
45+
@condition.wait(@mutex) while @count == @buffer.size
46+
end
47+
48+
def wait_while_empty
49+
@condition.wait(@mutex) while @count == 0
50+
end
51+
52+
end
53+
end
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe RingBuffer do
6+
7+
let(:capacity) { 3 }
8+
let(:buffer) { RingBuffer.new( capacity ) }
9+
10+
describe '#capacity' do
11+
it 'returns the value passed in constructor' do
12+
buffer.capacity.should eq capacity
13+
end
14+
end
15+
16+
describe '#count' do
17+
it 'is zero when created' do
18+
buffer.count.should eq 0
19+
end
20+
21+
it 'increases when an element is added' do
22+
buffer.put 5
23+
buffer.count.should eq 1
24+
25+
buffer.put 1
26+
buffer.count.should eq 2
27+
end
28+
29+
it 'decreases when an element is removed' do
30+
buffer.put 10
31+
32+
buffer.take
33+
34+
buffer.count.should eq 0
35+
end
36+
end
37+
38+
describe '#put' do
39+
it 'block when buffer is full' do
40+
capacity.times { buffer.put 27 }
41+
42+
t = Thread.new { buffer.put 32 }
43+
44+
sleep(0.1)
45+
46+
t.status.should eq 'sleep'
47+
end
48+
49+
it 'continues when an element is removed' do
50+
latch = CountDownLatch.new(1)
51+
52+
Thread.new { (capacity + 1).times { buffer.put 'hi' }; latch.count_down }
53+
Thread.new { sleep(0.1); buffer.take }
54+
55+
latch.wait(0.2).should be_true
56+
end
57+
end
58+
59+
describe '#take' do
60+
it 'blocks when buffer is empty' do
61+
t = Thread.new { buffer.take }
62+
63+
sleep(0.1)
64+
65+
t.status.should eq 'sleep'
66+
end
67+
68+
it 'continues when an element is added' do
69+
latch = CountDownLatch.new(1)
70+
71+
Thread.new { buffer.take; latch.count_down }
72+
Thread.new { sleep(0.1); buffer.put 3 }
73+
74+
latch.wait(0.2).should be_true
75+
end
76+
77+
it 'returns the first added value' do
78+
buffer.put 'hi'
79+
buffer.put 'foo'
80+
buffer.put 'bar'
81+
82+
buffer.take.should eq 'hi'
83+
buffer.take.should eq 'foo'
84+
buffer.take.should eq 'bar'
85+
end
86+
end
87+
88+
end
89+
end

0 commit comments

Comments
 (0)