Skip to content

Commit 0df8cdb

Browse files
committed
Fixed channel ticker/timer closing and next.
1 parent 09e482d commit 0df8cdb

File tree

9 files changed

+118
-103
lines changed

9 files changed

+118
-103
lines changed

lib/concurrent/agent.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ class Agent < Synchronization::LockableObject
166166
class Error < StandardError
167167
def initialize(message = nil)
168168
message ||= 'agent must be restarted before jobs can post'
169+
super(message)
169170
end
170171
end
171172

@@ -174,6 +175,7 @@ def initialize(message = nil)
174175
class ValidationError < Error
175176
def initialize(message = nil)
176177
message ||= 'invalid value'
178+
super(message)
177179
end
178180
end
179181

lib/concurrent/channel.rb

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,20 +116,25 @@ def offer?(item)
116116
end
117117

118118
def take
119-
item, _ = self.next
120-
item
119+
item = do_take
120+
item == Buffer::NO_VALUE ? nil : item
121121
end
122122
alias_method :receive, :take
123123
alias_method :~, :take
124124

125125
def take!
126-
item, _ = do_next
126+
item = do_take
127127
raise Error if item == Buffer::NO_VALUE
128128
item
129129
end
130130

131131
def take?
132-
item, _ = self.next?
132+
item = do_take
133+
item = if item == Buffer::NO_VALUE
134+
Concurrent::Maybe.nothing
135+
else
136+
Concurrent::Maybe.just(item)
137+
end
133138
item
134139
end
135140

@@ -238,13 +243,21 @@ def go_loop_via(executor, *args, &block)
238243

239244
private
240245

241-
def validator() @validator; end
246+
def validator
247+
@validator
248+
end
242249

243-
def validator=(value) @validator = value; end
250+
def validator=(value)
251+
@validator = value
252+
end
244253

245-
def buffer() @buffer; end
254+
def buffer
255+
@buffer
256+
end
246257

247-
def buffer=(value) @buffer = value; end
258+
def buffer=(value)
259+
@buffer = value
260+
end
248261

249262
def validate(value, allow_nil, raise_error)
250263
if !allow_nil && value.nil?
@@ -267,6 +280,10 @@ def do_offer(item)
267280
buffer.offer(item)
268281
end
269282

283+
def do_take
284+
buffer.take
285+
end
286+
270287
def do_next
271288
buffer.next
272289
end

lib/concurrent/channel/buffer/base.rb

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,25 @@ def closed?
193193

194194
private
195195

196-
def buffer() @buffer; end
196+
def buffer
197+
@buffer
198+
end
197199

198-
def buffer=(value) @buffer = value; end
200+
def buffer=(value)
201+
@buffer = value
202+
end
199203

200-
def closed=(value) @closed = value; end
204+
def closed=(value)
205+
@closed = value
206+
end
201207

202-
def capacity=(value) @capacity = value; end
208+
def capacity=(value)
209+
@capacity = value
210+
end
203211

204-
def size=(value) @size = value; end
212+
def size=(value)
213+
@size = value
214+
end
205215

206216
def ns_initialize(*args)
207217
end

lib/concurrent/channel/buffer/ticker.rb

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,26 @@ def offer(item)
1717
end
1818

1919
def take
20+
# a Go timer will block forever if stopped
2021
loop do
21-
result, _ = do_poll
22-
if result.nil?
23-
return NO_VALUE
24-
elsif result != NO_VALUE
25-
return result
26-
end
22+
tick = do_poll
23+
return tick if tick != NO_VALUE
24+
Thread.pass
2725
end
2826
end
2927

3028
def next
29+
# a Go timer will block forever if stopped
30+
# it will always return `true` for more
3131
loop do
32-
result, _ = do_poll
33-
if result.nil?
34-
return NO_VALUE, false
35-
elsif result != NO_VALUE
36-
return result, true
37-
end
32+
tick = do_poll
33+
return tick, true if tick != NO_VALUE
34+
Thread.pass
3835
end
3936
end
4037

4138
def poll
42-
result, _ = do_poll
43-
if result.nil? || result == NO_VALUE
44-
NO_VALUE
45-
else
46-
result
47-
end
39+
do_poll
4840
end
4941

5042
private
@@ -55,21 +47,27 @@ def ns_initialize(interval)
5547
self.capacity = 1
5648
end
5749

58-
def ns_size() 0; end
50+
def ns_size
51+
0
52+
end
5953

60-
def ns_empty?() false; end
54+
def ns_empty?
55+
false
56+
end
6157

62-
def ns_full?() true; end
58+
def ns_full?
59+
true
60+
end
6361

6462
def do_poll
65-
if ns_closed?
66-
return nil, false
67-
elsif (now = Concurrent.monotonic_time) > @next_tick
68-
tick = Concurrent::Channel::Tick.new(@next_tick)
69-
@next_tick = now + @interval
70-
return tick, true
71-
else
72-
return NO_VALUE, true
63+
synchronize do
64+
if !ns_closed? && (now = Concurrent.monotonic_time) >= @next_tick
65+
tick = Concurrent::Channel::Tick.new(@next_tick)
66+
@next_tick = now + @interval
67+
return tick
68+
else
69+
return NO_VALUE
70+
end
7371
end
7472
end
7573
end

lib/concurrent/channel/buffer/timer.rb

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,26 @@ def offer(item)
1717
end
1818

1919
def take
20+
# a Go timer will block forever if stopped
2021
loop do
21-
result, tick = do_poll
22-
if result == :closed
23-
return NO_VALUE
24-
elsif result == :tick
25-
return tick
26-
end
22+
tick = do_poll
23+
return tick if tick != NO_VALUE
2724
Thread.pass
2825
end
2926
end
3027

3128
def next
29+
# a Go timer will block forever if stopped
30+
# it will always return `true` for more
3231
loop do
33-
status, tick = do_poll
34-
if status == :closed
35-
return NO_VALUE, false
36-
elsif status == :tick
37-
return tick, false
38-
# AFAIK a Go timer will block forever if stopped
39-
#elsif status == :closed
40-
#return false, false
41-
end
32+
tick = do_poll
33+
return tick, true if tick != NO_VALUE
4234
Thread.pass
4335
end
4436
end
4537

4638
def poll
47-
status, tick = do_poll
48-
status == :tick ? tick : NO_VALUE
39+
do_poll
4940
end
5041

5142
private
@@ -55,22 +46,26 @@ def ns_initialize(delay)
5546
self.capacity = 1
5647
end
5748

58-
def ns_size() 0; end
49+
def ns_size
50+
0
51+
end
5952

60-
def ns_empty?() false; end
53+
def ns_empty?
54+
false
55+
end
6156

62-
def ns_full?() true; end
57+
def ns_full?
58+
true
59+
end
6360

6461
def do_poll
6562
synchronize do
66-
if ns_closed?
67-
return :closed, false
68-
elsif Concurrent.monotonic_time > @tick
63+
if !ns_closed? && Concurrent.monotonic_time >= @tick
6964
# only one listener gets notified
7065
self.closed = true
71-
return :tick, Concurrent::Channel::Tick.new(@tick)
66+
return Concurrent::Channel::Tick.new(@tick)
7267
else
73-
return :wait, true
68+
return NO_VALUE
7469
end
7570
end
7671
end

spec/concurrent/channel/buffer/ticker_spec.rb

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ module Concurrent::Channel::Buffer
44

55
describe Ticker do
66

7-
subject { described_class.new(1) }
7+
let(:delay) { 0.1 }
8+
subject { described_class.new(delay) }
89

910
it_behaves_like :channel_timing_buffer
1011

1112
context '#take' do
1213
it 'triggers until closed' do
13-
subject = described_class.new(0.1)
1414
expected = 3
1515
actual = 0
1616
expected.times { actual += 1 if subject.take.is_a? Concurrent::Channel::Tick }
@@ -20,7 +20,6 @@ module Concurrent::Channel::Buffer
2020

2121
context '#poll' do
2222
it 'triggers until closed' do
23-
subject = described_class.new(0.1)
2423
expected = 3
2524
actual = 0
2625
expected.times do
@@ -32,15 +31,7 @@ module Concurrent::Channel::Buffer
3231
end
3332

3433
context '#next' do
35-
36-
it 'returns more until closed' do
37-
subject = described_class.new(0.1)
38-
_, more = subject.next
39-
expect(more).to be true
40-
end
41-
4234
it 'triggers until closed' do
43-
subject = described_class.new(0.1)
4435
expected = 3
4536
actual = 0
4637
expected.times { actual += 1 if subject.next.first.is_a? Concurrent::Channel::Tick }

spec/concurrent/channel/buffer/timer_spec.rb

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,20 @@ module Concurrent::Channel::Buffer
44

55
describe Timer do
66

7-
subject { described_class.new(0) }
7+
let(:delay) { 0.1 }
8+
subject { described_class.new(0.1) }
89

910
it_behaves_like :channel_timing_buffer
1011

1112
context '#take' do
1213
it 'closes automatically on first take' do
13-
subject = described_class.new(0.1)
1414
expect(subject.take).to be_truthy
1515
expect(subject).to be_closed
1616
end
1717
end
1818

1919
context '#poll' do
2020
it 'closes automatically on first take' do
21-
subject = described_class.new(0.1)
2221
loop do
2322
break if subject.poll != NO_VALUE
2423
end
@@ -27,25 +26,13 @@ module Concurrent::Channel::Buffer
2726
end
2827

2928
context '#next' do
30-
3129
it 'closes automatically on first take' do
32-
subject = described_class.new(0.1)
3330
loop do
3431
value, _ = subject.next
3532
break if value != NO_VALUE
3633
end
3734
expect(subject).to be_closed
3835
end
39-
40-
it 'returns false for more on first take' do
41-
subject = described_class.new(0.1)
42-
more = true
43-
loop do
44-
value, more = subject.next
45-
break if value != NO_VALUE
46-
end
47-
expect(more).to be false
48-
end
4936
end
5037
end
5138
end

0 commit comments

Comments
 (0)