Skip to content

Commit 9174a1d

Browse files
committed
ScheduledTask and TimerSet::Task now support deref and executor options.
1 parent 217c46e commit 9174a1d

File tree

4 files changed

+72
-34
lines changed

4 files changed

+72
-34
lines changed

.rspec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
--require spec_helper
22
--color
3-
--backtrace
43
--format documentation

lib/concurrent/executor/timer_set.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ class TimerSet < RubyExecutorService
2020
class Task < Concurrent::IVar
2121
include Comparable
2222

23+
attr_reader :executor
24+
2325
def initialize(parent, delay, args, task, opts = {})
2426
super(IVar::NO_VALUE, opts, &nil)
2527
synchronize do
2628
ns_set_delay_and_time!(delay)
2729
@parent = parent
2830
@args = args
2931
@task = task
32+
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
3033
self.observers = CopyOnNotifyObserverSet.new
3134
end
3235
end
@@ -134,7 +137,7 @@ def initialize(opts = {})
134137
# @!macro deprecated_scheduling_by_clock_time
135138
def post(delay, *args, &task)
136139
raise ArgumentError.new('no block given') unless block_given?
137-
task = Task.new(self, delay, args, task) # may raise exception
140+
task = Task.new(self, delay, args, task, {executor: @task_executor}) # may raise exception
138141
ok = synchronize{ ns_post_task(task) }
139142
ok ? task : false
140143
end
@@ -192,7 +195,7 @@ def post_task(task)
192195
def ns_post_task(task)
193196
return false unless ns_running?
194197
if (task.original_delay) <= 0.01
195-
@task_executor.post{ task.process_task }
198+
task.executor.post{ task.process_task }
196199
else
197200
@queue.push(task)
198201
# only post the process method when the queue is empty
@@ -248,7 +251,7 @@ def process_tasks
248251
# queue now must have the same pop time, or a closer one, as
249252
# when we peeked).
250253
task = synchronize { @queue.pop }
251-
@task_executor.post{ task.process_task }
254+
task.executor.post{ task.process_task }
252255
else
253256
@condition.wait([diff, 60].min)
254257
end

lib/concurrent/scheduled_task.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ class ScheduledTask < TimerSet::Task
149149
# but will not be supported in the 1.0 release.
150150
def initialize(delay, opts = {}, &block)
151151
raise ArgumentError.new('no block given') unless block_given?
152-
super(Concurrent.global_timer_set, delay, [], block, &nil)
152+
timer_set = opts.fetch(:timer_set, Concurrent.global_timer_set)
153+
args = get_arguments_from(opts)
154+
super(timer_set, delay, args, block, opts, &nil)
153155
synchronize do
154156
ns_set_state(:unscheduled)
155157
@__original_delay__ = delay
@@ -182,7 +184,7 @@ def execute
182184
#
183185
# @!macro deprecated_scheduling_by_clock_time
184186
def self.execute(delay, opts = {}, &block)
185-
new(delay, &block).execute
187+
new(delay, opts, &block).execute
186188
end
187189

188190
# In the task execution in progress?

spec/concurrent/scheduled_task_spec.rb

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'timecop'
2+
require_relative 'dereferenceable_shared'
23
require_relative 'obligation_shared'
34
require_relative 'observable_shared'
45

@@ -8,8 +9,6 @@ module Concurrent
89

910
context 'behavior' do
1011

11-
# obligation
12-
1312
let!(:fulfilled_value) { 10 }
1413
let!(:rejected_reason) { StandardError.new('mojo jojo') }
1514

@@ -33,21 +32,30 @@ module Concurrent
3332
task
3433
end
3534

36-
it_should_behave_like :obligation
37-
38-
# dereferenceable
39-
40-
specify{ expect(ScheduledTask.ancestors).to include(Dereferenceable) }
35+
def dereferenceable_subject(value, opts = {})
36+
task = ScheduledTask.execute(0, opts){ value }.execute
37+
task.value
38+
task
39+
end
4140

42-
# observable
41+
def dereferenceable_observable(opts = {})
42+
ScheduledTask.new(0, opts){ 'value' }
43+
end
4344

44-
subject{ ScheduledTask.new(0.1){ nil } }
45+
def execute_dereferenceable(subject)
46+
subject.execute
47+
subject.value
48+
end
4549

4650
def trigger_observable(observable)
4751
observable.execute
4852
sleep(0.2)
4953
end
5054

55+
subject{ ScheduledTask.new(0.1){ nil } }
56+
57+
it_should_behave_like :obligation
58+
it_should_behave_like :dereferenceable
5159
it_should_behave_like :observable
5260
end
5361

@@ -107,10 +115,6 @@ def trigger_observable(observable)
107115
task.execute
108116
end
109117

110-
it 'allows setting the execution interval to 0' do
111-
expect { 1000.times { ScheduledTask.execute(0) { } } }.not_to raise_error
112-
end
113-
114118
it 'sets the sate to :pending' do
115119
task = ScheduledTask.new(1){ nil }
116120
task.execute
@@ -145,6 +149,50 @@ def trigger_observable(observable)
145149
end
146150
end
147151

152+
context 'execution' do
153+
154+
it 'passes :args from the options to the block' do
155+
expected = [1, 2, 3]
156+
actual = nil
157+
latch = Concurrent::CountDownLatch.new
158+
task = ScheduledTask.execute(0, args: expected) do |*args|
159+
actual = args
160+
latch.count_down
161+
end
162+
latch.wait(2)
163+
expect(actual).to eq expected
164+
end
165+
166+
it 'uses the :executor from the options' do
167+
latch = Concurrent::CountDownLatch.new
168+
executor = Concurrent::ImmediateExecutor.new
169+
expect(executor).to receive(:post).once.with(any_args).and_call_original
170+
task = ScheduledTask.execute(0.1, executor: executor) do
171+
latch.count_down
172+
end
173+
latch.wait(2)
174+
end
175+
176+
it 'uses the :timer_set from the options' do
177+
timer = Concurrent::TimerSet.new
178+
expect(timer).to receive(:post_task).once.with(any_args).and_return(false)
179+
task = ScheduledTask.execute(1, timer_set: timer){ nil }
180+
end
181+
182+
it 'sets the state to :processing when the task is running' do
183+
start_latch = Concurrent::CountDownLatch.new(1)
184+
continue_latch = Concurrent::CountDownLatch.new(1)
185+
task = ScheduledTask.new(0.1) {
186+
start_latch.count_down
187+
continue_latch.wait(2)
188+
}.execute
189+
start_latch.wait(2)
190+
state = task.state
191+
continue_latch.count_down
192+
expect(state).to eq :processing
193+
end
194+
end
195+
148196
context '#cancel' do
149197

150198
it 'returns false if the task has already been performed' do
@@ -171,7 +219,6 @@ def trigger_observable(observable)
171219
expect(latch.wait(0.3)).to be_falsey
172220
end
173221

174-
175222
it 'cancels the task if it has not yet started' do
176223
latch = Concurrent::CountDownLatch.new(1)
177224
task = ScheduledTask.new(0.3){ latch.count_down }.execute
@@ -195,19 +242,6 @@ def trigger_observable(observable)
195242
end
196243
end
197244

198-
context 'execution' do
199-
200-
it 'sets the state to :in_progress when the task is running' do
201-
latch = Concurrent::CountDownLatch.new(1)
202-
task = ScheduledTask.new(0.1) {
203-
latch.count_down
204-
sleep(1)
205-
}.execute
206-
latch.wait(1)
207-
expect(task).to be_in_progress
208-
end
209-
end
210-
211245
context 'observation' do
212246

213247
let(:clazz) do
@@ -240,7 +274,7 @@ def update(time, value, reason)
240274
expect(task.add_observer(observer)).to be_truthy
241275
end
242276

243-
it 'returns true for an observer added while :in_progress' do
277+
it 'returns true for an observer added while :processing' do
244278
task = ScheduledTask.new(0.1){ sleep(1); 42 }.execute
245279
sleep(0.2)
246280
expect(task.add_observer(observer)).to be_truthy

0 commit comments

Comments
 (0)