Skip to content

Commit 9af0f8c

Browse files
committed
Remove timeout from TimerTask
The timeout in TimerTask is not ensuring tasks are not allowed to continue processing after the timeout has passed. This can lead to threads leaking since TimerTask will try to run the provided task again before the previous execution has completed. Yet TimerTask will not allow the task to run in parallel so more and more worker threads will be queued up waiting to be scheduled for executing the task. To illustrate, imagine running a TimerTask with an execution interval of 1 and a timeout interval of 1, with the task itself running for 4 seconds. Time 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Worker1 s> t - - < > t . . . . . - - - < Worker2 s > t . - - - < > t . . . . . Worker3 s > t . . . - - - < > t . Worker4 s > t . . . . . . . Worker5 s > t . . . At t=1 worker 1 is spawned(s) and scheduled(>) and will start executing the task. It will timeout(t) after 1 second and cause the spawning of worker 2. Worker 2 will then wait 1 second to be scheduled and then another second to timeout causing the spawn of worker 3 at t=4. Worker 3 is then scheduled to start at t=5 and will timeout at t=6. At this point worker 1 has completed it's previous task so the task queued by worker 3 will go to worker 1 to be scheduled for t=7. At t=8 worker 1 will timeout and since worker 2 is currently executing(-) and worker 3 is current waiting(.) for worker 2 to completed worker 4 will be spawned. This patterns will continue to repeat with new workers/threads spawned every 4 seconds.
1 parent a9ae6de commit 9af0f8c

File tree

2 files changed

+7
-103
lines changed

2 files changed

+7
-103
lines changed

lib/concurrent-ruby/concurrent/timer_task.rb

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ module Concurrent
2525
# Should the task experience an unrecoverable crash only the task thread will
2626
# crash. This makes the `TimerTask` very fault tolerant. Additionally, the
2727
# `TimerTask` thread can respond to the success or failure of the task,
28-
# performing logging or ancillary operations. `TimerTask` can also be
29-
# configured with a timeout value allowing it to kill a task that runs too
30-
# long.
28+
# performing logging or ancillary operations.
3129
#
3230
# One other advantage of `TimerTask` is that it forces the business logic to
3331
# be completely decoupled from the concurrency logic. The business logic can
@@ -48,9 +46,7 @@ module Concurrent
4846
# {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
4947
# Observable} module. On execution the `TimerTask` will notify the observers
5048
# with three arguments: time of execution, the result of the block (or nil on
51-
# failure), and any raised exceptions (or nil on success). If the timeout
52-
# interval is exceeded the observer will receive a `Concurrent::TimeoutError`
53-
# object as the third argument.
49+
# failure), and any raised exceptions (or nil on success).
5450
#
5551
# @!macro copy_options
5652
#
@@ -59,20 +55,18 @@ module Concurrent
5955
# task.execute
6056
#
6157
# task.execution_interval #=> 60 (default)
62-
# task.timeout_interval #=> 30 (default)
6358
#
6459
# # wait 60 seconds...
6560
# #=> 'Boom!'
6661
#
6762
# task.shutdown #=> true
6863
#
69-
# @example Configuring `:execution_interval` and `:timeout_interval`
70-
# task = Concurrent::TimerTask.new(execution_interval: 5, timeout_interval: 5) do
64+
# @example Configuring `:execution_interval`
65+
# task = Concurrent::TimerTask.new(execution_interval: 5) do
7166
# puts 'Boom!'
7267
# end
7368
#
7469
# task.execution_interval #=> 5
75-
# task.timeout_interval #=> 5
7670
#
7771
# @example Immediate execution with `:run_now`
7872
# task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
@@ -115,15 +109,13 @@ module Concurrent
115109
# def update(time, result, ex)
116110
# if result
117111
# print "(#{time}) Execution successfully returned #{result}\n"
118-
# elsif ex.is_a?(Concurrent::TimeoutError)
119-
# print "(#{time}) Execution timed out\n"
120112
# else
121113
# print "(#{time}) Execution failed with error #{ex}\n"
122114
# end
123115
# end
124116
# end
125117
#
126-
# task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ 42 }
118+
# task = Concurrent::TimerTask.new(execution_interval: 1){ 42 }
127119
# task.add_observer(TaskObserver.new)
128120
# task.execute
129121
# sleep 4
@@ -133,7 +125,7 @@ module Concurrent
133125
# #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
134126
# task.shutdown
135127
#
136-
# task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ sleep }
128+
# task = Concurrent::TimerTask.new(execution_interval: 1){ sleep }
137129
# task.add_observer(TaskObserver.new)
138130
# task.execute
139131
#
@@ -160,17 +152,12 @@ class TimerTask < RubyExecutorService
160152
# Default `:execution_interval` in seconds.
161153
EXECUTION_INTERVAL = 60
162154

163-
# Default `:timeout_interval` in seconds.
164-
TIMEOUT_INTERVAL = 30
165-
166155
# Create a new TimerTask with the given task and configuration.
167156
#
168157
# @!macro timer_task_initialize
169158
# @param [Hash] opts the options defining task execution.
170159
# @option opts [Integer] :execution_interval number of seconds between
171160
# task executions (default: EXECUTION_INTERVAL)
172-
# @option opts [Integer] :timeout_interval number of seconds a task can
173-
# run before it is considered to have failed (default: TIMEOUT_INTERVAL)
174161
# @option opts [Boolean] :run_now Whether to run the task immediately
175162
# upon instantiation or to wait until the first # execution_interval
176163
# has passed (default: false)
@@ -252,24 +239,6 @@ def execution_interval=(value)
252239
end
253240
end
254241

255-
# @!attribute [rw] timeout_interval
256-
# @return [Fixnum] Number of seconds the task can run before it is
257-
# considered to have failed.
258-
def timeout_interval
259-
synchronize { @timeout_interval }
260-
end
261-
262-
# @!attribute [rw] timeout_interval
263-
# @return [Fixnum] Number of seconds the task can run before it is
264-
# considered to have failed.
265-
def timeout_interval=(value)
266-
if (value = value.to_f) <= 0.0
267-
raise ArgumentError.new('must be greater than zero')
268-
else
269-
synchronize { @timeout_interval = value }
270-
end
271-
end
272-
273242
private :post, :<<
274243

275244
private
@@ -278,7 +247,6 @@ def ns_initialize(opts, &task)
278247
set_deref_options(opts)
279248

280249
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
281-
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
282250
@run_now = opts[:now] || opts[:run_now]
283251
@executor = Concurrent::SafeTaskExecutor.new(task)
284252
@running = Concurrent::AtomicBoolean.new(false)
@@ -308,7 +276,6 @@ def schedule_next_task(interval = execution_interval)
308276
# @!visibility private
309277
def execute_task(completion)
310278
return nil unless @running.true?
311-
ScheduledTask.execute(timeout_interval, args: [completion], &method(:timeout_task))
312279
_success, value, reason = @executor.execute(self)
313280
if completion.try?
314281
self.value = value
@@ -320,14 +287,5 @@ def execute_task(completion)
320287
end
321288
nil
322289
end
323-
324-
# @!visibility private
325-
def timeout_task(completion)
326-
return unless @running.true?
327-
if completion.try?
328-
schedule_next_task
329-
observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
330-
end
331-
end
332290
end
333291
end

spec/concurrent/timer_task_spec.rb

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -72,37 +72,15 @@ def trigger_observable(observable)
7272
}.to raise_error(ArgumentError)
7373
end
7474

75-
it 'raises an exception if :timeout_interval is not greater than zero' do
76-
expect {
77-
Concurrent::TimerTask.new(timeout_interval: 0) { nil }
78-
}.to raise_error(ArgumentError)
79-
end
80-
81-
it 'raises an exception if :timeout_interval is not an integer' do
82-
expect {
83-
Concurrent::TimerTask.new(timeout_interval: 'one') { nil }
84-
}.to raise_error(ArgumentError)
85-
end
86-
8775
it 'uses the default execution interval when no interval is given' do
8876
subject = TimerTask.new { nil }
8977
expect(subject.execution_interval).to eq TimerTask::EXECUTION_INTERVAL
9078
end
9179

92-
it 'uses the default timeout interval when no interval is given' do
93-
subject = TimerTask.new { nil }
94-
expect(subject.timeout_interval).to eq TimerTask::TIMEOUT_INTERVAL
95-
end
96-
9780
it 'uses the given execution interval' do
9881
subject = TimerTask.new(execution_interval: 5) { nil }
9982
expect(subject.execution_interval).to eq 5
10083
end
101-
102-
it 'uses the given timeout interval' do
103-
subject = TimerTask.new(timeout_interval: 5) { nil }
104-
expect(subject.timeout_interval).to eq 5
105-
end
10684
end
10785

10886
context '#kill' do
@@ -135,8 +113,7 @@ def trigger_observable(observable)
135113
specify '#execution_interval is writeable' do
136114

137115
latch = CountDownLatch.new(1)
138-
subject = TimerTask.new(timeout_interval: 1,
139-
execution_interval: 1,
116+
subject = TimerTask.new(execution_interval: 1,
140117
run_now: true) do |task|
141118
task.execution_interval = 3
142119
latch.count_down
@@ -152,27 +129,6 @@ def trigger_observable(observable)
152129
expect(subject.execution_interval).to eq(3)
153130
subject.kill
154131
end
155-
156-
specify '#timeout_interval is writeable' do
157-
158-
latch = CountDownLatch.new(1)
159-
subject = TimerTask.new(timeout_interval: 1,
160-
execution_interval: 0.1,
161-
run_now: true) do |task|
162-
task.timeout_interval = 3
163-
latch.count_down
164-
end
165-
166-
expect(subject.timeout_interval).to eq(1)
167-
subject.timeout_interval = 2
168-
expect(subject.timeout_interval).to eq(2)
169-
170-
subject.execute
171-
latch.wait(0.2)
172-
173-
expect(subject.timeout_interval).to eq(3)
174-
subject.kill
175-
end
176132
end
177133

178134
context 'execution' do
@@ -243,16 +199,6 @@ def trigger_observable(observable)
243199
subject.kill
244200
end
245201

246-
it 'notifies all observers on timeout' do
247-
subject = TimerTask.new(run_now: true, execution: 2, timeout: 0.1) { sleep }
248-
subject.add_observer(observer)
249-
subject.execute
250-
observer.latch.wait(1)
251-
expect(observer.value).to be_nil
252-
expect(observer.ex).to be_a(Concurrent::TimeoutError)
253-
subject.kill
254-
end
255-
256202
it 'notifies all observers on error' do
257203
subject = TimerTask.new(execution: 0.1) { raise ArgumentError }
258204
subject.add_observer(observer)

0 commit comments

Comments
 (0)