Skip to content

Commit 5b9215d

Browse files
committed
first working wait implementation
1 parent ff717f7 commit 5b9215d

File tree

2 files changed

+66
-10
lines changed

2 files changed

+66
-10
lines changed

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ class CyclicBarrier
88
# @yield an optional block that will be executed that will be executed after the last thread arrives and before the others are released
99
#
1010
# @raise [ArgumentError] if `parties` is not an integer or is less than zero
11-
def initialize(parties)
11+
def initialize(parties, &block)
1212
raise ArgumentError.new('count must be in integer greater than or equal zero') if !parties.is_a?(Fixnum) || parties < 1
1313
@parties = parties
1414
@mutex = Mutex.new
1515
@condition = Condition.new
1616
@number_waiting = 0
17+
@action = block
1718
end
1819

1920
# @return [Fixnum] the number of threads needed to pass the barrier
@@ -27,17 +28,21 @@ def number_waiting
2728
end
2829

2930
# Blocks on the barrier until the number of waiting threads is equal to `parties` or until `timeout` is reached or `reset` is called
31+
# If a block has been passed to the constructor, it will be executed once by the last arrived thread before releasing the others
3032
# @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` to block indefinitely
3133
# @return [Boolean] `true` if the `count` reaches zero else false on `timeout` or on `reset`
3234
def wait(timeout = nil)
3335
@mutex.synchronize do
3436
@number_waiting += 1
3537
if @number_waiting == @parties
38+
@action.call if @action
3639
@condition.broadcast
3740
@number_waiting = 0
3841
else
39-
@condition.wait(@mutex)
42+
@condition.wait(@mutex, timeout)
4043
end
44+
45+
true
4146
end
4247
end
4348

@@ -56,6 +61,7 @@ def reset
5661
# A broken barrier can be restored using `reset` it's safer to create a new one
5762
# @return [Boolean] true if the barrier is broken otherwise false
5863
def broken?
64+
false
5965
end
6066

6167
end

spec/concurrent/atomic/cyclic_barrier_spec.rb

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,15 @@ module Concurrent
5050
end
5151

5252
describe '#broken?' do
53-
it 'should not be broken when created'
54-
it 'should not be broken when reset is called without waiting thread'
53+
it 'should not be broken when created' do
54+
barrier.broken?.should eq false
55+
end
56+
57+
it 'should not be broken when reset is called without waiting thread' do
58+
barrier.reset
59+
barrier.broken?.should eq false
60+
end
61+
5562
it 'should be broken when at least one thread timed out'
5663
it 'should be restored when reset is called'
5764
end
@@ -74,18 +81,61 @@ module Concurrent
7481
latch = CountDownLatch.new(parties)
7582

7683
parties.times { Thread.new { barrier.wait; latch.count_down } }
77-
latch.wait(0.2).should be_true
84+
latch.wait(0.1).should be_true
7885
barrier.number_waiting.should eq 0
7986
end
8087

81-
it 'executes the block'
88+
it 'returns true when released' do
89+
latch = CountDownLatch.new(parties)
90+
91+
parties.times { Thread.new { latch.count_down if barrier.wait == true } }
92+
latch.wait(0.1).should be_true
93+
end
94+
95+
it 'executes the block once' do
96+
counter = AtomicFixnum.new
97+
barrier = described_class.new(parties) { counter.increment }
98+
99+
latch = CountDownLatch.new(parties)
100+
101+
parties.times { Thread.new { latch.count_down if barrier.wait == true } }
102+
latch.wait(0.1).should be_true
103+
104+
counter.value.should eq 1
105+
end
82106
end
83107

84108
context 'with timeout' do
85-
it 'should block the thread'
86-
it 'should release all threads when their number matches the desired one'
87-
it 'can return early and break the barrier'
88-
it 'does not execute the block on timeout'
109+
context 'timeout not expiring' do
110+
it 'should block the thread' do
111+
t = Thread.new { barrier.wait(1) }
112+
sleep(0.1)
113+
114+
t.status.should eq 'sleep'
115+
end
116+
117+
it 'should release all threads when their number matches the desired one' do
118+
latch = CountDownLatch.new(parties)
119+
120+
parties.times { Thread.new { barrier.wait(1); latch.count_down } }
121+
latch.wait(0.2).should be_true
122+
barrier.number_waiting.should eq 0
123+
end
124+
125+
it 'returns true when released' do
126+
latch = CountDownLatch.new(parties)
127+
128+
parties.times { Thread.new { latch.count_down if barrier.wait(1) == true } }
129+
latch.wait(0.1).should be_true
130+
end
131+
end
132+
133+
context 'timeout expiring' do
134+
135+
it 'returns false'
136+
it 'can return early and break the barrier'
137+
it 'does not execute the block on timeout'
138+
end
89139
end
90140
end
91141

0 commit comments

Comments
 (0)