|
"""
    cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)

Cancel `task`, wherever it currently is in its lifecycle, so that the scheduler
gives up on it.

# Keywords
- `force::Bool=false`: when `true`, interrupt the task with an
  `InterruptException`. This is unsafe and not recommended.
- `halt_sch::Bool=false`: when `true`, halt the scheduler once the task has been
  cancelled. The scheduler restarts automatically on the next `@spawn`/`spawn`
  call.

# Examples
The following cancels task `t` before it has finished executing:

```julia
t = Dagger.@spawn sleep(1000)
# We're bored, let's cancel `t`
Dagger.cancel!(t)
```

Cancelling a task lets the scheduler hand its execution resources to other
tasks that are waiting to run. `cancel!` is generally a much safer alternative
to Ctrl+C, because it cooperates with the scheduler and runtime and avoids
unintended side effects.
"""
function cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
    # Look up the id registered for this task's uid in the eager ID map
    thunk_id = lock(Dagger.Sch.EAGER_ID_MAP) do uid_to_id
        return uid_to_id[task.uid]
    end
    return cancel!(thunk_id; force=force, halt_sch=halt_sch)
end
"""
    cancel!(tid::Union{Int,Nothing}=nothing; force::Bool=false, halt_sch::Bool=false)

Cancel the task whose id is `tid`, or every task when `tid` is `nothing`.

The request is executed on process 1 via `remotecall_fetch`. If no eager
scheduler state exists there (`Sch.EAGER_STATE[]` is `nothing`), this is a
no-op. Otherwise the scheduler state lock is held while `_cancel!` runs.
"""
function cancel!(tid::Union{Int,Nothing}=nothing;
                 force::Bool=false, halt_sch::Bool=false)
    return remotecall_fetch(1, tid, force, halt_sch) do target_tid, do_force, do_halt
        sch_state = Sch.EAGER_STATE[]
        if sch_state === nothing
            # No eager scheduler state, so there is nothing to cancel
            return
        end
        return lock(sch_state.lock) do
            _cancel!(sch_state, target_tid, do_force, do_halt)
        end
    end
end
# Internal implementation of `cancel!`. Must be called with `state.lock` held.
#
# Arguments:
# - `state`: the scheduler state (provides `lock`, `uid`, `ready`, `waiting`,
#   `cache`, `errored`, `running_on`, `halt` and `chan`).
# - `tid`: id of the task to cancel, or `nothing` to cancel every task.
# - `force`: when `true`, running tasks are interrupted by throwing an
#   `InterruptException` into them; otherwise the owning processor is told to
#   drop them.
# - `halt_sch`: when `true`, the scheduler is halted after cancellation. The
#   lock is released while waiting for the halt and re-acquired before
#   returning.
#
# Returns `nothing`.
function _cancel!(state, tid, force, halt_sch)
    @assert islocked(state.lock)

    # Get the scheduler uid
    sch_uid = state.uid

    # Cancel ready tasks
    for task in state.ready
        tid !== nothing && task.id != tid && continue
        @dagdebug tid :cancel "Cancelling ready task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    if tid === nothing
        empty!(state.ready)
    else
        # Only remove the targeted task; unrelated ready tasks were not marked
        # failed above, so they must stay queued or they would be lost.
        filter!(task -> task.id != tid, state.ready)
    end

    # Cancel waiting tasks
    for task in keys(state.waiting)
        tid !== nothing && task.id != tid && continue
        @dagdebug tid :cancel "Cancelling waiting task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    if tid === nothing
        empty!(state.waiting)
    else
        # Collect the matching keys first so the collection is not mutated
        # while it is being iterated, then drop only the targeted entries.
        stale = [task for task in keys(state.waiting) if task.id == tid]
        for task in stale
            delete!(state.waiting, task)
        end
    end

    # Cancel running tasks at the processor level
    wids = unique(map(root_worker_id, values(state.running_on)))
    for wid in wids
        remotecall_fetch(wid, tid, sch_uid, force) do _tid, sch_uid, force
            Dagger.Sch.proc_states(sch_uid) do states
                for (proc, proc_state) in states
                    istate = proc_state.state
                    any_cancelled = false
                    @lock istate.queue begin
                        for (tid, task) in istate.tasks
                            _tid !== nothing && tid != _tid && continue
                            task_spec = istate.task_specs[tid]
                            Tf = task_spec[6]
                            # Never cancel the eager scheduler's own thunk
                            Tf === typeof(Sch.eager_thunk) && continue
                            istaskdone(task) && continue
                            any_cancelled = true
                            @dagdebug tid :cancel "Cancelling running task ($Tf)"
                            if force
                                @dagdebug tid :cancel "Interrupting running task ($Tf)"
                                Threads.@spawn Base.throwto(task, InterruptException())
                            else
                                # Tell the processor to just drop this task
                                task_occupancy = task_spec[4]
                                time_util = task_spec[2]
                                istate.proc_occupancy[] -= task_occupancy
                                istate.time_pressure[] -= time_util
                                push!(istate.cancelled, tid)
                                to_proc = istate.proc
                                put!(istate.return_queue,
                                     (myid(), to_proc, tid, (InterruptException(), nothing)))
                            end
                        end
                    end
                    if any_cancelled
                        notify(istate.reschedule)
                    end
                end
            end
            return
        end
    end

    if halt_sch
        # Release the lock while waiting for the halt; the `finally` below
        # re-acquires it so the caller's unlock stays balanced.
        unlock(state.lock)
        try
            # Give tasks a moment to be processed
            sleep(0.5)

            # Halt the scheduler
            @dagdebug nothing :cancel "Halting the scheduler"
            notify(state.halt)
            put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))

            # Wait for the scheduler to halt
            @dagdebug nothing :cancel "Waiting for scheduler to halt"
            while Sch.EAGER_INIT[]
                sleep(0.1)
            end
            @dagdebug nothing :cancel "Scheduler halted"
        finally
            lock(state.lock)
        end
    end

    return
end
0 commit comments