Skip to content

Commit da59f26

Browse files
committed
initial unbuffered channel implementation
1 parent 487f066 commit da59f26

File tree

3 files changed

+87
-0
lines changed

3 files changed

+87
-0
lines changed

lib/concurrent.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
require 'concurrent/tvar'
3333
require 'concurrent/utilities'
3434

35+
require 'concurrent/channel/unbuffered_channel'
36+
3537
require 'concurrent/cached_thread_pool'
3638
require 'concurrent/fixed_thread_pool'
3739
require 'concurrent/immediate_executor'
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module Concurrent
2+
class UnbufferedChannel
3+
4+
def initialize
5+
@mutex = Mutex.new
6+
@condition = Condition.new
7+
8+
@wait_set = []
9+
end
10+
11+
def push(value)
12+
probe = @mutex.synchronize do
13+
@condition.wait(@mutex) while @wait_set.empty?
14+
@wait_set.shift
15+
end
16+
17+
probe.set(value)
18+
end
19+
20+
def pop
21+
probe = IVar.new
22+
23+
@mutex.synchronize do
24+
@wait_set << probe
25+
@condition.signal
26+
end
27+
28+
probe.value
29+
end
30+
31+
def select(probe)
32+
end
33+
34+
def remove_probe(probe)
35+
end
36+
37+
end
38+
end
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe UnbufferedChannel do
6+
7+
let!(:channel) { subject } # let is not thread safe, let! creates the object before ensuring uniqueness
8+
9+
context 'with one thread' do
10+
11+
context 'without timeout' do
12+
13+
describe '#push' do
14+
it 'should block' do
15+
t = Thread.new { channel.push 5 }
16+
sleep(0.05)
17+
t.status.should eq 'sleep'
18+
end
19+
end
20+
21+
describe '#pop' do
22+
it 'should block' do
23+
t = Thread.new { channel.pop }
24+
sleep(0.05)
25+
t.status.should eq 'sleep'
26+
end
27+
end
28+
29+
end
30+
31+
end
32+
33+
context 'cooperating threads' do
34+
it 'passes the pushed value to thread waiting on pop' do
35+
result = nil
36+
37+
Thread.new { channel.push 42 }
38+
Thread.new { result = channel.pop }
39+
40+
sleep(0.05)
41+
42+
result.should eq 42
43+
end
44+
end
45+
46+
end
47+
end

0 commit comments

Comments
 (0)