Skip to content

Commit 33ca56e

Browse files
committed
CyclicBarrier refactor
1 parent a444551 commit 33ca56e

File tree

1 file changed

+32
-24
lines changed

1 file changed

+32
-24
lines changed

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ module Concurrent
22

33
class CyclicBarrier
44

5-
BrokenError = Class.new(StandardError)
6-
75
Generation = Struct.new(:status)
86
private_constant :Generation
97

@@ -42,45 +40,28 @@ def wait(timeout = nil)
4240

4341
return false unless @generation.status == :waiting
4442

45-
generation = @generation
46-
4743
@number_waiting += 1
4844

4945
if @number_waiting == @parties
5046
@action.call if @action
51-
generation.status = :fulfilled
52-
@generation = Generation.new(:waiting)
53-
@condition.broadcast
54-
@number_waiting = 0
47+
set_status_and_restore(:fulfilled)
5548
true
5649
else
57-
remaining = Condition::Result.new(timeout)
58-
while generation.status == :waiting && remaining.can_wait?
59-
remaining = @condition.wait(@mutex, remaining.remaining_time)
60-
end
61-
62-
if remaining.woken_up?
63-
return generation.status == :fulfilled
64-
else
65-
generation.status = :broken
66-
@condition.broadcast
67-
return false
68-
end
50+
wait_for_wake_up(@generation, timeout)
6951
end
7052
end
7153
end
7254

55+
56+
7357
# resets the barrier to its initial state
7458
# If there is at least one waiting thread, it will be woken up, the `wait` method will return false and the barrier will be broken
7559
# If the barrier is broken, this method restores it to the original state
7660
#
7761
# @return [nil]
7862
def reset
7963
@mutex.synchronize do
80-
@generation.status = :reset
81-
@condition.broadcast
82-
@generation = Generation.new(:waiting)
83-
@number_waiting = 0
64+
set_status_and_restore(:reset)
8465
end
8566
end
8667

@@ -94,5 +75,32 @@ def broken?
9475
@mutex.synchronize { @generation.status != :waiting }
9576
end
9677

78+
private
79+
80+
def set_status_and_restore(new_status)
81+
@generation.status = new_status
82+
@condition.broadcast
83+
@generation = Generation.new(:waiting)
84+
@number_waiting = 0
85+
end
86+
87+
def wait_for_wake_up(generation, timeout)
88+
if wait_while_waiting(generation, timeout)
89+
generation.status == :fulfilled
90+
else
91+
generation.status = :broken
92+
@condition.broadcast
93+
false
94+
end
95+
end
96+
97+
def wait_while_waiting(generation, timeout)
98+
remaining = Condition::Result.new(timeout)
99+
while generation.status == :waiting && remaining.can_wait?
100+
remaining = @condition.wait(@mutex, remaining.remaining_time)
101+
end
102+
remaining.woken_up?
103+
end
104+
97105
end
98106
end

0 commit comments

Comments
 (0)