1
1
require 'thread'
2
+ require 'concurrent/delay'
2
3
require 'concurrent/executor/thread_pool_executor'
3
4
require 'concurrent/executor/timer_set'
4
5
require 'concurrent/utility/processor_count'
@@ -32,33 +33,43 @@ class Configuration
32
33
33
34
# Create a new configuration object.
34
35
def initialize
35
- @cores ||= Concurrent ::processor_count
36
+ @cores = Concurrent ::processor_count
37
+
38
+ @global_task_pool = Delay . new do
39
+ Concurrent ::ThreadPoolExecutor . new (
40
+ min_threads : [ 2 , @cores ] . max ,
41
+ max_threads : [ 20 , @cores * 15 ] . max ,
42
+ idletime : 2 * 60 , # 2 minutes
43
+ max_queue : 0 , # unlimited
44
+ overflow_policy : :abort # raise an exception
45
+ )
46
+ end
47
+
48
+ @global_operation_pool = Delay . new do
49
+ Concurrent ::ThreadPoolExecutor . new (
50
+ min_threads : [ 2 , @cores ] . max ,
51
+ max_threads : [ 2 , @cores ] . max ,
52
+ idletime : 10 * 60 , # 10 minutes
53
+ max_queue : [ 20 , @cores * 15 ] . max ,
54
+ overflow_policy : :abort # raise an exception
55
+ )
56
+ end
57
+
58
+ @global_timer_set = Delay . new { Concurrent ::TimerSet . new }
36
59
end
37
60
38
61
# Global thread pool optimized for short *tasks*.
39
62
#
40
63
# @return [ThreadPoolExecutor] the thread pool
41
64
def global_task_pool
42
- @global_task_pool ||= Concurrent ::ThreadPoolExecutor . new (
43
- min_threads : [ 2 , @cores ] . max ,
44
- max_threads : [ 20 , @cores * 15 ] . max ,
45
- idletime : 2 * 60 , # 2 minutes
46
- max_queue : 0 , # unlimited
47
- overflow_policy : :abort # raise an exception
48
- )
65
+ @global_task_pool . value
49
66
end
50
67
51
68
# Global thread pool optimized for long *operations*.
52
69
#
53
70
# @return [ThreadPoolExecutor] the thread pool
54
71
def global_operation_pool
55
- @global_operation_pool ||= Concurrent ::ThreadPoolExecutor . new (
56
- min_threads : [ 2 , @cores ] . max ,
57
- max_threads : [ 2 , @cores ] . max ,
58
- idletime : 10 * 60 , # 10 minutes
59
- max_queue : [ 20 , @cores * 15 ] . max ,
60
- overflow_policy : :abort # raise an exception
61
- )
72
+ @global_operation_pool . value
62
73
end
63
74
64
75
# Global thread pool optimized for *timers*
@@ -67,7 +78,7 @@ def global_operation_pool
67
78
#
68
79
# @see Concurrent::timer
69
80
def global_timer_set
70
- @global_timer_set ||= Concurrent :: TimerSet . new
81
+ @global_timer_set . value
71
82
end
72
83
73
84
# Global thread pool optimized for short *tasks*.
@@ -85,8 +96,8 @@ def global_timer_set
85
96
#
86
97
# @raise [ConfigurationError] if this thread pool has already been set
87
98
def global_task_pool = ( executor )
88
- raise ConfigurationError . new ( 'global task pool was already set' ) unless @global_task_pool . nil?
89
- @global_task_pool = executor
99
+ @global_task_pool . reconfigure { executor } or
100
+ raise ConfigurationError . new ( 'global task pool was already set' )
90
101
end
91
102
92
103
# Global thread pool optimized for long *operations*.
@@ -104,8 +115,8 @@ def global_task_pool=(executor)
104
115
#
105
116
# @raise [ConfigurationError] if this thread pool has already been set
106
117
def global_operation_pool = ( executor )
107
- raise ConfigurationError . new ( 'global operation pool was already set' ) unless @global_operation_pool . nil?
108
- @global_operation_pool = executor
118
+ @global_operation_pool . reconfigure { executor } or
119
+ raise ConfigurationError . new ( 'global operation pool was already set' )
109
120
end
110
121
end
111
122
0 commit comments