Skip to content

Commit 3b5da34

Browse files
committed
Updated channels with Synchronization::Object.
1 parent 3ea4af2 commit 3b5da34

File tree

4 files changed

+38
-38
lines changed

4 files changed

+38
-38
lines changed
Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,76 @@
1-
require 'concurrent/atomic/condition'
1+
require 'concurrent/synchronization'
22

33
module Concurrent
4-
class BlockingRingBuffer
4+
class BlockingRingBuffer < Synchronization::Object
55

66
def initialize(capacity)
7-
@buffer = RingBuffer.new(capacity)
8-
@first = @last = 0
9-
@count = 0
10-
@mutex = Mutex.new
11-
@condition = Condition.new
7+
super(capacity)
128
end
139

1410
# @return [Integer] the capacity of the buffer
1511
def capacity
16-
@mutex.synchronize { @buffer.capacity }
12+
synchronize { @buffer.capacity }
1713
end
1814

1915
# @return [Integer] the number of elements currently in the buffer
2016
def count
21-
@mutex.synchronize { @buffer.count }
17+
synchronize { @buffer.count }
2218
end
2319

2420
# @return [Boolean] true if buffer is empty, false otherwise
2521
def empty?
26-
@mutex.synchronize { @buffer.empty? }
22+
synchronize { @buffer.empty? }
2723
end
2824

2925
# @return [Boolean] true if buffer is full, false otherwise
3026
def full?
31-
@mutex.synchronize { @buffer.full? }
27+
synchronize { @buffer.full? }
3228
end
3329

3430
# @param [Object] value the value to be inserted
3531
# @return [Boolean] true if value has been inserted, false otherwise
3632
def put(value)
37-
@mutex.synchronize do
33+
synchronize do
3834
wait_while_full
3935
@buffer.offer(value)
40-
@condition.signal
36+
ns_signal
4137
true
4238
end
4339
end
4440

4541
# @return [Object] the first available value and removes it from the buffer.
4642
# If buffer is empty it blocks until an element is available
4743
def take
48-
@mutex.synchronize do
44+
synchronize do
4945
wait_while_empty
5046
result = @buffer.poll
51-
@condition.signal
47+
ns_signal
5248
result
5349
end
5450
end
5551

5652
# @return [Object] the first available value and without removing it from
5753
# the buffer. If buffer is empty returns nil
5854
def peek
59-
@mutex.synchronize { @buffer.peek }
55+
synchronize { @buffer.peek }
56+
end
57+
58+
protected
59+
60+
def ns_initialize(capacity)
61+
@buffer = RingBuffer.new(capacity)
62+
@first = @last = 0
63+
@count = 0
6064
end
6165

6266
private
6367

6468
def wait_while_full
65-
@condition.wait(@mutex) while @buffer.full?
69+
ns_wait_until { !@buffer.full? }
6670
end
6771

6872
def wait_while_empty
69-
@condition.wait(@mutex) while @buffer.empty?
73+
ns_wait_until { !@buffer.empty? }
7074
end
71-
7275
end
7376
end

lib/concurrent/channel/buffered_channel.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
require 'concurrent/atomic/condition'
2-
3-
require_relative 'waitable_list'
2+
require 'concurrent/channel/waitable_list'
43

54
module Concurrent
65
class BufferedChannel
Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,38 @@
1-
require 'concurrent/atomic/condition'
1+
require 'concurrent/synchronization'
22

33
module Concurrent
4-
class WaitableList
5-
6-
def initialize
7-
@mutex = Mutex.new
8-
@condition = Condition.new
9-
10-
@list = []
11-
end
4+
class WaitableList < Synchronization::Object
125

136
def size
14-
@mutex.synchronize { @list.size }
7+
synchronize { @list.size }
158
end
169

1710
def empty?
18-
@mutex.synchronize { @list.empty? }
11+
synchronize { @list.empty? }
1912
end
2013

2114
def put(value)
22-
@mutex.synchronize do
15+
synchronize do
2316
@list << value
24-
@condition.signal
17+
ns_signal
2518
end
2619
end
2720

2821
def delete(value)
29-
@mutex.synchronize { @list.delete(value) }
22+
synchronize { @list.delete(value) }
3023
end
3124

3225
def take
33-
@mutex.synchronize do
34-
@condition.wait(@mutex) while @list.empty?
26+
synchronize do
27+
ns_wait_until { !@list.empty? }
3528
@list.shift
3629
end
3730
end
3831

32+
protected
33+
34+
def ns_initialize
35+
@list = []
36+
end
3937
end
4038
end

lib/concurrent/synchronization/abstract_object.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def ns_initialize(*args, &block)
5858
# synchronize { ns_wait_until(timeout, &condition) }
5959
# end
6060
# ```
61-
def ns_wait_until(timeout, &condition)
61+
def ns_wait_until(timeout = nil, &condition)
6262
if timeout
6363
wait_until = Concurrent.monotonic_time + timeout
6464
loop do

0 commit comments

Comments
 (0)