Skip to content

Commit 3c80ad3

Browse files
committed
Merge pull request #58 from jdantonio/cyclic_barrier
Cyclic barrier
2 parents 483fc31 + 33ca56e commit 3c80ad3

File tree

3 files changed

+355
-0
lines changed

3 files changed

+355
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
module Concurrent
2+
3+
class CyclicBarrier
4+
5+
Generation = Struct.new(:status)
6+
private_constant :Generation
7+
8+
# Create a new `CyclicBarrier` that waits for `parties` threads
9+
#
10+
# @param [Fixnum] parties the number of parties
11+
# @yield an optional block that will be executed that will be executed after the last thread arrives and before the others are released
12+
#
13+
# @raise [ArgumentError] if `parties` is not an integer or is less than zero
14+
def initialize(parties, &block)
15+
raise ArgumentError.new('count must be in integer greater than or equal zero') if !parties.is_a?(Fixnum) || parties < 1
16+
@parties = parties
17+
@mutex = Mutex.new
18+
@condition = Condition.new
19+
@number_waiting = 0
20+
@action = block
21+
@generation = Generation.new(:waiting)
22+
end
23+
24+
# @return [Fixnum] the number of threads needed to pass the barrier
25+
def parties
26+
@parties
27+
end
28+
29+
# @return [Fixnum] the number of threads currently waiting on the barrier
30+
def number_waiting
31+
@number_waiting
32+
end
33+
34+
# Blocks on the barrier until the number of waiting threads is equal to `parties` or until `timeout` is reached or `reset` is called
35+
# If a block has been passed to the constructor, it will be executed once by the last arrived thread before releasing the others
36+
# @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` to block indefinitely
37+
# @return [Boolean] `true` if the `count` reaches zero else false on `timeout` or on `reset` or if the barrier is broken
38+
def wait(timeout = nil)
39+
@mutex.synchronize do
40+
41+
return false unless @generation.status == :waiting
42+
43+
@number_waiting += 1
44+
45+
if @number_waiting == @parties
46+
@action.call if @action
47+
set_status_and_restore(:fulfilled)
48+
true
49+
else
50+
wait_for_wake_up(@generation, timeout)
51+
end
52+
end
53+
end
54+
55+
56+
57+
# resets the barrier to its initial state
58+
# If there is at least one waiting thread, it will be woken up, the `wait` method will return false and the barrier will be broken
59+
# If the barrier is broken, this method restores it to the original state
60+
#
61+
# @return [nil]
62+
def reset
63+
@mutex.synchronize do
64+
set_status_and_restore(:reset)
65+
end
66+
end
67+
68+
# A barrier can be broken when:
69+
# - a thread called the `reset` method while at least one other thread was waiting
70+
# - at least one thread timed out on `wait` method
71+
#
72+
# A broken barrier can be restored using `reset` it's safer to create a new one
73+
# @return [Boolean] true if the barrier is broken otherwise false
74+
def broken?
75+
@mutex.synchronize { @generation.status != :waiting }
76+
end
77+
78+
private
79+
80+
def set_status_and_restore(new_status)
81+
@generation.status = new_status
82+
@condition.broadcast
83+
@generation = Generation.new(:waiting)
84+
@number_waiting = 0
85+
end
86+
87+
def wait_for_wake_up(generation, timeout)
88+
if wait_while_waiting(generation, timeout)
89+
generation.status == :fulfilled
90+
else
91+
generation.status = :broken
92+
@condition.broadcast
93+
false
94+
end
95+
end
96+
97+
def wait_while_waiting(generation, timeout)
98+
remaining = Condition::Result.new(timeout)
99+
while generation.status == :waiting && remaining.can_wait?
100+
remaining = @condition.wait(@mutex, remaining.remaining_time)
101+
end
102+
remaining.woken_up?
103+
end
104+
105+
end
106+
end

lib/concurrent/atomics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'concurrent/atomic/condition'
44
require 'concurrent/atomic/copy_on_notify_observer_set'
55
require 'concurrent/atomic/copy_on_write_observer_set'
6+
require 'concurrent/atomic/cyclic_barrier'
67
require 'concurrent/atomic/count_down_latch'
78
require 'concurrent/atomic/event'
89
require 'concurrent/atomic/thread_local_var'
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe CyclicBarrier do
6+
7+
let(:parties) { 3 }
8+
let!(:barrier) { described_class.new(3) }
9+
10+
context '#initialize' do
11+
12+
it 'raises an exception if the initial count is less than 1' do
13+
expect {
14+
described_class.new(0)
15+
}.to raise_error(ArgumentError)
16+
end
17+
18+
it 'raises an exception if the initial count is not an integer' do
19+
expect {
20+
described_class.new('foo')
21+
}.to raise_error(ArgumentError)
22+
end
23+
end
24+
25+
describe '#parties' do
26+
27+
it 'should be the value passed to the constructor' do
28+
barrier.parties.should eq 3
29+
end
30+
31+
end
32+
33+
describe '#number_waiting' do
34+
context 'without any waiting thread' do
35+
it 'should be equal to zero' do
36+
barrier.number_waiting.should eq 0
37+
end
38+
end
39+
40+
context 'with waiting threads' do
41+
it 'should be equal to the waiting threads count' do
42+
Thread.new { barrier.wait }
43+
Thread.new { barrier.wait }
44+
45+
sleep(0.1)
46+
47+
barrier.number_waiting.should eq 2
48+
end
49+
end
50+
end
51+
52+
describe '#broken?' do
53+
it 'should not be broken when created' do
54+
barrier.broken?.should eq false
55+
end
56+
57+
it 'should not be broken when reset is called without waiting thread' do
58+
barrier.reset
59+
barrier.broken?.should eq false
60+
end
61+
end
62+
63+
describe 'reset' do
64+
it 'should release all waiting threads' do
65+
latch = CountDownLatch.new(1)
66+
67+
Thread.new { barrier.wait; latch.count_down }
68+
sleep(0.1)
69+
barrier.reset
70+
latch.wait(0.1).should be_true
71+
72+
barrier.should_not be_broken
73+
barrier.number_waiting.should eq 0
74+
end
75+
end
76+
77+
describe '#wait' do
78+
context 'without timeout' do
79+
it 'should block the thread' do
80+
t = Thread.new { barrier.wait }
81+
sleep(0.1)
82+
83+
t.status.should eq 'sleep'
84+
end
85+
86+
it 'should release all threads when their number matches the desired one' do
87+
latch = CountDownLatch.new(parties)
88+
89+
parties.times { Thread.new { barrier.wait; latch.count_down } }
90+
latch.wait(0.1).should be_true
91+
barrier.number_waiting.should eq 0
92+
barrier.should_not be_broken
93+
end
94+
95+
it 'returns true when released' do
96+
latch = CountDownLatch.new(parties)
97+
98+
parties.times { Thread.new { latch.count_down if barrier.wait == true } }
99+
latch.wait(0.1).should be_true
100+
end
101+
102+
it 'executes the block once' do
103+
counter = AtomicFixnum.new
104+
barrier = described_class.new(parties) { counter.increment }
105+
106+
latch = CountDownLatch.new(parties)
107+
108+
parties.times { Thread.new { latch.count_down if barrier.wait == true } }
109+
latch.wait(0.1).should be_true
110+
111+
counter.value.should eq 1
112+
end
113+
114+
it 'can be reused' do
115+
first_latch = CountDownLatch.new(parties)
116+
parties.times { Thread.new { barrier.wait; first_latch.count_down } }
117+
first_latch.wait(0.1).should be_true
118+
119+
latch = CountDownLatch.new(parties)
120+
parties.times { Thread.new { barrier.wait; latch.count_down } }
121+
latch.wait(0.1).should be_true
122+
end
123+
124+
it 'return false if barrier has been reset' do
125+
latch = CountDownLatch.new(1)
126+
127+
Thread.new { latch.count_down if barrier.wait == false }
128+
sleep(0.1)
129+
barrier.reset
130+
latch.wait(0.1).should be_true
131+
end
132+
end
133+
134+
context 'with timeout' do
135+
context 'timeout not expiring' do
136+
it 'should block the thread' do
137+
t = Thread.new { barrier.wait(1) }
138+
sleep(0.1)
139+
140+
t.status.should eq 'sleep'
141+
end
142+
143+
it 'should release all threads when their number matches the desired one' do
144+
latch = CountDownLatch.new(parties)
145+
146+
parties.times { Thread.new { barrier.wait(1); latch.count_down } }
147+
latch.wait(0.2).should be_true
148+
barrier.number_waiting.should eq 0
149+
end
150+
151+
it 'returns true when released' do
152+
latch = CountDownLatch.new(parties)
153+
154+
parties.times { Thread.new { latch.count_down if barrier.wait(1) == true } }
155+
latch.wait(0.1).should be_true
156+
end
157+
end
158+
159+
context 'timeout expiring' do
160+
161+
it 'returns false' do
162+
latch = CountDownLatch.new(1)
163+
164+
Thread.new { latch.count_down if barrier.wait(0.1) == false }
165+
latch.wait(0.2).should be_true
166+
end
167+
168+
it 'breaks the barrier and release all other threads' do
169+
latch = CountDownLatch.new(2)
170+
171+
Thread.new { barrier.wait(0.1); latch.count_down }
172+
Thread.new { barrier.wait; latch.count_down }
173+
174+
latch.wait(0.2).should be_true
175+
barrier.should be_broken
176+
end
177+
178+
it 'does not execute the block on timeout' do
179+
counter = AtomicFixnum.new
180+
barrier = described_class.new(parties) { counter.increment }
181+
182+
barrier.wait(0.1)
183+
184+
counter.value.should eq 0
185+
end
186+
end
187+
end
188+
189+
context '#broken barrier' do
190+
it 'should not accept new threads' do
191+
Thread.new { barrier.wait(0.1) }
192+
sleep(0.2)
193+
194+
barrier.should be_broken
195+
196+
barrier.wait.should be_false
197+
end
198+
199+
it 'can be reset' do
200+
Thread.new { barrier.wait(0.1) }
201+
sleep(0.2)
202+
203+
barrier.should be_broken
204+
205+
barrier.reset
206+
207+
barrier.should_not be_broken
208+
end
209+
end
210+
end
211+
212+
context 'spurious wake ups' do
213+
214+
before(:each) do
215+
def barrier.simulate_spurious_wake_up
216+
@mutex.synchronize do
217+
@condition.signal
218+
@condition.broadcast
219+
end
220+
end
221+
end
222+
223+
it 'should resist to spurious wake ups without timeout' do
224+
@expected = false
225+
Thread.new { barrier.wait; @expected = true }
226+
227+
sleep(0.1)
228+
barrier.simulate_spurious_wake_up
229+
230+
sleep(0.1)
231+
@expected.should be_false
232+
end
233+
234+
it 'should resist to spurious wake ups with timeout' do
235+
@expected = false
236+
Thread.new { barrier.wait(0.5); @expected = true }
237+
238+
sleep(0.1)
239+
barrier.simulate_spurious_wake_up
240+
241+
sleep(0.1)
242+
@expected.should be_false
243+
244+
end
245+
end
246+
end
247+
248+
end

0 commit comments

Comments
 (0)