Skip to content

Commit 6da10fa

Browse files
committed
cancellation: Add graceful vs. forced
1 parent 2af3f10 commit 6da10fa

File tree

2 files changed

+44
-25
lines changed

2 files changed

+44
-25
lines changed

src/cancellation.jl

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,29 @@
11
# DTask-level cancellation
22

3-
struct CancelToken
4-
cancelled::Base.RefValue{Bool}
3+
mutable struct CancelToken
4+
@atomic cancelled::Bool
5+
@atomic graceful::Bool
56
event::Base.Event
67
end
7-
CancelToken() = CancelToken(Ref(false), Base.Event())
8-
function cancel!(token::CancelToken)
9-
token.cancelled[] = true
8+
CancelToken() = CancelToken(false, false, Base.Event())
9+
function cancel!(token::CancelToken; graceful::Bool=true)
10+
if !graceful
11+
@atomic token.graceful = false
12+
end
13+
@atomic token.cancelled = true
1014
notify(token.event)
1115
return
1216
end
13-
is_cancelled(token::CancelToken) = token.cancelled[]
17+
function is_cancelled(token::CancelToken; must_force::Bool=false)
18+
if token.cancelled[]
19+
if must_force && token.graceful[]
20+
# If we're only responding to forced cancellation, ignore graceful cancellations
21+
return false
22+
end
23+
return true
24+
end
25+
return false
26+
end
1427
Base.wait(token::CancelToken) = wait(token.event)
1528
# TODO: Enable this for safety
1629
#Serialization.serialize(io::AbstractSerializer, ::CancelToken) =
@@ -34,13 +47,15 @@ end
3447
# Global-level cancellation
3548

3649
"""
37-
cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
50+
cancel!(task::DTask; force::Bool=false, graceful::Bool=true, halt_sch::Bool=false)
3851
3952
Cancels `task` at any point in its lifecycle, causing the scheduler to abandon
40-
it. If `force` is `true`, the task will be interrupted with an
41-
`InterruptException` (not recommended, this is unsafe). If `halt_sch` is
42-
`true`, the scheduler will be halted after the task is cancelled (it will
43-
restart automatically upon the next `@spawn`/`spawn` call).
53+
it.
54+
55+
# Keyword arguments
56+
- `force`: If `true`, the task will be interrupted with an `InterruptException` (not recommended, this is unsafe).
57+
- `graceful`: If `true`, the task will be allowed to finish its current execution before being cancelled; otherwise, it will be cancelled as soon as possible.
58+
- `halt_sch`: If `true`, the scheduler will be halted after the task is cancelled (it will restart automatically upon the next `@spawn`/`spawn` call).
4459
4560
As an example, the following code will cancel task `t` before it finishes
4661
executing:
@@ -56,24 +71,24 @@ tasks which are waiting to run. Using `cancel!` is generally a much safer
5671
alternative to Ctrl+C, as it cooperates with the scheduler and runtime and
5772
avoids unintended side effects.
5873
"""
59-
function cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
74+
function cancel!(task::DTask; force::Bool=false, graceful::Bool=true, halt_sch::Bool=false)
6075
tid = lock(Dagger.Sch.EAGER_ID_MAP) do id_map
6176
id_map[task.uid]
6277
end
63-
cancel!(tid; force, halt_sch)
78+
cancel!(tid; force, graceful, halt_sch)
6479
end
6580
function cancel!(tid::Union{Int,Nothing}=nothing;
66-
force::Bool=false, halt_sch::Bool=false)
81+
force::Bool=false, graceful::Bool=true, halt_sch::Bool=false)
6782
remotecall_fetch(1, tid, force, halt_sch) do tid, force, halt_sch
6883
state = Sch.EAGER_STATE[]
6984

7085
# Check that the scheduler isn't stopping or has already stopped
7186
if !isnothing(state) && !state.halt.set
72-
@lock state.lock _cancel!(state, tid, force, halt_sch)
87+
@lock state.lock _cancel!(state, tid, force, graceful, halt_sch)
7388
end
7489
end
7590
end
76-
function _cancel!(state, tid, force, halt_sch)
91+
function _cancel!(state, tid, force, graceful, halt_sch)
7792
@assert islocked(state.lock)
7893

7994
# Get the scheduler uid
@@ -128,7 +143,7 @@ function _cancel!(state, tid, force, halt_sch)
128143
push!(istate.cancelled, tid)
129144
to_proc = istate.proc
130145
put!(istate.return_queue, (myid(), to_proc, tid, (InterruptException(), nothing)))
131-
cancel!(istate.cancel_tokens[tid])
146+
cancel!(istate.cancel_tokens[tid]; graceful)
132147
end
133148
end
134149
end

src/task-tls.jl

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,26 +52,30 @@ task_processor() = get_tls().processor
5252
@deprecate(thunk_processor(), task_processor())
5353

5454
"""
55-
task_cancelled() -> Bool
55+
task_cancelled(; must_force::Bool=false) -> Bool
5656
5757
Returns `true` if the current [`DTask`](@ref) has been cancelled, else `false`.
58+
If `must_force=true`, then only return `true` if the cancellation was forced.
5859
"""
59-
task_cancelled() = is_cancelled(get_tls().cancel_token)
60+
task_cancelled(; must_force::Bool=false) =
61+
is_cancelled(get_tls().cancel_token; must_force)
6062

6163
"""
62-
task_may_cancel!()
64+
task_may_cancel!(; must_force::Bool=false)
6365
6466
Throws an `InterruptException` if the current [`DTask`](@ref) has been cancelled.
67+
If `must_force=true`, then only throw if the cancellation was forced.
6568
"""
66-
function task_may_cancel!()
67-
if task_cancelled()
69+
function task_may_cancel!(;must_force::Bool=false)
70+
if task_cancelled(;must_force)
6871
throw(InterruptException())
6972
end
7073
end
7174

7275
"""
73-
task_cancel!()
76+
task_cancel!(; graceful::Bool=true)
7477
75-
Cancels the current [`DTask`](@ref).
78+
Cancels the current [`DTask`](@ref). If `graceful=true`, then the task will be
79+
cancelled gracefully, otherwise it will be forced.
7680
"""
77-
task_cancel!() = cancel!(get_tls().cancel_token)
81+
task_cancel!(; graceful::Bool=true) = cancel!(get_tls().cancel_token; graceful)

0 commit comments

Comments
 (0)