Skip to content

Commit 6563469

Browse files
committed
added support for expired probes
1 parent da59f26 commit 6563469

File tree

5 files changed

+96
-11
lines changed

5 files changed

+96
-11
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
require 'concurrent/tvar'
3333
require 'concurrent/utilities'
3434

35+
require 'concurrent/channel/probe'
3536
require 'concurrent/channel/unbuffered_channel'
3637

3738
require 'concurrent/cached_thread_pool'

lib/concurrent/channel/probe.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
module Concurrent
2+
class Probe < IVar
3+
4+
def initialize(value = NO_VALUE, opts = {})
5+
super(value, opts)
6+
end
7+
8+
def set_unless_assigned(value)
9+
mutex.synchronize do
10+
return false if [:fulfilled, :rejected].include? @state
11+
12+
set_state(true, value, nil)
13+
event.set
14+
true
15+
end
16+
17+
end
18+
end
19+
end

lib/concurrent/channel/unbuffered_channel.rb

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,33 @@ def initialize
99
end
1010

1111
def push(value)
12-
probe = @mutex.synchronize do
13-
@condition.wait(@mutex) while @wait_set.empty?
14-
@wait_set.shift
12+
until first_waiting_probe.set_unless_assigned(value)
1513
end
16-
17-
probe.set(value)
1814
end
1915

2016
def pop
21-
probe = IVar.new
17+
probe = Probe.new
18+
select(probe)
19+
probe.value
20+
end
2221

22+
def select(probe)
2323
@mutex.synchronize do
2424
@wait_set << probe
2525
@condition.signal
2626
end
27-
28-
probe.value
29-
end
30-
31-
def select(probe)
3227
end
3328

3429
def remove_probe(probe)
3530
end
3631

32+
private
33+
def first_waiting_probe
34+
@mutex.synchronize do
35+
@condition.wait(@mutex) while @wait_set.empty?
36+
@wait_set.shift
37+
end
38+
end
39+
3740
end
3841
end

spec/concurrent/channel/probe_spec.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe Probe do
6+
it 'should be written'
7+
end
8+
end

spec/concurrent/channel/unbuffered_channel_spec.rb

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ module Concurrent
3131
end
3232

3333
context 'cooperating threads' do
34+
3435
it 'passes the pushed value to thread waiting on pop' do
3536
result = nil
3637

@@ -41,7 +42,60 @@ module Concurrent
4142

4243
result.should eq 42
4344
end
45+
46+
it 'passes the pushed value to only one thread' do
47+
result = []
48+
49+
Thread.new { channel.push 37 }
50+
Thread.new { result << channel.pop }
51+
Thread.new { result << channel.pop }
52+
53+
sleep(0.05)
54+
55+
result.should have(1).items
56+
end
4457
end
4558

59+
describe 'select' do
60+
61+
let(:probe) { Probe.new }
62+
63+
it 'does not block' do
64+
t = Thread.new { channel.select(probe) }
65+
66+
sleep(0.05)
67+
68+
t.status.should eq false
69+
end
70+
71+
it 'gets notified by writer thread' do
72+
channel.select(probe)
73+
74+
Thread.new { channel.push 82 }
75+
76+
probe.value.should eq 82
77+
end
78+
79+
it 'ignores already set probes and waits for a new one' do
80+
probe.set(27)
81+
82+
channel.select(probe)
83+
84+
t = Thread.new { channel.push 72 }
85+
86+
sleep(0.05)
87+
88+
t.status.should eq 'sleep'
89+
90+
new_probe = Probe.new
91+
92+
channel.select(new_probe)
93+
94+
sleep(0.05)
95+
96+
new_probe.value.should eq 72
97+
end
98+
99+
end
46100
end
47101
end

0 commit comments

Comments
 (0)