1
+ require 'concurrent/synchronization'
1
2
require 'concurrent/channel/waitable_list'
2
3
3
4
module Concurrent
4
5
module Channel
5
6
6
7
# @api Channel
7
8
# @!macro edge_warning
8
- class BufferedChannel
9
+ class BufferedChannel < Synchronization :: Object
9
10
10
11
def initialize ( size )
11
- @mutex = Mutex . new
12
- @buffer_condition = ConditionVariable . new
13
-
14
- @probe_set = WaitableList . new
15
- @buffer = RingBuffer . new ( size )
12
+ super ( )
13
+ synchronize { ns_initialize ( size ) }
16
14
end
17
15
18
16
def probe_set_size
19
17
@probe_set . size
20
18
end
21
19
22
20
def buffer_queue_size
23
- @mutex . synchronize { @buffer . count }
21
+ synchronize { @buffer . count }
24
22
end
25
23
26
24
def push ( value )
27
- until set_probe_or_push_into_buffer ( value )
25
+ until set_probe_or_ns_push_into_buffer ( value )
28
26
end
29
27
end
30
28
@@ -35,53 +33,57 @@ def pop
35
33
end
36
34
37
35
def select ( probe )
38
- @mutex . synchronize do
39
-
36
+ synchronize do
40
37
if @buffer . empty?
41
38
@probe_set . put ( probe )
42
39
true
43
40
else
44
- shift_buffer if probe . try_set ( [ peek_buffer , self ] )
41
+ ns_shift_buffer if probe . try_set ( [ ns_peek_buffer , self ] )
45
42
end
46
-
47
43
end
48
44
end
49
45
50
46
def remove_probe ( probe )
51
47
@probe_set . delete ( probe )
52
48
end
53
49
50
+ protected
51
+
52
+ def ns_initialize ( size )
53
+ @probe_set = WaitableList . new
54
+ @buffer = RingBuffer . new ( size )
55
+ end
56
+
54
57
private
55
58
56
- def push_into_buffer ( value )
57
- @buffer_condition . wait ( @mutex ) while @buffer . full?
59
+ def ns_push_into_buffer ( value )
60
+ ns_wait while @buffer . full?
58
61
@buffer . offer value
59
- @buffer_condition . broadcast
62
+ ns_broadcast
60
63
end
61
64
62
- def peek_buffer
63
- @buffer_condition . wait ( @mutex ) while @buffer . empty?
65
+ def ns_peek_buffer
66
+ ns_wait while @buffer . empty?
64
67
@buffer . peek
65
68
end
66
69
67
- def shift_buffer
68
- @buffer_condition . wait ( @mutex ) while @buffer . empty?
70
+ def ns_shift_buffer
71
+ ns_wait while @buffer . empty?
69
72
result = @buffer . poll
70
- @buffer_condition . broadcast
73
+ ns_broadcast
71
74
result
72
75
end
73
76
74
- def set_probe_or_push_into_buffer ( value )
75
- @mutex . synchronize do
77
+ def set_probe_or_ns_push_into_buffer ( value )
78
+ synchronize do
76
79
if @probe_set . empty?
77
- push_into_buffer ( value )
80
+ ns_push_into_buffer ( value )
78
81
true
79
82
else
80
83
@probe_set . take . try_set ( [ value , self ] )
81
84
end
82
85
end
83
86
end
84
-
85
87
end
86
88
end
87
89
end
0 commit comments