Skip to content

Commit 97e74ad

Browse files
committed
Add experimental Throttle
1 parent 4b00a78 commit 97e74ad

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,53 @@ def join(*tokens)
148148
# TODO (pitr-ch 27-Mar-2016): cooperation with mutex, select etc?
149149
# TODO (pitr-ch 27-Mar-2016): examples (scheduled to be cancelled in 10 sec)
150150
end
151+
152+
class Throttle < Synchronization::Object
153+
154+
safe_initialization!
155+
private *attr_atomic(:can_run)
156+
157+
def initialize(max)
158+
super()
159+
self.can_run = max
160+
# TODO (pitr-ch 10-Jun-2016): lockfree gueue is needed
161+
@Queue = Queue.new
162+
end
163+
164+
def limit(ready = nil, &block)
165+
# TODO (pitr-ch 11-Jun-2016): triggers should allocate resources when they are to be required
166+
if block_given?
167+
block.call(get_event).on_completion! { done }
168+
else
169+
get_event
170+
end
171+
end
172+
173+
def done
174+
while true
175+
current_can_run = can_run
176+
if compare_and_set_can_run current_can_run, current_can_run + 1
177+
@Queue.pop.complete if current_can_run < 0
178+
return self
179+
end
180+
end
181+
end
182+
183+
private
184+
185+
def get_event
186+
while true
187+
current_can_run = can_run
188+
if compare_and_set_can_run current_can_run, current_can_run - 1
189+
if current_can_run > 0
190+
return Promises.completed_event
191+
else
192+
e = Promises.completable_event
193+
@Queue.push e
194+
return e
195+
end
196+
end
197+
end
198+
end
199+
end
151200
end

spec/concurrent/promises_spec.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,21 @@ def behaves_as_delay(delay, value)
481481
end
482482
end
483483

484+
describe 'Throttling' do
485+
specify do
486+
throttle = Concurrent::Throttle.new 3
487+
counter = Concurrent::AtomicFixnum.new
488+
expect(Concurrent::Promises.zip(
489+
*12.times.map do |i|
490+
throttle.limit do |trigger|
491+
trigger.then do
492+
counter.increment
493+
sleep 0.01
494+
counter.decrement
495+
end
496+
end
497+
end).value.all? { |v| v < 3 }).to be_truthy
498+
end
484499
end
485500

486501
end

0 commit comments

Comments
 (0)