Skip to content

Commit 0b108ee

Browse files
committed
Deprecate proclist and single in favor of scope
Also changes behavior such that proclist and single override scope when set, to prevent issues with mixing proclist/single with scope.
1 parent 96aa4dc commit 0b108ee

File tree

5 files changed

+63
-42
lines changed

5 files changed

+63
-42
lines changed

src/sch/Sch.jl

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import Random: randperm
99
import Base: @invokelatest
1010

1111
import ..Dagger
12-
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, DefaultScope
12+
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope
1313
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
1414

1515
const OneToMany = Dict{Thunk, Set{Thunk}}
@@ -143,13 +143,13 @@ end
143143
Stores DAG-global options to be passed to the Dagger.Sch scheduler.
144144
145145
# Arguments
146-
- `single::Int=0`: Force all work onto worker with specified id. `0` disables
147-
this option.
148-
- `proclist=nothing`: Force scheduler to use one or more processors that are
149-
instances/subtypes of a contained type. Alternatively, a function can be
150-
supplied, and the function will be called with a processor as the sole
151-
argument and should return a `Bool` result to indicate whether or not to use
152-
the given processor. `nothing` enables all default processors.
146+
- `single::Int=0`: (Deprecated) Force all work onto worker with specified id.
147+
`0` disables this option.
148+
- `proclist=nothing`: (Deprecated) Force scheduler to use one or more
149+
processors that are instances/subtypes of a contained type. Alternatively, a
150+
function can be supplied, and the function will be called with a processor as
151+
the sole argument and should return a `Bool` result to indicate whether or not
152+
to use the given processor. `nothing` enables all default processors.
153153
- `allow_errors::Bool=true`: Allow thunks to error without affecting
154154
non-dependent thunks.
155155
- `checkpoint=nothing`: If not `nothing`, uses the provided function to save
@@ -176,11 +176,11 @@ end
176176
Stores Thunk-local options to be passed to the Dagger.Sch scheduler.
177177
178178
# Arguments
179-
- `single::Int=0`: Force thunk onto worker with specified id. `0` disables this
180-
option.
181-
- `proclist=nothing`: Force thunk to use one or more processors that are
182-
instances/subtypes of a contained type. Alternatively, a function can be
183-
supplied, and the function will be called with a processor as the sole
179+
- `single::Int=0`: (Deprecated) Force thunk onto worker with specified id. `0`
180+
disables this option.
181+
- `proclist=nothing`: (Deprecated) Force thunk to use one or more processors
182+
that are instances/subtypes of a contained type. Alternatively, a function can
183+
be supplied, and the function will be called with a processor as the sole
184184
argument and should return a `Bool` result to indicate whether or not to use
185185
the given processor. `nothing` enables all default processors.
186186
- `time_util::Dict{Type,Any}=Dict{Type,Any}()`: Indicates the maximum expected
@@ -634,7 +634,12 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
634634
scope = if task.f isa Chunk
635635
task.f.scope
636636
else
637-
DefaultScope()
637+
if task.options.proclist !== nothing
638+
# proclist overrides scope selection
639+
AnyScope()
640+
else
641+
DefaultScope()
642+
end
638643
end
639644
for input in task.inputs
640645
input = unwrap_weak_checked(input)
@@ -682,7 +687,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
682687

683688
for proc in local_procs
684689
gproc = get_parent(proc)
685-
if can_use_proc(task, gproc, proc, opts, scope)
690+
can_use, scope = can_use_proc(task, gproc, proc, opts, scope)
691+
if can_use
686692
has_cap, est_time_util, est_alloc_util = has_capacity(state, proc, gproc.pid, opts.time_util, opts.alloc_util, sig)
687693
if has_cap
688694
# Schedule task onto proc
@@ -706,7 +712,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
706712
cap, extra_util = nothing, nothing
707713
procs_found = false
708714
# N.B. if we only have one processor, we need to select it now
709-
if can_use_proc(task, entry.gproc, entry.proc, opts, scope)
715+
can_use, scope = can_use_proc(task, entry.gproc, entry.proc, opts, scope)
716+
if can_use
710717
has_cap, est_time_util, est_alloc_util = has_capacity(state, entry.proc, entry.gproc.pid, opts.time_util, opts.alloc_util, sig)
711718
if has_cap
712719
selected_entry = entry
@@ -730,7 +737,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
730737
@goto pop_task
731738
end
732739

733-
if can_use_proc(task, entry.gproc, entry.proc, opts, scope)
740+
can_use, scope = can_use_proc(task, entry.gproc, entry.proc, opts, scope)
741+
if can_use
734742
has_cap, est_time_util, est_alloc_util = has_capacity(state, entry.proc, entry.gproc.pid, opts.time_util, opts.alloc_util, sig)
735743
if has_cap
736744
# Select this processor

src/sch/eager.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ function init_eager()
1818
ctx = eager_context()
1919
@async try
2020
sopts = SchedulerOptions(;allow_errors=true)
21-
topts = ThunkOptions(;single=1)
21+
scope = Dagger.ExactScope(Dagger.ThreadProc(1, 1))
2222
atexit() do
2323
EAGER_FORCE_KILL[] = true
2424
close(EAGER_THUNK_CHAN)
2525
end
26-
Dagger.compute(ctx, Dagger.delayed(eager_thunk; options=topts)(); options=sopts)
26+
Dagger.compute(ctx, Dagger.delayed(eager_thunk; scope)(); options=sopts)
2727
catch err
2828
iob = IOContext(IOBuffer(), :color=>true)
2929
println(iob, "Error in eager scheduler:")

src/sch/util.jl

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -263,43 +263,55 @@ end
263263

264264
function can_use_proc(task, gproc, proc, opts, scope)
265265
# Check against proclist
266-
if opts.proclist === nothing
267-
if !default_enabled(proc)
268-
@debug "Rejected $proc: !default_enabled(proc)"
269-
return false
270-
end
271-
elseif opts.proclist isa Function
272-
if !Base.invokelatest(opts.proclist, proc)
273-
@debug "Rejected $proc: proclist(proc) == false"
274-
return false
266+
if opts.proclist !== nothing
267+
@warn "The `proclist` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
268+
if opts.proclist isa Function
269+
if !Base.invokelatest(opts.proclist, proc)
270+
@debug "[$(task.id)] Rejected $proc: proclist(proc) == false"
271+
return false, scope
272+
end
273+
scope = constrain(scope, Dagger.ExactScope(proc))
274+
elseif opts.proclist isa Vector
275+
if !(typeof(proc) in opts.proclist)
276+
@debug "[$(task.id)] Rejected $proc: !(typeof(proc) in proclist)"
277+
return false, scope
278+
end
279+
scope = constrain(scope,
280+
Dagger.UnionScope(map(Dagger.ProcessorTypeScope, opts.proclist)))
281+
else
282+
throw(SchedulingException("proclist must be a Function, Vector, or nothing"))
275283
end
276-
elseif opts.proclist isa Vector
277-
if !(typeof(proc) in opts.proclist)
278-
@debug "Rejected $proc: !(typeof(proc) in proclist)"
279-
return false
284+
if scope isa Dagger.InvalidScope
285+
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
286+
return false, scope
280287
end
281-
else
282-
throw(SchedulingException("proclist must be a Function, Vector, or nothing"))
283288
end
284289

285290
# Check against single
286291
if opts.single !== nothing
292+
@warn "The `single` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
287293
if gproc.pid != opts.single
288294
@debug "[$(task.id)] Rejected $proc: gproc.pid ($(gproc.pid)) != single ($(opts.single))"
289-
return false
295+
return false, scope
296+
end
297+
scope = constrain(scope, Dagger.ProcessScope(opts.single))
298+
if scope isa Dagger.InvalidScope
299+
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
300+
return false, scope
290301
end
291302
end
292303

293-
# Check scope
304+
# Check against scope
294305
proc_scope = Dagger.ExactScope(proc)
295306
if constrain(scope, proc_scope) isa Dagger.InvalidScope
296307
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
297-
return false
308+
return false, scope
298309
end
299310

300-
@debug "[$(task.id)] Accepted $proc"
311+
@label accept
301312

302-
return true
313+
@debug "[$(task.id)] Accepted $proc"
314+
return true, scope
303315
end
304316

305317
function has_capacity(state, p, gp, time_util, alloc_util, sig)

test/fakeproc.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
@everywhere begin
22

33
using Dagger
4-
import Dagger: ThreadProc
4+
import Dagger: OSProc, ThreadProc
55

66
struct FakeProc <: Dagger.Processor
77
owner::Int
@@ -18,6 +18,7 @@ fakesum(xs...) = FakeVal(sum(map(y->y.x, xs)))
1818
Dagger.iscompatible_func(proc::FakeProc, opts, f) = true
1919
Dagger.iscompatible_arg(proc::FakeProc, opts, ::Type{<:Integer}) = true
2020
Dagger.iscompatible_arg(proc::FakeProc, opts, ::Type{<:FakeVal}) = true
21+
Dagger.move(from_proc::OSProc, to_proc::FakeProc, x::Integer) = FakeVal(x)
2122
Dagger.move(from_proc::ThreadProc, to_proc::FakeProc, x::Integer) = FakeVal(x)
2223
Dagger.execute!(proc::FakeProc, func, args...) = FakeVal(42+func(args...).x)
2324

test/processors.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ end
3737
end
3838
@testset "Processor exhaustion" begin
3939
opts = ThunkOptions(proclist=[OptOutProc])
40-
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try making proclist more liberal" collect(delayed(sum; options=opts)([1,2,3]))
40+
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
4141
opts = ThunkOptions(proclist=(proc)->false)
42-
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try making proclist more liberal" collect(delayed(sum; options=opts)([1,2,3]))
42+
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
4343
opts = ThunkOptions(proclist=nothing)
4444
@test collect(delayed(sum; options=opts)([1,2,3])) == 6
4545
end

0 commit comments

Comments
 (0)