Skip to content

Commit 87fb2a3

Browse files
committed
wip: RubyThreadPoolExecutor now uses Exector for stop events.
1 parent 9736110 commit 87fb2a3

File tree

3 files changed

+59
-62
lines changed

3 files changed

+59
-62
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,35 @@ module Concurrent
88

99
module Executor
1010

11+
# Submit a task to the thread pool for asynchronous processing.
12+
#
13+
# @param [Array] args zero or more arguments to be passed to the task
14+
#
15+
# @yield the asynchronous task to perform
16+
#
17+
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
18+
# is not running
19+
#
20+
# @raise [ArgumentError] if no task is given
21+
def post(*args, &task)
22+
raise ArgumentError.new('no block given') unless block_given?
23+
mutex.synchronize do
24+
break false unless running?
25+
execute(*args, &task)
26+
true
27+
end
28+
end
29+
30+
# Submit a task to the thread pool for asynchronous processing.
31+
#
32+
# @param [Proc] task the asynchronous task to perform
33+
#
34+
# @return [self] returns itself
35+
def <<(task)
36+
post(&task)
37+
self
38+
end
39+
1140
# Is the thread pool running?
1241
#
1342
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
@@ -41,9 +70,6 @@ def wait_for_termination(timeout)
4170
@stopped_event.wait(timeout.to_i)
4271
end
4372

44-
def post(*args, &task)
45-
end
46-
4773
protected
4874

4975
attr_reader :mutex, :stop_event, :stopped_event
@@ -55,6 +81,7 @@ def init_executor
5581
end
5682

5783
def execute(*args, &task)
84+
raise NotImplementedError
5885
end
5986
end
6087

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,6 @@ def initialize(opts = {})
1717
init_executor
1818
end
1919

20-
# Submit a task to the thread pool for asynchronous processing.
21-
#
22-
# @param [Array] args zero or more arguments to be passed to the task
23-
#
24-
# @yield the asynchronous task to perform
25-
#
26-
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
27-
# is not running
28-
#
29-
# @raise [ArgumentError] if no task is given
30-
def post(*args, &task)
31-
raise ArgumentError.new('no block given') unless block_given?
32-
mutex.synchronize do
33-
break false unless running?
34-
supervise
35-
@queue << [args, task]
36-
true
37-
end
38-
end
39-
4020
# Begin an orderly shutdown. Tasks already in the queue will be executed,
4121
# but no new tasks will be accepted. Has no additional effect if the
4222
# thread pool is not running.
@@ -65,6 +45,11 @@ def kill
6545

6646
protected
6747

48+
def execute(*args, &task)
49+
supervise
50+
@queue << [args, task]
51+
end
52+
6853
def alive?
6954
@thread && @thread.alive?
7055
end

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ def initialize(opts = {})
8787
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
8888
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
8989

90-
@state = :running
90+
init_executor
91+
9192
@pool = []
92-
@terminator = Event.new
93+
@stopped_event = Event.new
9394
@queue = Queue.new
94-
@mutex = Mutex.new
9595
@scheduled_task_count = 0
9696
@completed_task_count = 0
9797
@largest_length = 0
@@ -104,8 +104,8 @@ def initialize(opts = {})
104104
#
105105
# @return [Integer] the length
106106
def length
107-
@mutex.synchronize do
108-
@state != :shutdown ? @pool.length : 0
107+
mutex.synchronize do
108+
running? ? @pool.length : 0
109109
end
110110
end
111111
alias_method :current_length, :length
@@ -122,29 +122,15 @@ def queue_length
122122
#
123123
# @return [Integer] the remaining_capacity
124124
def remaining_capacity
125-
@mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
126-
end
127-
128-
# Is the thread pool running?
129-
#
130-
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
131-
def running?
132-
@mutex.synchronize { @state == :running }
125+
mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
133126
end
134127

135128
# Returns an array with the status of each thread in the pool
136129
#
137130
# This method is deprecated and will be removed soon.
138131
def status
139132
warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
140-
@mutex.synchronize { @pool.collect { |worker| worker.status } }
141-
end
142-
143-
# Is the thread pool shutdown?
144-
#
145-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
146-
def shutdown?
147-
@mutex.synchronize { @state != :running }
133+
mutex.synchronize { @pool.collect { |worker| worker.status } }
148134
end
149135

150136
# Block until thread pool shutdown is complete or until `timeout` seconds have
@@ -157,7 +143,7 @@ def shutdown?
157143
#
158144
# @return [Boolean] `true` if shutdown complete or false on `timeout`
159145
def wait_for_termination(timeout)
160-
return @terminator.wait(timeout.to_i)
146+
return @stopped_event.wait(timeout.to_i)
161147
end
162148

163149
# Submit a task to the thread pool for asynchronous processing.
@@ -172,8 +158,8 @@ def wait_for_termination(timeout)
172158
# @raise [ArgumentError] if no task is given
173159
def post(*args, &task)
174160
raise ArgumentError.new('no block given') unless block_given?
175-
@mutex.synchronize do
176-
break false unless @state == :running
161+
mutex.synchronize do
162+
break false unless running?
177163
return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
178164
@scheduled_task_count += 1
179165
@queue << [args, task]
@@ -190,14 +176,13 @@ def post(*args, &task)
190176
# but no new tasks will be accepted. Has no additional effect if the
191177
# thread pool is not running.
192178
def shutdown
193-
@mutex.synchronize do
194-
break unless @state == :running
179+
mutex.synchronize do
180+
break unless running?
195181
@queue.clear
182+
stop_event.set
196183
if @pool.empty?
197-
@state = :shutdown
198-
@terminator.set
184+
@stopped_event.set
199185
else
200-
@state = :shuttingdown
201186
@pool.length.times{ @queue << :stop }
202187
end
203188
end
@@ -208,34 +193,34 @@ def shutdown
208193
# will be accepted. Has no additional effect if the thread pool is
209194
# not running.
210195
def kill
211-
@mutex.synchronize do
212-
break if @state == :shutdown
196+
mutex.synchronize do
197+
return if shutdown?
198+
stop_event.set
213199
@queue.clear
214-
@state = :shutdown
215200
drain_pool
216-
@terminator.set
201+
stopped_event.set
217202
end
218203
end
219204

220205
# Run on task completion.
221206
#
222207
# @!visibility private
223208
def on_end_task # :nodoc:
224-
@mutex.synchronize do
209+
mutex.synchronize do
225210
@completed_task_count += 1 #if success
226-
break unless @state == :running
211+
break unless running?
227212
end
228213
end
229214

230215
# Run when a thread worker exits.
231216
#
232217
# @!visibility private
233218
def on_worker_exit(worker) # :nodoc:
234-
@mutex.synchronize do
219+
mutex.synchronize do
235220
@pool.delete(worker)
236-
if @pool.empty? && @state != :running
237-
@state = :shutdown
238-
@terminator.set
221+
if @pool.empty? && ! running?
222+
stop_event.set
223+
stopped_event.set
239224
end
240225
end
241226
end

0 commit comments

Comments
 (0)