Skip to content

Commit 7c525e2

Browse files
committed
timer task now uses thread safe observer set instead of Observable module
1 parent 487f066 commit 7c525e2

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

lib/concurrent/timer_task.rb

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
module Concurrent
1010

1111
# A very common currency pattern is to run a thread that performs a task at regular
12-
# intervals. The thread that peforms the task sleeps for the given interval then
12+
# intervals. The thread that performs the task sleeps for the given interval then
1313
# wakes up and performs the task. Lather, rinse, repeat... This pattern causes two
14-
# problems. First, it is difficult to test the business logic of the task becuse the
14+
# problems. First, it is difficult to test the business logic of the task because the
1515
# task itself is tightly coupled with the concurrency logic. Second, an exception in
1616
# raised while performing the task can cause the entire thread to abend. In a
1717
# long-running application where the task thread is intended to run for days/weeks/years
@@ -25,7 +25,7 @@ module Concurrent
2525
# performing logging or ancillary operations. +TimerTask+ can also be configured with a
2626
# timeout value allowing it to kill a task that runs too long.
2727
#
28-
# One other advantage of +TimerTask+ is it forces the bsiness logic to be completely decoupled
28+
# One other advantage of +TimerTask+ is it forces the business logic to be completely decoupled
2929
# from the concurrency logic. The business logic can be tested separately then passed to the
3030
# +TimerTask+ for scheduling and running.
3131
#
@@ -147,7 +147,6 @@ class TimerTask
147147
include Dereferenceable
148148
include Runnable
149149
include Stoppable
150-
include Observable
151150

152151
# Default +:execution_interval+
153152
EXECUTION_INTERVAL = 60
@@ -171,7 +170,7 @@ class TimerTask
171170
# @option opts [Integer] :timeout_interval number of seconds a task can
172171
# run before it is considered to have failed (default: TIMEOUT_INTERVAL)
173172
# @option opts [Boolean] :run_now Whether to run the task immediately
174-
# upon instanciation or to wait until the first #execution_interval
173+
# upon instantiation or to wait until the first #execution_interval
175174
# has passed (default: false)
176175
#
177176
# @raise ArgumentError when no block is given.
@@ -193,9 +192,10 @@ def initialize(opts = {}, &block)
193192

194193
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
195194
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
196-
@run_now = opts[:now] || opts[:run_now] || false
195+
@run_now = opts[:now] || opts[:run_now]
197196

198197
@task = block
198+
@observers = CopyOnWriteObserverSet.new
199199
init_mutex
200200
set_deref_options(opts)
201201
end
@@ -226,6 +226,10 @@ def timeout_interval=(value)
226226
@timeout_interval = value
227227
end
228228

229+
def add_observer(observer, func = :update)
230+
@observers.add_observer(observer, func)
231+
end
232+
229233
# Terminate with extreme prejudice. Useful in cases where +#stop+ doesn't
230234
# work because one of the threads becomes unresponsive.
231235
#
@@ -278,11 +282,10 @@ def execute_task # :nodoc:
278282
end
279283
raise TimeoutError if @worker.join(@timeout_interval).nil?
280284
mutex.synchronize { @value = @worker[:result] }
281-
rescue Exception => ex
282-
# suppress
285+
rescue Exception => e
286+
ex = e
283287
ensure
284-
changed
285-
notify_observers(Time.now, self.value, ex)
288+
@observers.notify_observers(Time.now, self.value, ex)
286289
unless @worker.nil?
287290
Thread.kill(@worker)
288291
@worker = nil

0 commit comments

Comments
 (0)