Skip to content

Commit 774c213

Browse files
committed
buffered channel refactor using ring buffer
1 parent bdab705 commit 774c213

File tree

1 file changed

+8
-17
lines changed

1 file changed

+8
-17
lines changed

lib/concurrent/channel/buffered_channel.rb

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,15 @@ def initialize(size)
99
@buffer_condition = Condition.new
1010

1111
@probe_set = WaitableList.new
12-
@buffer = []
13-
@size = size
12+
@buffer = RingBuffer.new(size)
1413
end
1514

1615
def probe_set_size
1716
@probe_set.size
1817
end
1918

2019
def buffer_queue_size
21-
@mutex.synchronize { @buffer.size }
20+
@mutex.synchronize { @buffer.count }
2221
end
2322

2423
def push(value)
@@ -51,28 +50,20 @@ def remove_probe(probe)
5150

5251
private
5352

54-
def buffer_full?
55-
@buffer.size == @size
56-
end
57-
58-
def buffer_empty?
59-
@buffer.empty?
60-
end
61-
6253
def push_into_buffer(value)
63-
@buffer_condition.wait(@mutex) while buffer_full?
64-
@buffer << value
54+
@buffer_condition.wait(@mutex) while @buffer.full?
55+
@buffer.offer value
6556
@buffer_condition.broadcast
6657
end
6758

6859
def peek_buffer
69-
@buffer_condition.wait(@mutex) while buffer_empty?
70-
@buffer.first
60+
@buffer_condition.wait(@mutex) while @buffer.empty?
61+
@buffer.peek
7162
end
7263

7364
def shift_buffer
74-
@buffer_condition.wait(@mutex) while buffer_empty?
75-
result = @buffer.shift
65+
@buffer_condition.wait(@mutex) while @buffer.empty?
66+
result = @buffer.poll
7667
@buffer_condition.broadcast
7768
result
7869
end

0 commit comments

Comments
 (0)