Skip to content

Commit 78af7c6

Browse files
author
Petr Chalupa
committed
Merge pull request #90 from jdantonio/operator
reopening "Add Delay#reconfigure and Fix configuration to be thread safe"
2 parents 3c7faaa + 97123b9 commit 78af7c6

File tree

6 files changed

+229
-170
lines changed

6 files changed

+229
-170
lines changed

lib/concurrent/atomic/event.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
require 'thread'
2-
require 'concurrent/utilities'
32
require 'concurrent/atomic/condition'
43

54
module Concurrent

lib/concurrent/configuration.rb

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'thread'
2+
require 'concurrent/delay'
23
require 'concurrent/executor/thread_pool_executor'
34
require 'concurrent/executor/timer_set'
45
require 'concurrent/utility/processor_count'
@@ -8,57 +9,28 @@ module Concurrent
89
# An error class to be raised when errors occur during configuration.
910
ConfigurationError = Class.new(StandardError)
1011

11-
class << self
12-
attr_accessor :configuration
13-
end
14-
15-
# Perform gem-level configuration.
16-
#
17-
# @yield the configuration commands
18-
# @yieldparam [Configuration] the current configuration object
19-
def self.configure
20-
(@mutex ||= Mutex.new).synchronize do
21-
yield(configuration)
22-
23-
# initialize the global thread pools if necessary
24-
configuration.global_task_pool
25-
configuration.global_operation_pool
26-
configuration.global_timer_set
27-
end
28-
end
29-
3012
# A gem-level configuration object.
3113
class Configuration
3214

3315
# Create a new configuration object.
3416
def initialize
35-
@cores ||= Concurrent::processor_count
17+
@global_task_pool = Delay.new { new_task_pool }
18+
@global_operation_pool = Delay.new { new_operation_pool }
19+
@global_timer_set = Delay.new { Concurrent::TimerSet.new }
3620
end
3721

3822
# Global thread pool optimized for short *tasks*.
3923
#
4024
# @return [ThreadPoolExecutor] the thread pool
4125
def global_task_pool
42-
@global_task_pool ||= Concurrent::ThreadPoolExecutor.new(
43-
min_threads: [2, @cores].max,
44-
max_threads: [20, @cores * 15].max,
45-
idletime: 2 * 60, # 2 minutes
46-
max_queue: 0, # unlimited
47-
overflow_policy: :abort # raise an exception
48-
)
26+
@global_task_pool.value
4927
end
5028

5129
# Global thread pool optimized for long *operations*.
5230
#
5331
# @return [ThreadPoolExecutor] the thread pool
5432
def global_operation_pool
55-
@global_operation_pool ||= Concurrent::ThreadPoolExecutor.new(
56-
min_threads: [2, @cores].max,
57-
max_threads: [2, @cores].max,
58-
idletime: 10 * 60, # 10 minutes
59-
max_queue: [20, @cores * 15].max,
60-
overflow_policy: :abort # raise an exception
61-
)
33+
@global_operation_pool.value
6234
end
6335

6436
# Global thread pool optimized for *timers*
@@ -67,7 +39,7 @@ def global_operation_pool
6739
#
6840
# @see Concurrent::timer
6941
def global_timer_set
70-
@global_timer_set ||= Concurrent::TimerSet.new
42+
@global_timer_set.value
7143
end
7244

7345
# Global thread pool optimized for short *tasks*.
@@ -85,8 +57,8 @@ def global_timer_set
8557
#
8658
# @raise [ConfigurationError] if this thread pool has already been set
8759
def global_task_pool=(executor)
88-
raise ConfigurationError.new('global task pool was already set') unless @global_task_pool.nil?
89-
@global_task_pool = executor
60+
@global_task_pool.reconfigure { executor } or
61+
raise ConfigurationError.new('global task pool was already set')
9062
end
9163

9264
# Global thread pool optimized for long *operations*.
@@ -104,9 +76,41 @@ def global_task_pool=(executor)
10476
#
10577
# @raise [ConfigurationError] if this thread pool has already been set
10678
def global_operation_pool=(executor)
107-
raise ConfigurationError.new('global operation pool was already set') unless @global_operation_pool.nil?
108-
@global_operation_pool = executor
79+
@global_operation_pool.reconfigure { executor } or
80+
raise ConfigurationError.new('global operation pool was already set')
10981
end
82+
83+
def new_task_pool
84+
Concurrent::ThreadPoolExecutor.new(
85+
min_threads: [2, Concurrent.processor_count].max,
86+
max_threads: [20, Concurrent.processor_count * 15].max,
87+
idletime: 2 * 60, # 2 minutes
88+
max_queue: 0, # unlimited
89+
overflow_policy: :abort # raise an exception
90+
)
91+
end
92+
93+
def new_operation_pool
94+
Concurrent::ThreadPoolExecutor.new(
95+
min_threads: [2, Concurrent.processor_count].max,
96+
max_threads: [2, Concurrent.processor_count].max,
97+
idletime: 10 * 60, # 10 minutes
98+
max_queue: [20, Concurrent.processor_count * 15].max,
99+
overflow_policy: :abort # raise an exception
100+
)
101+
end
102+
end
103+
104+
# create the default configuration on load
105+
@configuration = Configuration.new
106+
singleton_class.send :attr_reader, :configuration
107+
108+
# Perform gem-level configuration.
109+
#
110+
# @yield the configuration commands
111+
# @yieldparam [Configuration] the current configuration object
112+
def self.configure
113+
yield(configuration)
110114
end
111115

112116
private
@@ -129,8 +133,6 @@ def self.finalize_executor(executor)
129133
false
130134
end
131135

132-
# create the default configuration on load
133-
self.configuration = Configuration.new
134136

135137
# set exit hook to shutdown global thread pools
136138
at_exit do

lib/concurrent/delay.rb

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'thread'
2+
require 'concurrent/obligation'
23

34
module Concurrent
45

@@ -48,7 +49,7 @@ def initialize(opts = {}, &block)
4849

4950
init_obligation
5051
@state = :pending
51-
@task = block
52+
@task = block
5253
set_deref_options(opts)
5354
end
5455

@@ -78,18 +79,34 @@ def value
7879
mutex.unlock
7980
end
8081

82+
# reconfigures the block returning the value if still #incomplete?
83+
# @yield the delayed operation to perform
84+
# @returns [true, false] if success
85+
def reconfigure(&block)
86+
mutex.lock
87+
raise ArgumentError.new('no block given') unless block_given?
88+
if @state == :pending
89+
@task = block
90+
true
91+
else
92+
false
93+
end
94+
ensure
95+
mutex.unlock
96+
end
97+
8198
private
8299

83-
def execute_task_once
84-
if @state == :pending
85-
begin
86-
@value = @task.call
87-
@state = :fulfilled
88-
rescue => ex
89-
@reason = ex
90-
@state = :rejected
91-
end
100+
def execute_task_once
101+
if @state == :pending
102+
begin
103+
@value = @task.call
104+
@state = :fulfilled
105+
rescue => ex
106+
@reason = ex
107+
@state = :rejected
92108
end
93109
end
110+
end
94111
end
95112
end

0 commit comments

Comments
 (0)