Skip to content

Commit 72a5035

Browse files
committed
first working implementation
1 parent 5b9215d commit 72a5035

File tree

2 files changed

+137
-17
lines changed

2 files changed

+137
-17
lines changed

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ module Concurrent
22

33
class CyclicBarrier
44

5+
BrokenError = Class.new(StandardError)
6+
7+
Generation = Struct.new(:status)
8+
private_constant :Generation
9+
510
# Create a new `CyclicBarrier` that waits for `parties` threads
611
#
712
# @param [Fixnum] parties the number of parties
@@ -15,6 +20,7 @@ def initialize(parties, &block)
1520
@condition = Condition.new
1621
@number_waiting = 0
1722
@action = block
23+
@generation = Generation.new(:waiting)
1824
end
1925

2026
# @return [Fixnum] the number of threads needed to pass the barrier
@@ -30,19 +36,37 @@ def number_waiting
3036
# Blocks on the barrier until the number of waiting threads is equal to `parties` or until `timeout` is reached or `reset` is called
3137
# If a block has been passed to the constructor, it will be executed once by the last arrived thread before releasing the others
3238
# @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` to block indefinitely
33-
# @return [Boolean] `true` if the `count` reaches zero else false on `timeout` or on `reset`
39+
# @return [Boolean] `true` if the `count` reaches zero else false on `timeout` or on `reset` or on broken event
3440
def wait(timeout = nil)
3541
@mutex.synchronize do
42+
43+
return false unless @generation.status == :waiting
44+
45+
generation = @generation
46+
3647
@number_waiting += 1
48+
3749
if @number_waiting == @parties
3850
@action.call if @action
51+
generation.status = :fulfilled
52+
@generation = Generation.new(:waiting)
3953
@condition.broadcast
4054
@number_waiting = 0
55+
true
4156
else
42-
@condition.wait(@mutex, timeout)
43-
end
57+
remaining = Condition::Result.new(timeout)
58+
while generation.status == :waiting && remaining.can_wait?
59+
remaining = @condition.wait(@mutex, remaining.remaining_time)
60+
end
4461

45-
true
62+
if remaining.woken_up?
63+
return true
64+
else
65+
generation.status = :broken
66+
@condition.broadcast
67+
return false
68+
end
69+
end
4670
end
4771
end
4872

@@ -52,6 +76,12 @@ def wait(timeout = nil)
5276
#
5377
# @return [nil]
5478
def reset
79+
@mutex.synchronize do
80+
@generation.status = :reset
81+
@condition.broadcast
82+
@generation = Generation.new(:waiting)
83+
@number_waiting = 0
84+
end
5585
end
5686

5787
# A barrier can be broken when:
@@ -61,7 +91,7 @@ def reset
6191
# A broken barrier can be restored using `reset` it's safer to create a new one
6292
# @return [Boolean] true if the barrier is broken otherwise false
6393
def broken?
64-
false
94+
@mutex.synchronize { @generation.status != :waiting }
6595
end
6696

6797
end

spec/concurrent/atomic/cyclic_barrier_spec.rb

Lines changed: 102 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,20 @@ module Concurrent
5858
barrier.reset
5959
barrier.broken?.should eq false
6060
end
61-
62-
it 'should be broken when at least one thread timed out'
63-
it 'should be restored when reset is called'
6461
end
6562

6663
describe 'reset' do
67-
it 'should release all waiting threads'
68-
it 'should not execute the block'
64+
it 'should release all waiting threads' do
65+
latch = CountDownLatch.new(1)
66+
67+
Thread.new { barrier.wait; latch.count_down }
68+
sleep(0.1)
69+
barrier.reset
70+
latch.wait(0.1).should be_true
71+
72+
barrier.should_not be_broken
73+
barrier.number_waiting.should eq 0
74+
end
6975
end
7076

7177
describe '#wait' do
@@ -83,6 +89,7 @@ module Concurrent
8389
parties.times { Thread.new { barrier.wait; latch.count_down } }
8490
latch.wait(0.1).should be_true
8591
barrier.number_waiting.should eq 0
92+
barrier.should_not be_broken
8693
end
8794

8895
it 'returns true when released' do
@@ -103,6 +110,16 @@ module Concurrent
103110

104111
counter.value.should eq 1
105112
end
113+
114+
it 'can be reused' do
115+
first_latch = CountDownLatch.new(parties)
116+
parties.times { Thread.new { barrier.wait; first_latch.count_down } }
117+
first_latch.wait(0.1).should be_true
118+
119+
latch = CountDownLatch.new(parties)
120+
parties.times { Thread.new { barrier.wait; latch.count_down } }
121+
latch.wait(0.1).should be_true
122+
end
106123
end
107124

108125
context 'with timeout' do
@@ -132,18 +149,91 @@ module Concurrent
132149

133150
context 'timeout expiring' do
134151

135-
it 'returns false'
136-
it 'can return early and break the barrier'
137-
it 'does not execute the block on timeout'
152+
it 'returns false' do
153+
latch = CountDownLatch.new(1)
154+
155+
Thread.new { latch.count_down if barrier.wait(0.1) == false }
156+
latch.wait(0.2).should be_true
157+
end
158+
159+
it 'breaks the barrier and release all other threads' do
160+
latch = CountDownLatch.new(2)
161+
162+
Thread.new { barrier.wait(0.1); latch.count_down }
163+
Thread.new { barrier.wait; latch.count_down }
164+
165+
latch.wait(0.2).should be_true
166+
barrier.should be_broken
167+
end
168+
169+
it 'does not execute the block on timeout' do
170+
counter = AtomicFixnum.new
171+
barrier = described_class.new(parties) { counter.increment }
172+
173+
barrier.wait(0.1)
174+
175+
counter.value.should eq 0
176+
end
138177
end
139178
end
140-
end
141179

142-
context 'spurious wakeups' do
143-
it 'should resist'
180+
context '#broken barrier' do
181+
it 'should not accept new threads' do
182+
Thread.new { barrier.wait(0.1) }
183+
sleep(0.2)
184+
185+
barrier.should be_broken
186+
187+
barrier.wait.should be_false
188+
end
189+
190+
it 'can be reset' do
191+
Thread.new { barrier.wait(0.1) }
192+
sleep(0.2)
193+
194+
barrier.should be_broken
195+
196+
barrier.reset
197+
198+
barrier.should_not be_broken
199+
end
200+
end
144201
end
145202

146-
end
203+
context 'spurious wake ups' do
204+
205+
before(:each) do
206+
def barrier.simulate_spurious_wake_up
207+
@mutex.synchronize do
208+
@condition.signal
209+
@condition.broadcast
210+
end
211+
end
212+
end
213+
214+
it 'should resist to spurious wake ups without timeout' do
215+
@expected = false
216+
Thread.new { barrier.wait; @expected = true }
217+
218+
sleep(0.1)
219+
barrier.simulate_spurious_wake_up
220+
221+
sleep(0.1)
222+
@expected.should be_false
223+
end
147224

225+
it 'should resist to spurious wake ups with timeout' do
226+
@expected = false
227+
Thread.new { barrier.wait(0.5); @expected = true }
228+
229+
sleep(0.1)
230+
barrier.simulate_spurious_wake_up
231+
232+
sleep(0.1)
233+
@expected.should be_false
234+
235+
end
236+
end
237+
end
148238

149239
end

0 commit comments

Comments
 (0)