9
9
module Concurrent
10
10
11
11
# A very common currency pattern is to run a thread that performs a task at regular
12
- # intervals. The thread that peforms the task sleeps for the given interval then
12
+ # intervals. The thread that performs the task sleeps for the given interval then
13
13
# wakes up and performs the task. Lather, rinse, repeat... This pattern causes two
14
- # problems. First, it is difficult to test the business logic of the task becuse the
14
+ # problems. First, it is difficult to test the business logic of the task because the
15
15
# task itself is tightly coupled with the concurrency logic. Second, an exception in
16
16
# raised while performing the task can cause the entire thread to abend. In a
17
17
# long-running application where the task thread is intended to run for days/weeks/years
@@ -25,7 +25,7 @@ module Concurrent
25
25
# performing logging or ancillary operations. +TimerTask+ can also be configured with a
26
26
# timeout value allowing it to kill a task that runs too long.
27
27
#
28
- # One other advantage of +TimerTask+ is it forces the bsiness logic to be completely decoupled
28
+ # One other advantage of +TimerTask+ is it forces the business logic to be completely decoupled
29
29
# from the concurrency logic. The business logic can be tested separately then passed to the
30
30
# +TimerTask+ for scheduling and running.
31
31
#
@@ -147,7 +147,6 @@ class TimerTask
147
147
include Dereferenceable
148
148
include Runnable
149
149
include Stoppable
150
- include Observable
151
150
152
151
# Default +:execution_interval+
153
152
EXECUTION_INTERVAL = 60
@@ -171,7 +170,7 @@ class TimerTask
171
170
# @option opts [Integer] :timeout_interval number of seconds a task can
172
171
# run before it is considered to have failed (default: TIMEOUT_INTERVAL)
173
172
# @option opts [Boolean] :run_now Whether to run the task immediately
174
- # upon instanciation or to wait until the first #execution_interval
173
+ # upon instantiation or to wait until the first #execution_interval
175
174
# has passed (default: false)
176
175
#
177
176
# @raise ArgumentError when no block is given.
@@ -193,9 +192,10 @@ def initialize(opts = {}, &block)
193
192
194
193
self . execution_interval = opts [ :execution ] || opts [ :execution_interval ] || EXECUTION_INTERVAL
195
194
self . timeout_interval = opts [ :timeout ] || opts [ :timeout_interval ] || TIMEOUT_INTERVAL
196
- @run_now = opts [ :now ] || opts [ :run_now ] || false
195
+ @run_now = opts [ :now ] || opts [ :run_now ]
197
196
198
197
@task = block
198
+ @observers = CopyOnWriteObserverSet . new
199
199
init_mutex
200
200
set_deref_options ( opts )
201
201
end
@@ -226,6 +226,10 @@ def timeout_interval=(value)
226
226
@timeout_interval = value
227
227
end
228
228
229
+ def add_observer ( observer , func = :update )
230
+ @observers . add_observer ( observer , func )
231
+ end
232
+
229
233
# Terminate with extreme prejudice. Useful in cases where +#stop+ doesn't
230
234
# work because one of the threads becomes unresponsive.
231
235
#
@@ -278,11 +282,10 @@ def execute_task # :nodoc:
278
282
end
279
283
raise TimeoutError if @worker . join ( @timeout_interval ) . nil?
280
284
mutex . synchronize { @value = @worker [ :result ] }
281
- rescue Exception => ex
282
- # suppress
285
+ rescue Exception => e
286
+ ex = e
283
287
ensure
284
- changed
285
- notify_observers ( Time . now , self . value , ex )
288
+ @observers . notify_observers ( Time . now , self . value , ex )
286
289
unless @worker . nil?
287
290
Thread . kill ( @worker )
288
291
@worker = nil
0 commit comments