Skip to content

Commit b96e048

Browse files
committed
Use lock-free-queue in throttling and improve API
1 parent bd38e33 commit b96e048

File tree

4 files changed

+140
-26
lines changed

4 files changed

+140
-26
lines changed

lib/concurrent-edge.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
require 'concurrent/edge/atomic_markable_reference'
1010
require 'concurrent/edge/lock_free_linked_set'
11+
require 'concurrent/edge/lock_free_queue'
1112

1213
require 'concurrent/edge/promises'
1314
require 'concurrent/edge/cancellation'
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
module Concurrent
2+
3+
class LockFreeQueue < Synchronization::Object
4+
5+
class Node < Synchronization::Object
6+
attr_atomic :successor
7+
8+
def initialize(item, successor)
9+
super()
10+
# published through queue, no need to be volatile or final
11+
@Item = item
12+
self.successor = successor
13+
end
14+
15+
def item
16+
@Item
17+
end
18+
end
19+
20+
safe_initialization!
21+
22+
attr_atomic :head, :tail
23+
24+
def initialize
25+
super()
26+
dummy_node = Node.new(:dummy, nil)
27+
28+
self.head = dummy_node
29+
self.tail = dummy_node
30+
end
31+
32+
def push(item)
33+
# allocate a new node with the item embedded
34+
new_node = Node.new(item, nil)
35+
36+
# keep trying until the operation succeeds
37+
while true
38+
current_tail_node = tail
39+
current_tail_successor = current_tail_node.successor
40+
41+
# if our stored tail is still the current tail
42+
if current_tail_node == tail
43+
# if that tail was really the last node
44+
if current_tail_successor.nil?
45+
# if we can update the previous successor of tail to point to this new node
46+
if current_tail_node.compare_and_set_successor(nil, new_node)
47+
# then update tail to point to this node as well
48+
compare_and_set_tail(current_tail_node, new_node)
49+
# and return
50+
return true
51+
# else, start the loop over
52+
end
53+
else
54+
# in this case, the tail ref we had wasn't the real tail
55+
# so we try to set its successor as the real tail, then start the loop again
56+
compare_and_set_tail(current_tail_node, current_tail_successor)
57+
end
58+
end
59+
end
60+
end
61+
62+
def pop
63+
# retry until some value can be returned
64+
while true
65+
# the value in @head is just a dummy node that always sits in that position,
66+
# the real 'head' is in its successor
67+
current_dummy_node = head
68+
current_tail_node = tail
69+
70+
current_head_node = current_dummy_node.successor
71+
72+
# if our local head is still consistent with the head node, continue
73+
# otherwise, start over
74+
if current_dummy_node == head
75+
# if either the queue is empty, or falling behind
76+
if current_dummy_node == current_tail_node
77+
# if there's nothing after the 'dummy' head node
78+
if current_head_node.nil?
79+
# just return nil
80+
return nil
81+
else
82+
# here the head element succeeding head is not nil, but the head and tail are equal
83+
# so tail is falling behind, update it, then start over
84+
compare_and_set_tail(current_tail_node, current_head_node)
85+
end
86+
87+
# the queue isn't empty
88+
# if we can set the dummy head to the 'real' head, we're free to return the value in that real head, success
89+
elsif compare_and_set_head(current_dummy_node, current_head_node)
90+
# grab the item from the popped node
91+
item = current_head_node.item
92+
93+
# return it, success!
94+
return item
95+
end
96+
end
97+
end
98+
end
99+
100+
# approximate
101+
def size
102+
successor = head.successor
103+
count = 0
104+
105+
while true
106+
break if successor.nil?
107+
108+
current_node = successor
109+
successor = current_node.successor
110+
count += 1
111+
end
112+
113+
count
114+
end
115+
end
116+
end

lib/concurrent/edge/promises.rb

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1967,50 +1967,47 @@ class Promises::Throttle < Synchronization::Object
19671967
def initialize(max)
19681968
super()
19691969
self.can_run = max
1970-
# TODO (pitr-ch 10-Jun-2016): lock-free queue is needed
1971-
@Queue = Queue.new
1970+
@Queue = LockFreeQueue.new
19721971
end
19731972

1974-
def limit(future = nil, &block)
1975-
if future
1976-
# future.chain { block.call(new_trigger & future).on_resolution! { done } }.flat
1977-
block.call(new_trigger & future).on_resolution! { done }
1973+
def throttle(future = nil, &throttled_future)
1974+
if block_given?
1975+
trigger = future ? (new_trigger & future) : new_trigger
1976+
throttled_future.call(trigger).on_resolution! { done }
19781977
else
1979-
if block_given?
1980-
block.call(new_trigger).on_resolution! { done }
1981-
else
1982-
new_trigger
1983-
end
1978+
new_trigger
19841979
end
19851980
end
19861981

1987-
# TODO (pitr-ch 10-Oct-2016): maybe just then?
1988-
def then_limit(&block)
1989-
limit { |trigger| trigger.then &block }
1982+
def then_throttle(&task)
1983+
throttle { |trigger| trigger.then &task }
19901984
end
19911985

1986+
private
1987+
19921988
def done
19931989
while true
19941990
current_can_run = can_run
19951991
if compare_and_set_can_run current_can_run, current_can_run + 1
1996-
@Queue.pop.resolve if current_can_run < 0
1992+
if current_can_run <= 0
1993+
Thread.pass until (trigger = @Queue.pop)
1994+
trigger.resolve
1995+
end
19971996
return self
19981997
end
19991998
end
20001999
end
20012000

2002-
private
2003-
20042001
def new_trigger
20052002
while true
20062003
current_can_run = can_run
20072004
if compare_and_set_can_run current_can_run, current_can_run - 1
20082005
if current_can_run > 0
20092006
return Promises.resolved_event
20102007
else
2011-
e = Promises.resolvable_event
2012-
@Queue.push e
2013-
return e
2008+
event = Promises.resolvable_event
2009+
@Queue.push event
2010+
return event
20142011
end
20152012
end
20162013
end
@@ -2020,7 +2017,7 @@ def new_trigger
20202017
class Promises::AbstractEventFuture < Synchronization::Object
20212018

20222019
def throttle(throttle, &throttled_future)
2023-
throttle.limit(self, &throttled_future)
2020+
throttle.throttle(self, &throttled_future)
20242021
end
20252022

20262023
def then_throttle(throttle, &block)

spec/concurrent/edge/promises_spec.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -507,21 +507,21 @@ def behaves_as_delay(delay, value)
507507

508508
expect(Concurrent::Promises.zip(
509509
*12.times.map do |i|
510-
max_tree.limit { |trigger| trigger.then &testing }
511-
end).value!.all? { |v| v < 3 }).to be_truthy
510+
max_tree.throttle { |trigger| trigger.then &testing }
511+
end).value!.all? { |v| v <= 3 }).to be_truthy
512512

513513
expect(Concurrent::Promises.zip(
514514
*12.times.map do |i|
515515
Concurrent::Promises.
516516
fulfilled_future(i).
517517
throttle(max_tree) { |trigger| trigger.then &testing }
518-
end).value!.all? { |v| v < 3 }).to be_truthy
518+
end).value!.all? { |v| v <= 3 }).to be_truthy
519519
end
520520

521521
specify do
522522
max_five = Concurrent::Promises::Throttle.new 5
523523
jobs = 20.times.map do |i|
524-
max_five.limit do |trigger|
524+
max_five.throttle do |trigger|
525525
# trigger is an event, has same chain-able capabilities as current promise
526526
trigger.then do
527527
# at any given time there max 5 simultaneous executions of this block
@@ -537,7 +537,7 @@ def behaves_as_delay(delay, value)
537537
specify do
538538
max_five = Concurrent::Promises::Throttle.new 5
539539
jobs = 20.times.map do |i|
540-
max_five.then_limit do
540+
max_five.then_throttle do
541541
# at any given time there max 5 simultaneous executions of this block
542542
the_work = i * 2
543543
end # returns promise

0 commit comments

Comments
 (0)