Skip to content

Commit 297758e

Browse files
committed
Merge pull request #42 from jdantonio/global-thread-pool
Global thread pool is now a thread pool executor.
2 parents 915aaa2 + ddad044 commit 297758e

27 files changed

+507
-338
lines changed

demos/global_thread_pool-demo.rb

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
require 'concurrent'
2+
require 'open-uri'
3+
4+
def get_year_end_closing(symbol, year)
5+
uri = "http://ichart.finance.yahoo.com/table.csv?s=#{symbol}&a=11&b=01&c=#{year}&d=11&e=31&f=#{year}&g=m"
6+
data = open(uri) {|f| f.collect{|line| line.strip } }
7+
price = data[1].split(',')[4]
8+
price.to_f
9+
[symbol, price.to_f]
10+
end
11+
12+
def get_top_stock(symbols, year, timeout = 5)
13+
stock_prices = symbols.collect{|symbol| Concurrent::dataflow{ get_year_end_closing(symbol, year) }}
14+
Concurrent::dataflow(*stock_prices) { |*prices|
15+
prices.reduce(['', 0.0]){|highest, price| price.last > highest.last ? price : highest}
16+
}.value(timeout)
17+
end
18+
19+
def timer(*args)
20+
return 0,nil unless block_given?
21+
t1 = Time.now
22+
result = yield(*args)
23+
t2 = Time.now
24+
return (t2 - t1), result
25+
end
26+
27+
def strftimer(seconds) # :nodoc:
28+
Time.at(seconds).gmtime.strftime('%R:%S.%L')
29+
end
30+
31+
s_and_p_500 = ['MMM','ABT','ABBV','ACE','ACN','ACT','ADBE','ADT','AES','AET','AFL','A','GAS','APD','ARG','AKAM','AA','ALXN','ATI','ALLE','AGN','ADS','ALL','ALTR','MO','AMZN','AEE','AEP','AXP','AIG','AMT','AMP','ABC','AME','AMGN','APH','APC','ADI','AON','APA','AIV','AAPL','AMAT','ADM','AIZ','T','ADSK','ADP','AN','AZO','AVB','AVY','AVP','BHI','BLL','BAC','BK','BCR','BAX','BBT','BEAM','BDX','BBBY','BMS','BRK.B','BBY','BIIB','BLK','HRB','BA','BWA','BXP','BSX','BMY','BRCM','BF.B','CHRW','CA','CVC','COG','CAM','CPB','COF','CAH','CFN','KMX','CCL','CAT','CBG','CBS','CELG','CNP','CTL','CERN','CF','SCHW','CHK','CVX','CMG','CB','CI','CINF','CTAS','CSCO','C','CTXS','CLF','CLX','CME','CMS','COH','KO','CCE','CTSH','CL','CMCSA','CMA','CSC','CAG','COP','CNX','ED','STZ','GLW','COST','COV','CCI','CSX','CMI','CVS','DHI','DHR','DRI','DVA','DE','DLPH','DAL','DNR','XRAY','DVN','DO','DTV','DFS','DISCA','DG','DLTR','D','DOV','DOW','DPS','DTE','DD','DUK','DNB','ETFC','EMN','ETN','EBAY','ECL','EIX','EW','EA','EMC','EMR','ESV','ETR','EOG','EQT','EFX','EQR','EL','EXC','EXPE','EXPD','ESRX','XOM','FFIV','FB','FDO','FAST','FDX','FIS','FITB','FSLR','FE','FISV','FLIR','FLS','FLR','FMC','FTI','F','FRX','FOSL','BEN','FCX','FTR','GME','GCI','GPS','GRMN','GD','GE','GGP','GIS','GM','GPC','GNW','GILD','GS','GT','GOOG','GWW','HAL','HOG','HAR','HRS','HIG','HAS','HCP','HCN','HP','HES','HPQ','HD','HON','HRL','HSP','HST','HCBK','HUM','HBAN','ITW','IR','TEG','INTC','ICE','IBM','IGT','IP','IPG','IFF','INTU','ISRG','IVZ','IRM','JBL','JEC','JNJ','JCI','JOY','JPM','JNPR','KSU','K','KEY','GMCR','KMB','KIM','KMI','KLAC','KSS','KRFT','KR','LB','LLL','LH','LRCX','LM','LEG','LEN','LUK','LLY','LNC','LLTC','LMT','L','LO','LOW','LSI','LYB','MTB','MAC','M','MRO','MPC','MAR','MMC','MAS','MA','MAT','MKC','MCD','MHFI','MCK','MJN','MWV','MDT','MRK','MET','MCHP','MU','MSFT','MHK','TAP','MDLZ','MON','MNST','MCO','MS','MOS','MSI','MUR','MYL','NBR','NDAQ','NOV','NTAP','NFLX','NWL','NFX','NEM','NWSA','NEE','NLSN','NKE','NI','NE','NBL','JWN','NSC','NTRS','NOC','NU','NRG','NUE','NVDA','KORS','ORLY','OXY','OMC','OKE','ORCL','OI','PCG','PCAR','PLL','PH','PDCO','PAYX','BTU','PNR','PBCT','POM','PEP','PKI','PRGO','PETM','PFE','PM','PSX','PNW','PXD','PBI','PCL','PNC','RL','PPG','PPL','PX','PCP','PCLN','PFG','PG','PGR','PLD','PRU','PEG','PSA','PHM','PVH','QEP','PWR','QCOM','DGX','RRC','RTN','RHT','REGN','RF','RSG','RAI','RHI','ROK','COL','ROP','ROST','RDC','R','SWY','CRM','SNDK','SCG','SLB','SNI','STX','SEE','SRE','SHW','SIAL','SPG','SLM','SJM','SNA','SO','LUV','SWN','SE','STJ','SWK','SPLS','SBUX','HOT','STT','SRCL','SYK','STI','SYMC','SYY','TROW','TGT','TEL','TE','THC','TDC','TSO','TXN','TXT','HSY','TRV','TMO','TIF','TWX','TWC','TJX','TMK','TSS','TSCO','RIG','TRIP','FOXA','TSN','TYC','USB','UNP','UNH','UPS','X','UTX','UNM','URBN','VFC','VLO','VAR','VTR','VRSN','VZ','VRTX','VIAB','V','VNO','VMC','WMT','WAG','DIS','GHC','WM','WAT','WLP','WFC','WDC','WU','WY','WHR','WFM','WMB','WIN','WEC','WYN','WYNN','XEL','XRX','XLNX','XL','XYL','YHOO','YUM','ZMH','ZION','ZTS']
32+
year = 2013
33+
34+
#puts "Starting the sequential calculation..."
35+
#time, prices = timer do
36+
#s_and_p_500.inject({}){|memo, symbol| memo[symbol] = get_year_end_closing(symbol, year).last; memo }
37+
#end
38+
#puts "Sequential time: #{strftimer(time)}"
39+
40+
puts "Starting the concurrent calculation..."
41+
time, prices = timer do
42+
futures = s_and_p_500.collect{|symbol| Concurrent::Future.execute{ get_year_end_closing(symbol, year) } }
43+
futures.inject({}){|memo, future| memo[future.value.first] = future.value.last; memo }
44+
end
45+
puts "Concurrent time: #{strftimer(time)}"

lib/concurrent.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
require 'concurrent/immediate_executor'
3838
require 'concurrent/per_thread_executor'
3939
require 'concurrent/thread_pool_executor'
40-
require 'concurrent/uses_global_thread_pool'
4140

4241
# Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell,
4342
# F#, C#, Java, and classic concurrency patterns.

lib/concurrent/agent.rb

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
require 'thread'
22
require 'observer'
33

4+
require 'concurrent/configuration'
45
require 'concurrent/dereferenceable'
5-
require 'concurrent/uses_global_thread_pool'
66
require 'concurrent/utilities'
77

88
module Concurrent
@@ -34,7 +34,7 @@ module Concurrent
3434
# @return [Fixnum] the maximum number of seconds before an update is cancelled
3535
class Agent
3636
include Dereferenceable
37-
include UsesGlobalThreadPool
37+
include OptionsParser
3838

3939
# The default timeout value (in seconds); used when no timeout option
4040
# is given at initialization
@@ -46,7 +46,15 @@ class Agent
4646
#
4747
# @param [Object] initial the initial value
4848
# @param [Hash] opts the options used to define the behavior at update and deref
49+
#
4950
# @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled
51+
#
52+
# @option opts [Boolean] :operation (false) when +true+ will execute the future on the global
53+
# operation pool (for long-running operations), when +false+ will execute the future on the
54+
# global task pool (for short-running tasks)
55+
# @option opts [object] :executor when provided will run all operations on
56+
# this executor rather than the global thread pool (overrides :operation)
57+
#
5058
# @option opts [String] :dup_on_deref (false) call +#dup+ before returning the data
5159
# @option opts [String] :freeze_on_deref (false) call +#freeze+ before returning the data
5260
# @option opts [String] :copy_on_deref (nil) call the given +Proc+ passing the internal value and
@@ -57,6 +65,7 @@ def initialize(initial, opts = {})
5765
@validator = Proc.new { |result| true }
5866
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
5967
@observers = CopyOnWriteObserverSet.new
68+
@executor = get_executor_from(opts)
6069
init_mutex
6170
set_deref_options(opts)
6271
end
@@ -116,7 +125,7 @@ def validate(&block)
116125
# @yieldparam [Object] value the current value
117126
# @yieldreturn [Object] the new value
118127
def post(&block)
119-
Agent.thread_pool.post{ work(&block) } unless block.nil?
128+
@executor.post{ work(&block) } unless block.nil?
120129
end
121130

122131
# Update the current value with the result of the given block operation

lib/concurrent/async.rb

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
require 'thread'
2+
require 'concurrent/configuration'
23
require 'concurrent/ivar'
34
require 'concurrent/future'
5+
require 'concurrent/thread_pool_executor'
46

57
module Concurrent
68

@@ -169,8 +171,9 @@ class AsyncDelegator # :nodoc:
169171
#
170172
# @param [Object] delegate the object to wrap and delegate method calls to
171173
# @param [Mutex] mutex the mutex lock to use when delegating method calls
172-
def initialize(delegate, mutex)
174+
def initialize(delegate, executor, mutex)
173175
@delegate = delegate
176+
@executor = executor
174177
@mutex = mutex
175178
end
176179

@@ -191,7 +194,7 @@ def method_missing(method, *args, &block)
191194

192195
self.define_singleton_method(method) do |*args|
193196
Async::validate_argc(@delegate, method, *args)
194-
Concurrent::Future.execute do
197+
Concurrent::Future.execute(executor: @executor) do
195198
mutex.synchronize do
196199
@delegate.send(method, *args, &block)
197200
end
@@ -237,7 +240,7 @@ def method_missing(method, *args, &block)
237240
#
238241
# @see Concurrent::Future
239242
def async
240-
@__async_delegator__ ||= AsyncDelegator.new(self, await.mutex)
243+
@__async_delegator__ ||= AsyncDelegator.new(self, executor, await.mutex)
241244
end
242245
alias_method :future, :async
243246

@@ -272,5 +275,16 @@ def await
272275
@__await_delegator__ ||= AwaitDelegator.new(self, Mutex.new)
273276
end
274277
alias_method :delay, :await
278+
279+
def executor=(executor)
280+
raise ArgumentError.new('executor has already been set') unless @__async__executor__.nil?
281+
@__async__executor__ = executor
282+
end
283+
284+
private
285+
286+
def executor
287+
@__async__executor__ ||= Concurrent.configuration.global_task_pool
288+
end
275289
end
276290
end

lib/concurrent/configuration.rb

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
require 'concurrent/per_thread_executor'
1+
require 'concurrent/thread_pool_executor'
2+
require 'concurrent/processor_count'
23

34
module Concurrent
45
class << self
@@ -10,13 +11,89 @@ def self.configure
1011
end
1112

1213
class Configuration
13-
attr_accessor :global_thread_pool
14+
attr_accessor :global_task_pool
15+
attr_accessor :global_operation_pool
1416

1517
def initialize
16-
@global_thread_pool = Concurrent::PerThreadExecutor.new
18+
end
19+
20+
def cores
21+
@cores ||= Concurrent::processor_count
22+
end
23+
24+
def global_task_pool
25+
@global_task_pool ||= Concurrent::ThreadPoolExecutor.new(
26+
min_threads: [2, cores].max,
27+
max_threads: [20, cores * 15].max,
28+
idletime: 2 * 60, # 2 minutes
29+
max_queue: 0, # unlimited
30+
overflow_policy: :abort # raise an exception
31+
)
32+
end
33+
34+
def global_operation_pool
35+
@global_operation_pool = Concurrent::ThreadPoolExecutor.new(
36+
min_threads: [2, cores].max,
37+
max_threads: [2, cores].max,
38+
idletime: 10 * 60, # 10 minutes
39+
max_queue: [20, cores * 15].max,
40+
overflow_policy: :abort # raise an exception
41+
)
42+
end
43+
44+
def global_task_pool=(executor)
45+
finalize_executor(@global_task_pool)
46+
@global_task_pool = executor
47+
end
48+
49+
def global_operation_pool=(executor)
50+
finalize_executor(@global_operation_pool)
51+
@global_operation_pool = executor
52+
end
53+
54+
private
55+
56+
def finalize_executor(executor)
57+
return if executor.nil?
58+
if executor.respond_to?(:shutdown)
59+
executor.shutdown
60+
elsif executor.respond_to?(:kill)
61+
executor.kill
62+
end
63+
rescue
64+
# suppress
65+
end
66+
end
67+
68+
module OptionsParser
69+
70+
def get_executor_from(opts = {})
71+
if opts.has_key?(:executor)
72+
opts[:executor]
73+
elsif opts[:operation] == true || opts[:task] == false
74+
Concurrent.configuration.global_operation_pool
75+
else
76+
Concurrent.configuration.global_task_pool
77+
end
1778
end
1879
end
1980

81+
def task(*args, &block)
82+
Concurrent.configuration.global_task_pool.post(*args, &block)
83+
end
84+
module_function :task
85+
86+
def operation(*args, &block)
87+
Concurrent.configuration.global_operation_pool.post(*args, &block)
88+
end
89+
module_function :operation
90+
2091
# create the default configuration on load
2192
self.configuration = Configuration.new
93+
94+
# set exit hook to shutdown global thread pools
95+
at_exit do
96+
self.configuration.global_task_pool = nil
97+
self.configuration.global_operation_pool = nil
98+
end
2299
end

lib/concurrent/dataflow.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'concurrent/atomic'
22
require 'concurrent/future'
3+
require 'concurrent/per_thread_executor'
34

45
module Concurrent
56

lib/concurrent/future.rb

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
require 'thread'
22

3+
require 'concurrent/configuration'
34
require 'concurrent/obligation'
4-
require 'concurrent/uses_global_thread_pool'
55
require 'concurrent/safe_task_executor'
66

77
module Concurrent
@@ -41,13 +41,18 @@ module Concurrent
4141
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html java.util.concurrent.Future
4242
class Future < IVar
4343
include Obligation
44-
include UsesGlobalThreadPool
44+
include OptionsParser
4545

4646
# Create a new +Future+ in the +:unscheduled+ state.
4747
#
4848
# @yield the asynchronous operation to perform
4949
#
5050
# @param [Hash] opts the options to create a message with
51+
# @option opts [Boolean] :operation (false) when +true+ will execute the future on the global
52+
# operation pool (for long-running operations), when +false+ will execute the future on the
53+
# global task pool (for short-running tasks)
54+
# @option opts [object] :executor when provided will run all operations on
55+
# this executor rather than the global thread pool (overrides :operation)
5156
# @option opts [String] :dup_on_deref (false) call +#dup+ before returning the data
5257
# @option opts [String] :freeze_on_deref (false) call +#freeze+ before returning the data
5358
# @option opts [String] :copy_on_deref (nil) call the given +Proc+ passing the internal value and
@@ -59,6 +64,7 @@ def initialize(opts = {}, &block)
5964
super(IVar::NO_VALUE, opts)
6065
@state = :unscheduled
6166
@task = block
67+
@executor = get_executor_from(opts)
6268
end
6369

6470
# Execute an +:unscheduled+ +Future+. Immediately sets the state to +:pending+ and
@@ -80,7 +86,7 @@ def initialize(opts = {}, &block)
8086
# @since 0.5.0
8187
def execute
8288
if compare_and_set_state(:pending, :unscheduled)
83-
Future.thread_pool.post { work }
89+
@executor.post{ work }
8490
self
8591
end
8692
end
@@ -105,7 +111,7 @@ def execute
105111
#
106112
# @since 0.5.0
107113
def self.execute(opts = {}, &block)
108-
return Future.new(opts, &block).execute
114+
Future.new(opts, &block).execute
109115
end
110116

111117
protected :set, :fail, :complete
@@ -117,6 +123,5 @@ def work # :nodoc:
117123
success, val, reason = SafeTaskExecutor.new(@task).execute
118124
complete(success, val, reason)
119125
end
120-
121126
end
122127
end

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ def initialize(opts = {})
3636
idletime, java.util.concurrent.TimeUnit::SECONDS,
3737
java.util.concurrent.SynchronousQueue.new,
3838
OVERFLOW_POLICIES[@overflow_policy].new)
39+
40+
at_exit { self.kill }
3941
end
4042
end
4143
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ def initialize(num_threads, opts = {})
2828
@max_queue, java.util.concurrent.TimeUnit::SECONDS,
2929
java.util.concurrent.LinkedBlockingQueue.new,
3030
OVERFLOW_POLICIES[@overflow_policy].new)
31+
32+
at_exit { self.kill }
3133
end
3234
end
3335
end

lib/concurrent/java_thread_pool_executor.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ def initialize(opts = {})
6060
min_length, max_length,
6161
idletime, java.util.concurrent.TimeUnit::SECONDS,
6262
queue, OVERFLOW_POLICIES[@overflow_policy].new)
63+
64+
at_exit { self.kill }
6365
end
6466

6567
def min_length

0 commit comments

Comments
 (0)