Skip to content

Commit a599eb8

Browse files
committed
added waitable list
1 parent d8e6f5e commit a599eb8

File tree

3 files changed

+52
-24
lines changed

3 files changed

+52
-24
lines changed

lib/concurrent/channel/buffered_channel.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require_relative 'waitable_list'
2+
13
module Concurrent
24
class BufferedChannel
35

@@ -6,13 +8,13 @@ def initialize(size)
68
@condition = Condition.new
79
@buffer_condition = Condition.new
810

9-
@probe_set = []
11+
@probe_set = WaitableList.new
1012
@buffer = []
1113
@size = size
1214
end
1315

1416
def probe_set_size
15-
@mutex.synchronize { @probe_set.size }
17+
@probe_set.size
1618
end
1719

1820
def buffer_queue_size
@@ -34,7 +36,7 @@ def select(probe)
3436
@mutex.synchronize do
3537

3638
if @buffer.empty?
37-
@probe_set << probe
39+
@probe_set.push(probe)
3840
true
3941
else
4042
shift_buffer if probe.set_unless_assigned peek_buffer
@@ -44,7 +46,7 @@ def select(probe)
4446
end
4547

4648
def remove_probe(probe)
47-
@mutex.synchronize { @probe_set.delete(probe) }
49+
@probe_set.delete(probe)
4850
end
4951

5052
private
@@ -81,7 +83,7 @@ def set_probe_or_push_into_buffer(value)
8183
push_into_buffer(value)
8284
true
8385
else
84-
@probe_set.shift.set_unless_assigned(value)
86+
@probe_set.first.set_unless_assigned(value)
8587
end
8688
end
8789
end

lib/concurrent/channel/unbuffered_channel.rb

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1+
require_relative 'waitable_list'
2+
13
module Concurrent
24
class UnbufferedChannel
35

46
def initialize
5-
@mutex = Mutex.new
6-
@condition = Condition.new
7-
8-
@probe_set = []
7+
@probe_set = WaitableList.new
98
end
109

1110
def probe_set_size
12-
@mutex.synchronize { @probe_set.size }
11+
@probe_set.size
1312
end
1413

1514
def push(value)
16-
until first_waiting_probe.set_unless_assigned(value)
15+
until @probe_set.first.set_unless_assigned(value)
1716
end
1817
end
1918

@@ -24,22 +23,11 @@ def pop
2423
end
2524

2625
def select(probe)
27-
@mutex.synchronize do
28-
@probe_set << probe
29-
@condition.signal
30-
end
26+
@probe_set.push(probe)
3127
end
3228

3329
def remove_probe(probe)
34-
@mutex.synchronize { @probe_set.delete(probe) }
35-
end
36-
37-
private
38-
def first_waiting_probe
39-
@mutex.synchronize do
40-
@condition.wait(@mutex) while @probe_set.empty?
41-
@probe_set.shift
42-
end
30+
@probe_set.delete(probe)
4331
end
4432

4533
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module Concurrent
2+
class WaitableList
3+
4+
def initialize
5+
@mutex = Mutex.new
6+
@condition = Condition.new
7+
8+
@list = []
9+
end
10+
11+
def size
12+
@mutex.synchronize { @list.size }
13+
end
14+
15+
def empty?
16+
@mutex.synchronize { @list.empty? }
17+
end
18+
19+
def push(value)
20+
@mutex.synchronize do
21+
@list << value
22+
@condition.signal
23+
end
24+
end
25+
26+
def delete(value)
27+
@mutex.synchronize { @list.delete(value) }
28+
end
29+
30+
def first
31+
@mutex.synchronize do
32+
@condition.wait(@mutex) while @list.empty?
33+
@list.shift
34+
end
35+
end
36+
37+
end
38+
end

0 commit comments

Comments
 (0)