Skip to content

Commit 911f357

Browse files
committed
Future can be cancelled before processing begins.
1 parent d28112a commit 911f357

File tree

3 files changed

+148
-9
lines changed

3 files changed

+148
-9
lines changed

lib/concurrent/errors.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ module Concurrent
33
# Raised when errors occur during configuration.
44
ConfigurationError = Class.new(StandardError)
55

6+
# Raised when an asynchronous operation is cancelled before execution.
7+
CancelledOperationError = Class.new(StandardError)
8+
69
# Raised when a lifecycle method (such as `stop`) is called in an improper
710
# sequence or when the object is in an inappropriate state.
811
LifecycleError = Class.new(StandardError)

lib/concurrent/future.rb

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,38 @@ def set(value = IVar::NO_VALUE, &block)
8484
execute
8585
end
8686

87+
# Attempt to cancel the operation if it has not already processed.
88+
# The operation can only be cancelled while still `pending`. It cannot
89+
# be cancelled once it has begun processing or has completed.
90+
#
91+
# @return [Boolean] was the operation successfully cancelled.
92+
def cancel
93+
compare_and_set_state(:cancelled, :pending)
94+
end
95+
96+
# Has the operation been successfully cancelled?
97+
#
98+
# @return [Boolean]
99+
def cancelled?
100+
state == :cancelled
101+
end
102+
103+
# Wait the given number of seconds for the operation to complete.
104+
# On timeout attempt to cancel the operation.
105+
#
106+
# @param [Numeric] timeout the maximum time in seconds to wait.
107+
# @return [Boolean] true if the operation completed before the timeout
108+
# else false
109+
def wait_or_cancel(timeout)
110+
wait(timeout)
111+
if complete?
112+
true
113+
else
114+
cancel
115+
false
116+
end
117+
end
118+
87119
protected
88120

89121
def ns_initialize(value, opts)
@@ -97,9 +129,13 @@ def ns_initialize(value, opts)
97129
private
98130

99131
# @!visibility private
100-
def work # :nodoc:
101-
success, val, reason = SafeTaskExecutor.new(@task, rescue_exception: true).execute(*@args)
102-
complete(success, val, reason)
132+
def work
133+
if compare_and_set_state(:processing, :pending)
134+
success, val, reason = SafeTaskExecutor.new(@task, rescue_exception: true).execute(*@args)
135+
complete(success, val, reason)
136+
else
137+
complete(false, nil, CancelledOperationError.new)
138+
end
103139
end
104140
end
105141
end

spec/concurrent/future_spec.rb

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ module Concurrent
1818
let!(:rejected_reason) { StandardError.new('mojo jojo') }
1919

2020
let(:pending_subject) do
21-
Future.new(executor: executor){ sleep(0.1); fulfilled_value }.execute
21+
executor = Concurrent::SingleThreadExecutor.new
22+
executor.post{ sleep(5) }
23+
Future.execute(executor: executor){ fulfilled_value }
2224
end
2325

2426
let(:fulfilled_subject) do
@@ -110,8 +112,11 @@ def get_ivar_from_args(opts)
110112
end
111113

112114
it 'sets the state to :pending' do
113-
latch = Concurrent::CountDownLatch.new(1)
114-
future = Future.new(executor: executor){ latch.wait(10) }
115+
latch = Concurrent::CountDownLatch.new
116+
executor = Concurrent::SingleThreadExecutor.new
117+
executor.post{ latch.wait(2) }
118+
119+
future = Future.new(executor: executor){ 42 }
115120
future.execute
116121
expect(future).to be_pending
117122
latch.count_down
@@ -150,10 +155,29 @@ def get_ivar_from_args(opts)
150155

151156
let(:executor) { ImmediateExecutor.new }
152157

158+
it 'sets the state to :processing while the task is executing' do
159+
start_latch = Concurrent::CountDownLatch.new
160+
continue_latch = Concurrent::CountDownLatch.new
161+
executor = Concurrent::SingleThreadExecutor.new
162+
163+
future = Future.execute(executor: executor) do
164+
start_latch.count_down
165+
continue_latch.wait(2)
166+
42
167+
end
168+
169+
start_latch.wait(2)
170+
state = future.state
171+
continue_latch.count_down
172+
future.value
173+
174+
expect(state).to eq :processing
175+
end
176+
153177
it 'passes all arguments to handler' do
154-
@expected = false
155-
Future.new(executor: executor){ @expected = true }.execute
156-
expect(@expected).to be_truthy
178+
expected = false
179+
Future.new(executor: executor){ expected = true }.execute
180+
expect(expected).to be_truthy
157181
end
158182

159183
it 'sets the value to the result of the handler' do
@@ -198,6 +222,82 @@ def get_ivar_from_args(opts)
198222
end
199223
end
200224

225+
context 'cancellation' do
226+
227+
context '#cancel' do
228+
229+
it 'fails to cancel the task once processing has begun' do
230+
start_latch = Concurrent::CountDownLatch.new
231+
continue_latch = Concurrent::CountDownLatch.new
232+
f = Future.execute do
233+
start_latch.count_down
234+
continue_latch.wait(2)
235+
42
236+
end
237+
238+
start_latch.wait(2)
239+
cancelled = f.cancel
240+
continue_latch.count_down
241+
242+
expect(cancelled).to be false
243+
expect(f.value).to eq 42
244+
expect(f).to be_fulfilled
245+
end
246+
247+
it 'fails to cancel the task once processing is complete' do
248+
f = Future.execute{ 42 }
249+
f.wait
250+
cancelled = f.cancel
251+
252+
expect(cancelled).to be false
253+
expect(f.value).to eq 42
254+
expect(f).to be_fulfilled
255+
end
256+
257+
it 'cancels a pending task' do
258+
executor = Concurrent::SingleThreadExecutor.new
259+
latch = Concurrent::CountDownLatch.new
260+
executor.post{ latch.wait(2) }
261+
262+
f = Future.execute(executor: executor){ 42 }
263+
cancelled = f.cancel
264+
latch.count_down
265+
266+
expect(cancelled).to be true
267+
expect(f.value).to be_nil
268+
expect(f).to be_rejected
269+
expect(f.reason).to be_a Concurrent::CancelledOperationError
270+
end
271+
end
272+
273+
context '#wait_or_cancel' do
274+
275+
it 'returns true if the operation completes before timeout' do
276+
f = Future.execute{ 42 }
277+
success = f.wait_or_cancel(1)
278+
279+
expect(success).to be true
280+
expect(f.value).to eq 42
281+
expect(f).to be_fulfilled
282+
end
283+
284+
it 'cancels the task on timeout' do
285+
latch = Concurrent::CountDownLatch.new
286+
executor = Concurrent::SingleThreadExecutor.new
287+
executor.post{ latch.wait(2) }
288+
289+
f = Future.execute(executor: executor){ 42 }
290+
success = f.wait_or_cancel(0.1)
291+
latch.count_down
292+
293+
expect(success).to be false
294+
expect(f.value).to be_nil
295+
expect(f).to be_rejected
296+
expect(f.reason).to be_a Concurrent::CancelledOperationError
297+
end
298+
end
299+
end
300+
201301
context 'observation' do
202302

203303
let(:executor) { ImmediateExecutor.new }

0 commit comments

Comments
 (0)