Commit 85637c0
Merge pull request #374 from JuliaParallel/jps/processor-type-scope
Add ProcessorTypeScope, deprecate proclist and single
2 parents 8d6b234 + fa7f828 commit 85637c0

File tree: 13 files changed (+480, -104 lines)

docs/src/index.md

Lines changed: 7 additions & 6 deletions

@@ -198,15 +198,17 @@ While Dagger generally "just works", sometimes one needs to exert some more
 fine-grained control over how the scheduler allocates work. There are two
 parallel mechanisms to achieve this: Scheduler options (from
 `Dagger.Sch.SchedulerOptions`) and Thunk options (from
-`Dagger.Sch.ThunkOptions`). These two options structs generally contain the
-same options, with the difference being that Scheduler options operate
+`Dagger.Sch.ThunkOptions`). These two options structs contain many shared
+options, with the difference being that Scheduler options operate
 globally across an entire DAG, and Thunk options operate on a thunk-by-thunk
-basis. Scheduler options can be constructed and passed to `collect()` or
-`compute()` as the keyword argument `options` for lazy API usage:
+basis.
+
+Scheduler options can be constructed and passed to `collect()` or `compute()`
+as the keyword argument `options` for lazy API usage:
 
 ```julia
 t = @par 1+2
-opts = Dagger.Sch.ThunkOptions(;single=1) # Execute on worker 1
+opts = Dagger.Sch.SchedulerOptions(;single=1) # Execute on worker 1
 
 compute(t; options=opts)
 
@@ -219,7 +221,6 @@ Thunk options can be passed to `@spawn/spawn`, `@par`, and `delayed` similarly:
 # Execute on worker 1
 
 Dagger.@spawn single=1 1+2
-
 Dagger.spawn(+, 1, 2; single=1)
 
 opts = Dagger.Sch.ThunkOptions(;single=1)
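Since this PR deprecates `single` (and `proclist`), the scope-based equivalent of the pinning shown above looks roughly like the following sketch; it assumes the `Dagger.scope(worker=...)` form introduced in this commit's scopes documentation:

```julia
using Dagger, Distributed

# Previously: Dagger.@spawn single=1 1+2
# With scopes, pin the task to worker 1 instead:
t = Dagger.@spawn scope=Dagger.scope(worker=1) 1+2
fetch(t)  # 3
```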

docs/src/processors.md

Lines changed: 2 additions & 31 deletions

@@ -76,42 +76,13 @@ processor B. This mechanism uses Julia's Serialization library to serialize and
 deserialize data, so data must be serializable for this mechanism to work
 properly.
 
-### Future: Hierarchy Generic Path Move
-
-NOTE: This used to be the default move behavior, but was removed because it
-wasn't considered helpful, and there were not any processor implementations
-that made use of it.
-
-Movement of data between any two processors is decomposable into a sequence of
-"moves" between a child and its parent, termed a "generic path move". Movement
-of data may also take "shortcuts" between nodes in the tree which are not
-directly connected if enabled by libraries or the user, which may make use of
-IPC mechanisms to transfer data more directly and efficiently (such as
-Infiniband, GPU RDMA, NVLINK, etc.). All data is considered local to some
-processor, and may only be operated on by another processor by first doing an
-explicit move operation to that processor.
-
 ## Processor Selection
 
 By default, Dagger uses the CPU to process work, typically single-threaded per
 cluster node. However, Dagger allows access to a wider range of hardware and
 software acceleration techniques, such as multithreading and GPUs. These more
 advanced (but performant) accelerators are disabled by default, but can easily
-be enabled by using Scheduler/Thunk options in the `proclist` field. If
-`nothing`, all default processors will be used. If a vector of types, only the
-processor types contained in `options.proclist` will be used to compute all or
-a given thunk. If a function, it will be called for each processor (with the
-processor as the argument) until it returns `true`.
-
-```julia
-opts = Dagger.Sch.ThunkOptions(;proclist=nothing) # default behavior
-# OR
-opts = Dagger.Sch.ThunkOptions(;proclist=[DaggerGPU.CuArrayProc]) # only execute on CuArrayProc
-# OR
-opts = Dagger.Sch.ThunkOptions(;proclist=(proc)->(proc isa Dagger.ThreadProc && proc.tid == 3)) # only run on ThreadProc with thread ID 3
-
-t = Dagger.@par options=opts sum(X) # do sum(X) on the specified processor
-```
+be enabled by using scopes (see [Scopes](@ref) for details).
 
 ## Resource Control
 
@@ -137,7 +108,7 @@ sufficient resources become available by thunks completing execution.
 The [DaggerGPU.jl](https://github.com/JuliaGPU/DaggerGPU.jl) package can be
 imported to enable GPU acceleration for NVIDIA and AMD GPUs, when available.
 The processors provided by that package are not enabled by default, but may be
-enabled via `options.proclist` as usual.
+enabled via custom scopes ([Scopes](@ref)).
 
 ### Future: Network Devices and Topology
 
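As a migration hint for the removed `proclist` example above, the `ProcessorTypeScope` added by this commit covers the processor-type-filtering case. A hedged sketch follows; the call pattern is inferred from the `can_use_proc` changes in `src/sch/util.jl` below, which wrap `proclist` entries with `Dagger.ProcessorTypeScope`:

```julia
using Dagger

# Before (deprecated):
#   opts = Dagger.Sch.ThunkOptions(;proclist=[Dagger.ThreadProc])
# After: constrain execution to a processor type via a scope
scope = Dagger.ProcessorTypeScope(Dagger.ThreadProc)
t = Dagger.@spawn scope=scope sum(ones(10))
fetch(t)
```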

docs/src/scopes.md

Lines changed: 14 additions & 11 deletions

@@ -15,16 +15,16 @@ considered valid.
 
 Let's take the example of a webcam handle generated by VideoIO.jl. This handle
 is a C pointer, and thus has process scope. We can open the handle on a given
-process, and set the scope of the resulting data to a `ProcessScope()`, which
-defaults to the current Julia process:
+process, and set the scope of the resulting data to be locked to the current
+process with `Dagger.scope` to construct a `ProcessScope`:
 
 ```julia
-using VideoIO
+using VideoIO, Distributed
 
 function get_handle()
     handle = VideoIO.opencamera()
     proc = Dagger.thunk_processor()
-    scope = ProcessScope()
+    scope = Dagger.scope(worker=myid()) # constructs a `ProcessScope`
     return Dagger.tochunk(handle, proc, scope)
 end
 
@@ -41,7 +41,7 @@ cam_frame = Dagger.@spawn read(cam_handle)
 
 The `cam_frame` task is executed within any processor on the same process that
 the `cam_handle` task was executed on. Of course, the resulting camera frame is
-*not* scoped to anywhere specific (denoted as `AnyScope()`), and thus
+*not* scoped to anywhere specific (denoted as `AnyScope`), and thus
 computations on it may execute anywhere.
 
 You may also encounter situations where you want to use a callable struct (such
@@ -53,13 +53,15 @@ using Flux
 m = Chain(...)
 # If `m` is only safe to transfer to and execute on this process,
 # we can set a `ProcessScope` on it:
-result = Dagger.@spawn scope=ProcessScope() m(rand(8,8))
+result = Dagger.@spawn scope=Dagger.scope(worker=myid()) m(rand(8,8))
 ```
 
 Setting a scope on the function treats it as a regular piece of data (like the
 arguments to the function), so it participates in the scoping rules described
 in the following sections all the same.
 
+[`Dagger.scope`](@ref)
+
 Now, let's try out some other kinds of scopes, starting with `NodeScope`. This
 scope encompasses the server that one or more Julia processes may be running
 on. Say we want to use memory mapping (mmap) to more efficiently send arrays
@@ -75,6 +77,7 @@ function generate()
     arr = Mmap.mmap(path, Matrix{Int}, (64,64))
     fill!(arr, 1)
     Mmap.sync!(arr)
+    # Note: Dagger.scope() does not yet support node scopes
     Dagger.tochunk(path, Dagger.thunk_processor(), NodeScope())
 end
 
@@ -87,14 +90,14 @@ a = Dagger.@spawn generate()
 @assert fetch(Dagger.@spawn consume(a)) == 64*64
 ```
 
-Whatever server `a` executed on, `b` will also execute on!
+Whatever server `a` executed on, `b` will also execute on it!
 
 Finally, we come to the "lowest" scope on the scope hierarchy, the
 `ExactScope`. This scope specifies one exact processor as the bounding scope,
-and is typically useful in certain limited cases. We won't provide an example
-here, because you don't usually need to ever use this scope, but if you already
-understand the `NodeScope` and `ProcessScope`, the `ExactScope` should be easy
-to figure out.
+and is typically useful in certain limited cases (such as data existing only on
+a specific GPU). We won't provide an example here, because you don't usually
+need to ever use this scope, but if you already understand the `NodeScope` and
+`ProcessScope`, the `ExactScope` should be easy to figure out.
 
 ## Union Scopes
 
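The `## Union Scopes` section itself is cut off in this view. As context for the scheduler changes below, a `UnionScope` accepts a processor when any of its component scopes does; a sketch using only constructors that appear elsewhere in this diff (`UnionScope` over a vector of scopes, and `ProcessScope` taking a worker id):

```julia
using Dagger, Distributed

# Valid on worker 1 or on the current worker:
scope = Dagger.UnionScope([Dagger.ProcessScope(1), Dagger.ProcessScope(myid())])
t = Dagger.@spawn scope=scope 1+2
fetch(t)
```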

src/sch/Sch.jl

Lines changed: 27 additions & 19 deletions

@@ -9,7 +9,7 @@ import Random: randperm
 import Base: @invokelatest
 
 import ..Dagger
-import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope
+import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope
 import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
 
 const OneToMany = Dict{Thunk, Set{Thunk}}
@@ -143,13 +143,13 @@ end
 Stores DAG-global options to be passed to the Dagger.Sch scheduler.
 
 # Arguments
-- `single::Int=0`: Force all work onto worker with specified id. `0` disables
-  this option.
-- `proclist=nothing`: Force scheduler to use one or more processors that are
-  instances/subtypes of a contained type. Alternatively, a function can be
-  supplied, and the function will be called with a processor as the sole
-  argument and should return a `Bool` result to indicate whether or not to use
-  the given processor. `nothing` enables all default processors.
+- `single::Int=0`: (Deprecated) Force all work onto worker with specified id.
+  `0` disables this option.
+- `proclist=nothing`: (Deprecated) Force scheduler to use one or more
+  processors that are instances/subtypes of a contained type. Alternatively, a
+  function can be supplied, and the function will be called with a processor as
+  the sole argument and should return a `Bool` result to indicate whether or not
+  to use the given processor. `nothing` enables all default processors.
 - `allow_errors::Bool=true`: Allow thunks to error without affecting
   non-dependent thunks.
 - `checkpoint=nothing`: If not `nothing`, uses the provided function to save
@@ -176,11 +176,11 @@ end
 Stores Thunk-local options to be passed to the Dagger.Sch scheduler.
 
 # Arguments
-- `single::Int=0`: Force thunk onto worker with specified id. `0` disables this
-  option.
-- `proclist=nothing`: Force thunk to use one or more processors that are
-  instances/subtypes of a contained type. Alternatively, a function can be
-  supplied, and the function will be called with a processor as the sole
+- `single::Int=0`: (Deprecated) Force thunk onto worker with specified id. `0`
+  disables this option.
+- `proclist=nothing`: (Deprecated) Force thunk to use one or more processors
+  that are instances/subtypes of a contained type. Alternatively, a function can
+  be supplied, and the function will be called with a processor as the sole
   argument and should return a `Bool` result to indicate whether or not to use
   the given processor. `nothing` enables all default processors.
 - `time_util::Dict{Type,Any}=Dict{Type,Any}()`: Indicates the maximum expected
@@ -634,7 +634,12 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
     scope = if task.f isa Chunk
         task.f.scope
     else
-        AnyScope()
+        if task.options.proclist !== nothing
+            # proclist overrides scope selection
+            AnyScope()
+        else
+            DefaultScope()
+        end
     end
     for input in task.inputs
         input = unwrap_weak_checked(input)
@@ -682,7 +687,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
 
     for proc in local_procs
         gproc = get_parent(proc)
-        if can_use_proc(task, gproc, proc, opts, scope)
+        can_use, scope = can_use_proc(task, gproc, proc, opts, scope)
+        if can_use
             has_cap, est_time_util, est_alloc_util = has_capacity(state, proc, gproc.pid, opts.time_util, opts.alloc_util, sig)
             if has_cap
                 # Schedule task onto proc
@@ -693,7 +699,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
            end
        end
    end
-    state.cache[task] = SchedulingException("No processors available, try making proclist more liberal")
+    state.cache[task] = SchedulingException("No processors available, try widening scope")
    state.errored[task] = true
    set_failed!(state, task)
    @goto pop_task
@@ -706,7 +712,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
        cap, extra_util = nothing, nothing
        procs_found = false
        # N.B. if we only have one processor, we need to select it now
-        if can_use_proc(task, entry.gproc, entry.proc, opts, scope)
+        can_use, scope = can_use_proc(task, entry.gproc, entry.proc, opts, scope)
+        if can_use
            has_cap, est_time_util, est_alloc_util = has_capacity(state, entry.proc, entry.gproc.pid, opts.time_util, opts.alloc_util, sig)
            if has_cap
                selected_entry = entry
@@ -723,14 +730,15 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
        if procs_found
            push!(failed_scheduling, task)
        else
-            state.cache[task] = SchedulingException("No processors available, try making proclist more liberal")
+            state.cache[task] = SchedulingException("No processors available, try widening scope")
            state.errored[task] = true
            set_failed!(state, task)
        end
        @goto pop_task
    end
 
-    if can_use_proc(task, entry.gproc, entry.proc, opts, scope)
+    can_use, scope = can_use_proc(task, entry.gproc, entry.proc, opts, scope)
+    if can_use
        has_cap, est_time_util, est_alloc_util = has_capacity(state, entry.proc, entry.gproc.pid, opts.time_util, opts.alloc_util, sig)
        if has_cap
            # Select this processor

src/sch/eager.jl

Lines changed: 2 additions & 2 deletions

@@ -18,12 +18,12 @@ function init_eager()
     ctx = eager_context()
     @async try
         sopts = SchedulerOptions(;allow_errors=true)
-        topts = ThunkOptions(;single=1)
+        scope = Dagger.ExactScope(Dagger.ThreadProc(1, 1))
         atexit() do
             EAGER_FORCE_KILL[] = true
             close(EAGER_THUNK_CHAN)
         end
-        Dagger.compute(ctx, Dagger.delayed(eager_thunk; options=topts)(); options=sopts)
+        Dagger.compute(ctx, Dagger.delayed(eager_thunk; scope)(); options=sopts)
     catch err
         iob = IOContext(IOBuffer(), :color=>true)
         println(iob, "Error in eager scheduler:")
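The eager scheduler now pins its root task with an `ExactScope` rather than `single=1`. The same pattern can pin any task to one specific processor; a sketch using the `ExactScope(ThreadProc(wid, tid))` form from this diff:

```julia
using Dagger

# Pin to thread 1 of worker 1, just as init_eager now does:
scope = Dagger.ExactScope(Dagger.ThreadProc(1, 1))
t = Dagger.@spawn scope=scope 1+2
fetch(t)  # 3
```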

src/sch/util.jl

Lines changed: 36 additions & 22 deletions

@@ -14,7 +14,7 @@ function get_propagated_options(thunk)
     nt = NamedTuple()
     for key in thunk.propagates
         value = if key == :scope
-            isa(thunk.f, Chunk) ? thunk.f.scope : AnyScope()
+            isa(thunk.f, Chunk) ? thunk.f.scope : DefaultScope()
         elseif key == :processor
             isa(thunk.f, Chunk) ? thunk.f.processor : OSProc()
         elseif key in fieldnames(Thunk)
@@ -263,41 +263,55 @@ end
 
 function can_use_proc(task, gproc, proc, opts, scope)
     # Check against proclist
-    if opts.proclist === nothing
-        if !default_enabled(proc)
-            @debug "Rejected $proc: !default_enabled(proc)"
-            return false
-        end
-    elseif opts.proclist isa Function
-        if !Base.invokelatest(opts.proclist, proc)
-            @debug "Rejected $proc: proclist(proc) == false"
-            return false
+    if opts.proclist !== nothing
+        @warn "The `proclist` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
+        if opts.proclist isa Function
+            if !Base.invokelatest(opts.proclist, proc)
+                @debug "[$(task.id)] Rejected $proc: proclist(proc) == false"
+                return false, scope
+            end
+            scope = constrain(scope, Dagger.ExactScope(proc))
+        elseif opts.proclist isa Vector
+            if !(typeof(proc) in opts.proclist)
+                @debug "[$(task.id)] Rejected $proc: !(typeof(proc) in proclist)"
+                return false, scope
+            end
+            scope = constrain(scope,
+                Dagger.UnionScope(map(Dagger.ProcessorTypeScope, opts.proclist)))
+        else
+            throw(SchedulingException("proclist must be a Function, Vector, or nothing"))
         end
-    elseif opts.proclist isa Vector
-        if !(typeof(proc) in opts.proclist)
-            @debug "Rejected $proc: !(typeof(proc) in proclist)"
-            return false
+        if scope isa Dagger.InvalidScope
+            @debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
+            return false, scope
         end
-    else
-        throw(SchedulingException("proclist must be a Function, Vector, or nothing"))
     end
 
     # Check against single
     if opts.single !== nothing
+        @warn "The `single` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
         if gproc.pid != opts.single
-            @debug "Rejected $proc: gproc.pid != single"
-            return false
+            @debug "[$(task.id)] Rejected $proc: gproc.pid ($(gproc.pid)) != single ($(opts.single))"
+            return false, scope
+        end
+        scope = constrain(scope, Dagger.ProcessScope(opts.single))
+        if scope isa Dagger.InvalidScope
+            @debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
+            return false, scope
         end
     end
 
-    # Check scope
+    # Check against scope
     proc_scope = Dagger.ExactScope(proc)
     if constrain(scope, proc_scope) isa Dagger.InvalidScope
-        @debug "Rejected $proc: Task scope ($scope) vs. processor scope ($proc_scope)"
-        return false
+        @debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
+        return false, scope
     end
 
-    return true
+    @label accept
+
+    @debug "[$(task.id)] Accepted $proc"
+    return true, scope
 end
 
 function has_capacity(state, p, gp, time_util, alloc_util, sig)
0 commit comments

Comments
 (0)