Skip to content

Commit 79c835c

Browse files
committed
Move throttle to its own file
1 parent fa5417e commit 79c835c

File tree

2 files changed

+140
-109
lines changed

2 files changed

+140
-109
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 30 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,35 @@ module Concurrent
1414
# {include:file:doc/promises-main.md}
1515
module Promises
1616

17+
# TODO (pitr-ch 23-Dec-2016): move out
18+
# @!visibility private
19+
module ReInclude
20+
def included(base)
21+
included_into << [:include, base]
22+
super(base)
23+
end
24+
25+
def extended(base)
26+
included_into << [:extend, base]
27+
super(base)
28+
end
29+
30+
def include(*modules)
31+
super(*modules)
32+
modules.reverse.each do |module_being_included|
33+
included_into.each do |method, mod|
34+
mod.send method, module_being_included
35+
end
36+
end
37+
end
38+
39+
private
40+
41+
def included_into
42+
@included_into ||= []
43+
end
44+
end
45+
1746
# @!macro [new] promises.param.default_executor
1847
# @param [Executor, :io, :fast] default_executor Instance of an executor or a name of the
1948
# global executor. Default executor propagates to chained futures unless overridden with
@@ -46,6 +75,7 @@ module Promises
4675
# Container of all {Future}, {Event} factory methods. They are never constructed directly with
4776
# new.
4877
module FactoryMethods
78+
extend ReInclude
4979

5080
# @!macro promises.shortcut.on
5181
# @return [ResolvableEvent]
@@ -1922,115 +1952,6 @@ def then_ask(actor)
19221952
include ActorIntegration
19231953
end
19241954

1925-
# A tool manage concurrency level of future tasks.
1926-
# @example With futures
1927-
# data = (1..5).to_a
1928-
# db = data.reduce({}) { |h, v| h.update v => v.to_s }
1929-
# max_two = Promises.throttle 2
1930-
#
1931-
# futures = data.map do |data|
1932-
# Promises.future(data) { |data|
1933-
# # un-throttled, concurrency level equal data.size
1934-
# data + 1
1935-
# }.then_throttle(max_two, db) { |v, db|
1936-
# # throttled, only 2 tasks executed at the same time
1937-
# # e.g. limiting access to db
1938-
# db[v]
1939-
# }
1940-
# end
1941-
#
1942-
# futures.map(&:value!) # => [2, 3, 4, 5, nil]
1943-
#
1944-
# @example With Threads
1945-
# max_two = Concurrent::Throttle.new 2
1946-
# 5.timse
1947-
class Throttle < Synchronization::Object
1948-
# TODO (pitr-ch 23-Dec-2016): move into different file
1949-
# TODO (pitr-ch 23-Dec-2016): move to Concurrent space
1950-
# TODO (pitr-ch 21-Dec-2016): consider using sized channel for implementation instead when available
1951-
1952-
safe_initialization!
1953-
private *attr_atomic(:can_run)
1954-
1955-
# New throttle.
1956-
# @param [Integer] limit
1957-
def initialize(limit)
1958-
super()
1959-
@Limit = limit
1960-
self.can_run = limit
1961-
@Queue = LockFreeQueue.new
1962-
end
1963-
1964-
# @return [Integer] The limit.
1965-
def limit
1966-
@Limit
1967-
end
1968-
1969-
def trigger
1970-
while true
1971-
current_can_run = can_run
1972-
if compare_and_set_can_run current_can_run, current_can_run - 1
1973-
if current_can_run > 0
1974-
return Promises.resolved_event
1975-
else
1976-
event = Promises.resolvable_event
1977-
@Queue.push event
1978-
return event
1979-
end
1980-
end
1981-
end
1982-
end
1983-
1984-
def release
1985-
while true
1986-
current_can_run = can_run
1987-
if compare_and_set_can_run current_can_run, current_can_run + 1
1988-
if current_can_run < 0
1989-
Thread.pass until (trigger = @Queue.pop)
1990-
trigger.resolve
1991-
end
1992-
return self
1993-
end
1994-
end
1995-
end
1996-
1997-
# @return [String] Short string representation.
1998-
def to_s
1999-
format '<#%s:0x%x limit:%s can_run:%d>', self.class, object_id << 1, @Limit, can_run
2000-
end
2001-
2002-
alias_method :inspect, :to_s
2003-
2004-
module PromisesIntegration
2005-
# TODO (pitr-ch 23-Dec-2016): apply similar pattern elsewhere
2006-
2007-
def throttled(&throttled_futures)
2008-
throttled_futures.call(trigger).on_resolution! { release }
2009-
end
2010-
2011-
def then_throttled(*args, &task)
2012-
trigger.then(*args, &task).on_resolution! { release }
2013-
end
2014-
end
2015-
2016-
include PromisesIntegration
2017-
end
2018-
2019-
class AbstractEventFuture < Synchronization::Object
2020-
module ThrottleIntegration
2021-
def throttled_by(throttle, &throttled_futures)
2022-
a_trigger = throttle.trigger & self
2023-
throttled_futures.call(a_trigger).on_resolution! { throttle.release }
2024-
end
2025-
2026-
def then_throttled_by(throttle, *args, &block)
2027-
throttled_by(throttle) { |trigger| trigger.then(*args, &block) }
2028-
end
2029-
end
2030-
2031-
include ThrottleIntegration
2032-
end
2033-
20341955
### Experimental features follow
20351956

20361957
module FactoryMethods

lib/concurrent/edge/throttle.rb

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
module Concurrent
2+
3+
# A tool manage concurrency level of future tasks.
4+
# @example With futures
5+
# data = (1..5).to_a
6+
# db = data.reduce({}) { |h, v| h.update v => v.to_s }
7+
# max_two = Promises.throttle 2
8+
#
9+
# futures = data.map do |data|
10+
# Promises.future(data) { |data|
11+
# # un-throttled, concurrency level equal data.size
12+
# data + 1
13+
# }.then_throttle(max_two, db) { |v, db|
14+
# # throttled, only 2 tasks executed at the same time
15+
# # e.g. limiting access to db
16+
# db[v]
17+
# }
18+
# end
19+
#
20+
# futures.map(&:value!) # => [2, 3, 4, 5, nil]
21+
#
22+
# @example With Threads
23+
# # TODO (pitr-ch 23-Dec-2016): thread example, add blocking block method for threads
24+
class Throttle < Synchronization::Object
25+
# TODO (pitr-ch 21-Dec-2016): consider using sized channel for implementation instead when available
26+
27+
safe_initialization!
28+
private *attr_atomic(:can_run)
29+
30+
# New throttle.
31+
# @param [Integer] limit
32+
def initialize(limit)
33+
super()
34+
@Limit = limit
35+
self.can_run = limit
36+
@Queue = LockFreeQueue.new
37+
end
38+
39+
# @return [Integer] The limit.
40+
def limit
41+
@Limit
42+
end
43+
44+
def trigger
45+
while true
46+
current_can_run = can_run
47+
if compare_and_set_can_run current_can_run, current_can_run - 1
48+
if current_can_run > 0
49+
return Promises.resolved_event
50+
else
51+
event = Promises.resolvable_event
52+
@Queue.push event
53+
return event
54+
end
55+
end
56+
end
57+
end
58+
59+
def release
60+
while true
61+
current_can_run = can_run
62+
if compare_and_set_can_run current_can_run, current_can_run + 1
63+
if current_can_run < 0
64+
Thread.pass until (trigger = @Queue.pop)
65+
trigger.resolve
66+
end
67+
return self
68+
end
69+
end
70+
end
71+
72+
# @return [String] Short string representation.
73+
def to_s
74+
format '<#%s:0x%x limit:%s can_run:%d>', self.class, object_id << 1, @Limit, can_run
75+
end
76+
77+
alias_method :inspect, :to_s
78+
79+
module PromisesIntegration
80+
81+
def throttled(&throttled_futures)
82+
throttled_futures.call(trigger).on_resolution! { release }
83+
end
84+
85+
def then_throttled(*args, &task)
86+
trigger.then(*args, &task).on_resolution! { release }
87+
end
88+
end
89+
90+
include PromisesIntegration
91+
end
92+
93+
module Promises
94+
95+
class AbstractEventFuture < Synchronization::Object
96+
module ThrottleIntegration
97+
def throttled_by(throttle, &throttled_futures)
98+
a_trigger = throttle.trigger & self
99+
throttled_futures.call(a_trigger).on_resolution! { throttle.release }
100+
end
101+
102+
def then_throttled_by(throttle, *args, &block)
103+
throttled_by(throttle) { |trigger| trigger.then(*args, &block) }
104+
end
105+
end
106+
107+
include ThrottleIntegration
108+
end
109+
end
110+
end

0 commit comments

Comments
 (0)