Skip to content

Commit fe52dde

Browse files
committed
Rewritten TimerTask now an Executor instead of a Runnable.
1 parent fc5637e commit fe52dde

File tree

2 files changed

+212
-110
lines changed

2 files changed

+212
-110
lines changed

lib/concurrent/timer_task.rb

Lines changed: 172 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
require 'thread'
2-
31
require 'concurrent/dereferenceable'
42
require 'concurrent/observable'
3+
require 'concurrent/atomic/atomic_fixnum'
4+
require 'concurrent/executor/executor'
5+
require 'concurrent/executor/safe_task_executor'
6+
7+
# deprecated Updated to use `Executor` instead of `Runnable`
58
require 'concurrent/runnable'
6-
require 'concurrent/stoppable'
79

810
module Concurrent
911

@@ -144,148 +146,216 @@ module Concurrent
144146
# @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html
145147
class TimerTask
146148
include Dereferenceable
149+
include Executor
147150
include Concurrent::Observable
148-
include Runnable
149-
include Stoppable
150151

151-
# Default `:execution_interval`
152+
# Default `:execution_interval` in seconds.
152153
EXECUTION_INTERVAL = 60
153154

154-
# Default `:timeout_interval`
155+
# Default `:timeout_interval` in seconds.
155156
TIMEOUT_INTERVAL = 30
156157

157-
# Number of seconds after the task completes before the task is
158-
# performed again.
159-
attr_reader :execution_interval
160-
161-
# Number of seconds the task can run before it is considered to have failed.
162-
# Failed tasks are forcibly killed.
163-
attr_reader :timeout_interval
164-
165158
# Create a new TimerTask with the given task and configuration.
166159
#
167-
# @param [Hash] opts the options defining task execution.
168-
# @option opts [Integer] :execution_interval number of seconds between
169-
# task executions (default: EXECUTION_INTERVAL)
170-
# @option opts [Integer] :timeout_interval number of seconds a task can
171-
# run before it is considered to have failed (default: TIMEOUT_INTERVAL)
172-
# @option opts [Boolean] :run_now Whether to run the task immediately
173-
# upon instantiation or to wait until the first #execution_interval
174-
# has passed (default: false)
175-
#
176-
# @raise ArgumentError when no block is given.
177-
#
178-
# @yield to the block after :execution_interval seconds have passed since
179-
# the last yield
180-
# @yieldparam task a reference to the `TimerTask` instance so that the
181-
# block can control its own lifecycle. Necessary since `self` will
182-
# refer to the execution context of the block rather than the running
183-
# `TimerTask`.
160+
# @!macro [attach] timer_task_initialize
161+
# @param [Hash] opts the options defining task execution.
162+
# @option opts [Integer] :execution_interval number of seconds between
163+
# task executions (default: EXECUTION_INTERVAL)
164+
# @option opts [Integer] :timeout_interval number of seconds a task can
165+
# run before it is considered to have failed (default: TIMEOUT_INTERVAL)
166+
# @option opts [Boolean] :run_now Whether to run the task immediately
167+
# upon instantiation or to wait until the first # execution_interval
168+
# has passed (default: false)
169+
#
170+
# @raise ArgumentError when no block is given.
171+
#
172+
# @yield to the block after :execution_interval seconds have passed since
173+
# the last yield
174+
# @yieldparam task a reference to the `TimerTask` instance so that the
175+
# block can control its own lifecycle. Necessary since `self` will
176+
# refer to the execution context of the block rather than the running
177+
# `TimerTask`.
178+
#
179+
# @note Calls Concurrent::Dereferenceable# set_deref_options passing `opts`.
180+
# All options supported by Concurrent::Dereferenceable can be set
181+
# during object initialization.
184182
#
185-
# @note Calls Concurrent::Dereferenceable#set_deref_options passing `opts`.
186-
# All options supported by Concurrent::Dereferenceable can be set
187-
# during object initialization.
188-
#
189-
# @see Concurrent::Dereferenceable#set_deref_options
190-
def initialize(opts = {}, &block)
183+
# @return [TimerTask] the new `TimerTask`
184+
#
185+
# @see Concurrent::Dereferenceable# set_deref_options
186+
def initialize(opts = {}, &task)
191187
raise ArgumentError.new('no block given') unless block_given?
192188

189+
init_executor
190+
set_deref_options(opts)
191+
193192
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
194193
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
195194
@run_now = opts[:now] || opts[:run_now]
195+
@executor = Concurrent::SafeTaskExecutor.new(task)
196+
@running = Concurrent::AtomicFixnum.new(0)
196197

197-
@task = block
198198
self.observers = CopyOnWriteObserverSet.new
199-
init_mutex
200-
set_deref_options(opts)
201199
end
202200

203-
# Number of seconds after the task completes before the task is
204-
# performed again.
201+
# Is the executor running?
202+
#
203+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
204+
def running?
205+
@running.value == 1
206+
end
207+
208+
# Execute a previously created `TimerTask`.
205209
#
206-
# @param [Float] value number of seconds
210+
# @return [TimerTask] a reference to `self`
207211
#
208-
# @raise ArgumentError when value is non-numeric or not greater than zero
209-
def execution_interval=(value)
210-
if (value = value.to_f) <= 0.0
211-
raise ArgumentError.new("'execution_interval' must be non-negative number")
212+
# @example Instance and execute in separate steps
213+
# task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }
214+
# task.running? #=> false
215+
# task.execute
216+
# task.running? #=> true
217+
#
218+
# @example Instance and execute in one line
219+
# task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute
220+
# task.running? #=> true
221+
#
222+
# @since 0.6.0
223+
def execute
224+
mutex.synchronize do
225+
if @running.value == 0
226+
@running.value = 1
227+
schedule_next_task(@run_now ? 0 : @execution_interval)
228+
end
212229
end
213-
@execution_interval = value
230+
self
214231
end
215232

216-
# Number of seconds the task can run before it is considered to have failed.
217-
# Failed tasks are forcibly killed.
233+
# Create and execute a new `TimerTask`.
218234
#
219-
# @param [Float] value number of seconds
235+
# @!macro timer_task_initialize
220236
#
221-
# @raise ArgumentError when value is non-numeric or not greater than zero
222-
def timeout_interval=(value)
237+
# @example
238+
# task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" }
239+
# task.running? #=> true
240+
#
241+
# @since 0.6.0
242+
def self.execute(opts = {}, &task)
243+
TimerTask.new(opts, &task).execute
244+
end
245+
246+
# @!attribute [rw] execution_interval
247+
# @return [Fixnum] Number of seconds after the task completes before the
248+
# task is performed again.
249+
def execution_interval
250+
mutex.synchronize{ @execution_interval }
251+
end
252+
253+
# @!attribute [rw] execution_interval
254+
# @return [Fixnum] Number of seconds after the task completes before the
255+
# task is performed again.
256+
def execution_interval=(value)
223257
if (value = value.to_f) <= 0.0
224-
raise ArgumentError.new("'timeout_interval' must be non-negative number")
258+
raise ArgumentError.new('must be greater than zero')
259+
else
260+
mutex.synchronize{ @execution_interval = value }
225261
end
226-
@timeout_interval = value
227262
end
228263

229-
# Terminate with extreme prejudice. Useful in cases where `#stop` doesn't
230-
# work because one of the threads becomes unresponsive.
231-
#
232-
# @return [Boolean] indicating whether or not the `TimerTask` was killed
233-
#
234-
# @note Do not use this method unless `#stop` has failed.
235-
def kill
236-
return true unless running?
237-
mutex.synchronize do
238-
@running = false
239-
Thread.kill(@worker) unless @worker.nil?
240-
Thread.kill(@monitor) unless @monitor.nil?
264+
# @!attribute [rw] timeout_interval
265+
# @return [Fixnum] Number of seconds the task can run before it is
266+
# considered to have failed.
267+
def timeout_interval
268+
mutex.synchronize{ @timeout_interval }
269+
end
270+
271+
# @!attribute [rw] timeout_interval
272+
# @return [Fixnum] Number of seconds the task can run before it is
273+
# considered to have failed.
274+
def timeout_interval=(value)
275+
if (value = value.to_f) <= 0.0
276+
raise ArgumentError.new('must be greater than zero')
277+
else
278+
mutex.synchronize{ @timeout_interval = value }
241279
end
242-
return true
243-
rescue
244-
return false
245-
ensure
246-
@worker = @monitor = nil
247280
end
248-
alias_method :terminate, :kill
249281

250-
alias_method :cancel, :stop
282+
# @deprecated Updated to use `Executor` instead of `Runnable`
283+
def terminate(*args) deprecated(:terminate, :kill, *args); end
284+
285+
# @deprecated Updated to use `Executor` instead of `Runnable`
286+
def stop(*args) deprecated(:stop, :shutdown, *args); end
287+
288+
# @deprecated Updated to use `Executor` instead of `Runnable`
289+
def cancel(*args) deprecated(:cancel, :shutdown, *args); end
290+
291+
# @deprecated Updated to use `Executor` instead of `Runnable`
292+
def run!(*args) deprecated(:run!, :execute); end
293+
294+
# @deprecated Updated to use `Executor` instead of `Runnable`
295+
def self.run!(*args, &block)
296+
warn "[DEPRECATED] `run!` is deprecated, please use `execute` instead."
297+
Concurrent::Runnable::Context.new(TimerTask.new(*args, &block))
298+
end
299+
300+
# @deprecated Updated to use `Executor` instead of `Runnable`
301+
def run
302+
raise Concurrent::Runnable::LifecycleError.new('already running') if @running.value == 1
303+
self.execute
304+
self.wait_for_termination
305+
true
306+
end
307+
308+
private :post, :<<
251309

252310
protected
253311

254-
def on_run # :nodoc:
255-
@monitor = Thread.current
312+
# @!visibility private
313+
def shutdown_execution
314+
@running.value = 0
315+
super
256316
end
257317

258-
def on_stop # :nodoc:
259-
before_stop_proc.call if before_stop_proc
260-
@monitor.wakeup if @monitor.alive?
261-
Thread.pass
318+
# @!visibility private
319+
def kill_execution
320+
@running.value = 0
321+
super
262322
end
263323

264-
def on_task # :nodoc:
265-
if @run_now
266-
@run_now = false
267-
else
268-
sleep(@execution_interval)
269-
end
270-
execute_task
324+
# @!visibility private
325+
def schedule_next_task(interval = execution_interval)
326+
Concurrent::timer(interval, Concurrent::Event.new, &method(:execute_task))
271327
end
272328

273-
def execute_task # :nodoc:
274-
@value = ex = nil
275-
@worker = Thread.new do
276-
Thread.current.abort_on_exception = false
277-
Thread.current[:result] = @task.call(self)
329+
# @!visibility private
330+
def execute_task(completion)
331+
return unless @running.value == 1
332+
Concurrent::timer(timeout_interval, completion, &method(:timeout_task))
333+
success, value, reason = @executor.execute(self)
334+
if completion.try?
335+
self.value = value
336+
schedule_next_task
337+
time = Time.now
338+
observers.notify_observers do
339+
[time, self.value, reason]
340+
end
278341
end
279-
raise TimeoutError if @worker.join(@timeout_interval).nil?
280-
mutex.synchronize { @value = @worker[:result] }
281-
rescue Exception => e
282-
ex = e
283-
ensure
284-
observers.notify_observers(Time.now, self.value, ex)
285-
unless @worker.nil?
286-
Thread.kill(@worker)
287-
@worker = nil
342+
end
343+
344+
# @!visibility private
345+
def timeout_task(completion)
346+
return unless @running.value == 1
347+
if completion.try?
348+
self.value = value
349+
schedule_next_task
350+
observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
288351
end
289352
end
353+
354+
# @deprecated Updated to use `Executor` instead of `Runnable`
355+
# @!visibility private
356+
def deprecated(old, new, *args)
357+
warn "[DEPRECATED] `#{old}` is deprecated, please use `#{new}` instead."
358+
self.send(new, *args)
359+
end
290360
end
291361
end

0 commit comments

Comments
 (0)