Skip to content

Commit 29c355f

Browse files
committed
Merge pull request #321 from ruby-concurrency/internal-reorg-channel
Moved all channel classes into the Channel module.
2 parents 1b53506 + 1434ee4 commit 29c355f

11 files changed

+656
-630
lines changed
Lines changed: 59 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,80 @@
11
require 'concurrent/synchronization'
22

33
module Concurrent
4-
class BlockingRingBuffer < Synchronization::Object
4+
module Channel
55

6-
def initialize(capacity)
7-
super()
8-
synchronize { ns_initialize capacity}
9-
end
6+
class BlockingRingBuffer < Synchronization::Object
107

11-
# @return [Integer] the capacity of the buffer
12-
def capacity
13-
synchronize { @buffer.capacity }
14-
end
8+
def initialize(capacity)
9+
super()
10+
synchronize { ns_initialize capacity}
11+
end
1512

16-
# @return [Integer] the number of elements currently in the buffer
17-
def count
18-
synchronize { @buffer.count }
19-
end
13+
# @return [Integer] the capacity of the buffer
14+
def capacity
15+
synchronize { @buffer.capacity }
16+
end
2017

21-
# @return [Boolean] true if buffer is empty, false otherwise
22-
def empty?
23-
synchronize { @buffer.empty? }
24-
end
18+
# @return [Integer] the number of elements currently in the buffer
19+
def count
20+
synchronize { @buffer.count }
21+
end
2522

26-
# @return [Boolean] true if buffer is full, false otherwise
27-
def full?
28-
synchronize { @buffer.full? }
29-
end
23+
# @return [Boolean] true if buffer is empty, false otherwise
24+
def empty?
25+
synchronize { @buffer.empty? }
26+
end
3027

31-
# @param [Object] value the value to be inserted
32-
# @return [Boolean] true if value has been inserted, false otherwise
33-
def put(value)
34-
synchronize do
35-
wait_while_full
36-
@buffer.offer(value)
37-
ns_signal
38-
true
28+
# @return [Boolean] true if buffer is full, false otherwise
29+
def full?
30+
synchronize { @buffer.full? }
3931
end
40-
end
4132

42-
# @return [Object] the first available value and removes it from the buffer.
43-
# If buffer is empty it blocks until an element is available
44-
def take
45-
synchronize do
46-
wait_while_empty
47-
result = @buffer.poll
48-
ns_signal
49-
result
33+
# @param [Object] value the value to be inserted
34+
# @return [Boolean] true if value has been inserted, false otherwise
35+
def put(value)
36+
synchronize do
37+
wait_while_full
38+
@buffer.offer(value)
39+
ns_signal
40+
true
41+
end
5042
end
51-
end
5243

53-
# @return [Object] the first available value and without removing it from
54-
# the buffer. If buffer is empty returns nil
55-
def peek
56-
synchronize { @buffer.peek }
57-
end
44+
# @return [Object] the first available value and removes it from the buffer.
45+
# If buffer is empty it blocks until an element is available
46+
def take
47+
synchronize do
48+
wait_while_empty
49+
result = @buffer.poll
50+
ns_signal
51+
result
52+
end
53+
end
5854

59-
protected
55+
# @return [Object] the first available value and without removing it from
56+
# the buffer. If buffer is empty returns nil
57+
def peek
58+
synchronize { @buffer.peek }
59+
end
6060

61-
def ns_initialize(capacity)
62-
@buffer = RingBuffer.new(capacity)
63-
@first = @last = 0
64-
@count = 0
65-
end
61+
protected
6662

67-
private
63+
def ns_initialize(capacity)
64+
@buffer = RingBuffer.new(capacity)
65+
@first = @last = 0
66+
@count = 0
67+
end
6868

69-
def wait_while_full
70-
ns_wait_until { !@buffer.full? }
71-
end
69+
private
7270

73-
def wait_while_empty
74-
ns_wait_until { !@buffer.empty? }
71+
def wait_while_full
72+
ns_wait_until { !@buffer.full? }
73+
end
74+
75+
def wait_while_empty
76+
ns_wait_until { !@buffer.empty? }
77+
end
7578
end
7679
end
7780
end
Lines changed: 60 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,85 @@
11
require 'concurrent/channel/waitable_list'
22

33
module Concurrent
4-
class BufferedChannel
4+
module Channel
55

6-
def initialize(size)
7-
@mutex = Mutex.new
8-
@buffer_condition = ConditionVariable.new
6+
class BufferedChannel
97

10-
@probe_set = WaitableList.new
11-
@buffer = RingBuffer.new(size)
12-
end
8+
def initialize(size)
9+
@mutex = Mutex.new
10+
@buffer_condition = ConditionVariable.new
1311

14-
def probe_set_size
15-
@probe_set.size
16-
end
12+
@probe_set = WaitableList.new
13+
@buffer = RingBuffer.new(size)
14+
end
1715

18-
def buffer_queue_size
19-
@mutex.synchronize { @buffer.count }
20-
end
16+
def probe_set_size
17+
@probe_set.size
18+
end
2119

22-
def push(value)
23-
until set_probe_or_push_into_buffer(value)
20+
def buffer_queue_size
21+
@mutex.synchronize { @buffer.count }
2422
end
25-
end
2623

27-
def pop
28-
probe = Channel::Probe.new
29-
select(probe)
30-
probe.value
31-
end
24+
def push(value)
25+
until set_probe_or_push_into_buffer(value)
26+
end
27+
end
3228

33-
def select(probe)
34-
@mutex.synchronize do
29+
def pop
30+
probe = Channel::Probe.new
31+
select(probe)
32+
probe.value
33+
end
3534

36-
if @buffer.empty?
37-
@probe_set.put(probe)
38-
true
39-
else
40-
shift_buffer if probe.try_set([peek_buffer, self])
41-
end
35+
def select(probe)
36+
@mutex.synchronize do
37+
38+
if @buffer.empty?
39+
@probe_set.put(probe)
40+
true
41+
else
42+
shift_buffer if probe.try_set([peek_buffer, self])
43+
end
4244

45+
end
4346
end
44-
end
4547

46-
def remove_probe(probe)
47-
@probe_set.delete(probe)
48-
end
48+
def remove_probe(probe)
49+
@probe_set.delete(probe)
50+
end
4951

50-
private
52+
private
5153

52-
def push_into_buffer(value)
53-
@buffer_condition.wait(@mutex) while @buffer.full?
54-
@buffer.offer value
55-
@buffer_condition.broadcast
56-
end
54+
def push_into_buffer(value)
55+
@buffer_condition.wait(@mutex) while @buffer.full?
56+
@buffer.offer value
57+
@buffer_condition.broadcast
58+
end
5759

58-
def peek_buffer
59-
@buffer_condition.wait(@mutex) while @buffer.empty?
60-
@buffer.peek
61-
end
60+
def peek_buffer
61+
@buffer_condition.wait(@mutex) while @buffer.empty?
62+
@buffer.peek
63+
end
6264

63-
def shift_buffer
64-
@buffer_condition.wait(@mutex) while @buffer.empty?
65-
result = @buffer.poll
66-
@buffer_condition.broadcast
67-
result
68-
end
65+
def shift_buffer
66+
@buffer_condition.wait(@mutex) while @buffer.empty?
67+
result = @buffer.poll
68+
@buffer_condition.broadcast
69+
result
70+
end
6971

70-
def set_probe_or_push_into_buffer(value)
71-
@mutex.synchronize do
72-
if @probe_set.empty?
73-
push_into_buffer(value)
74-
true
75-
else
76-
@probe_set.take.try_set([value, self])
72+
def set_probe_or_push_into_buffer(value)
73+
@mutex.synchronize do
74+
if @probe_set.empty?
75+
push_into_buffer(value)
76+
true
77+
else
78+
@probe_set.take.try_set([value, self])
79+
end
7780
end
7881
end
79-
end
8082

83+
end
8184
end
8285
end

lib/concurrent/channel/ring_buffer.rb

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,62 @@
11
module Concurrent
2+
module Channel
3+
4+
# non-thread safe buffer
5+
class RingBuffer
6+
7+
def initialize(capacity)
8+
@buffer = Array.new(capacity)
9+
@first = @last = 0
10+
@count = 0
11+
end
12+
13+
14+
# @return [Integer] the capacity of the buffer
15+
def capacity
16+
@buffer.size
17+
end
18+
19+
# @return [Integer] the number of elements currently in the buffer
20+
def count
21+
@count
22+
end
23+
24+
# @return [Boolean] true if buffer is empty, false otherwise
25+
def empty?
26+
@count == 0
27+
end
28+
29+
# @return [Boolean] true if buffer is full, false otherwise
30+
def full?
31+
@count == capacity
32+
end
33+
34+
# @param [Object] value
35+
# @return [Boolean] true if value has been inserted, false otherwise
36+
def offer(value)
37+
return false if full?
38+
39+
@buffer[@last] = value
40+
@last = (@last + 1) % @buffer.size
41+
@count += 1
42+
true
43+
end
44+
45+
# @return [Object] the first available value and removes it from the buffer. If buffer is empty returns nil
46+
def poll
47+
result = @buffer[@first]
48+
@buffer[@first] = nil
49+
@first = (@first + 1) % @buffer.size
50+
@count -= 1
51+
result
52+
end
53+
54+
# @return [Object] the first available value and without removing it from
55+
# the buffer. If buffer is empty returns nil
56+
def peek
57+
@buffer[@first]
58+
end
259

3-
# non-thread safe buffer
4-
class RingBuffer
5-
6-
def initialize(capacity)
7-
@buffer = Array.new(capacity)
8-
@first = @last = 0
9-
@count = 0
10-
end
11-
12-
13-
# @return [Integer] the capacity of the buffer
14-
def capacity
15-
@buffer.size
16-
end
17-
18-
# @return [Integer] the number of elements currently in the buffer
19-
def count
20-
@count
21-
end
22-
23-
# @return [Boolean] true if buffer is empty, false otherwise
24-
def empty?
25-
@count == 0
26-
end
27-
28-
# @return [Boolean] true if buffer is full, false otherwise
29-
def full?
30-
@count == capacity
3160
end
32-
33-
# @param [Object] value
34-
# @return [Boolean] true if value has been inserted, false otherwise
35-
def offer(value)
36-
return false if full?
37-
38-
@buffer[@last] = value
39-
@last = (@last + 1) % @buffer.size
40-
@count += 1
41-
true
42-
end
43-
44-
# @return [Object] the first available value and removes it from the buffer. If buffer is empty returns nil
45-
def poll
46-
result = @buffer[@first]
47-
@buffer[@first] = nil
48-
@first = (@first + 1) % @buffer.size
49-
@count -= 1
50-
result
51-
end
52-
53-
# @return [Object] the first available value and without removing it from
54-
# the buffer. If buffer is empty returns nil
55-
def peek
56-
@buffer[@first]
57-
end
58-
5961
end
6062
end

0 commit comments

Comments
 (0)