Skip to content

Commit 5361c43

Browse files
authored
Merge pull request #558 from JuliaParallel/jps/optimizations
Various performance and memory optimizations
2 parents 4172430 + 1899a86 commit 5361c43

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+4183
-2206
lines changed

docs/src/api-dagger/types.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ DTask
1616
## Task Options Types
1717
```@docs
1818
Options
19-
Sch.ThunkOptions
2019
Sch.SchedulerOptions
2120
```
2221

docs/src/checkpointing.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ z = collect(Z)
7171
```
7272

7373
Two changes were made: first, we `enumerate(X.chunks)` so that we can get a
74-
unique index to identify each `chunk`; second, we specify a `ThunkOptions` to
74+
unique index to identify each `chunk`; second, we specify options to
7575
`delayed` with a `checkpoint` and `restore` function that is specialized to
7676
write or read the given chunk to or from a file on disk, respectively. Notice
7777
the usage of `collect` in the `checkpoint` function, and the use of

docs/src/task-spawning.md

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ or `spawn` if it's more convenient:
1212

1313
`Dagger.spawn(f, Dagger.Options(options), args...; kwargs...)`
1414

15-
When called, it creates an [`DTask`](@ref) (also known as a "thunk" or
16-
"task") object representing a call to function `f` with the arguments `args` and
17-
keyword arguments `kwargs`. If it is called with other thunks as args/kwargs,
15+
When called, it creates an [`DTask`](@ref) (also known as a "task" or
16+
"thunk") object representing a call to function `f` with the arguments `args` and
17+
keyword arguments `kwargs`. If it is called with other tasks as args/kwargs,
1818
such as in `Dagger.@spawn f(Dagger.@spawn g())`, then, in this example, the
1919
function `f` gets passed the results of executing `g()`, once that result is
2020
available. If `g()` isn't yet finished executing, then the execution of `f`
@@ -29,9 +29,17 @@ it'll be passed as-is to the function `f` (with some exceptions).
2929

3030
!!! note "Task / thread occupancy"
3131
By default, `Dagger` assumes that tasks saturate the thread they are running on and does not try to schedule other tasks on the thread.
32-
This default can be controlled by specifying [`Sch.ThunkOptions`](@ref) (more details can be found under [Scheduler and Thunk options](@ref)).
32+
This default can be controlled by specifying [`Options`](@ref) (more details can be found under [Task and Scheduler options](@ref)).
3333
The section [Changing the thread occupancy](@ref) shows a runnable example of how to achieve this.
3434

35+
## Options
36+
37+
The [`Options`](@ref Dagger.Options) struct in the second argument position is
38+
optional; if provided, it is passed to the scheduler to control its
39+
behavior. [`Options`](@ref Dagger.Options) contains option
40+
key-value pairs, which can be any field in [`Options`](@ref)
41+
(see [Task and Scheduler options](@ref)).
42+
3543
## Simple example
3644

3745
Let's see a very simple directed acyclic graph (or DAG) constructed with Dagger:
@@ -51,7 +59,7 @@ s = Dagger.@spawn combine(p, q, r)
5159
@assert fetch(s) == 16
5260
```
5361

54-
The thunks `p`, `q`, `r`, and `s` have the following structure:
62+
The tasks `p`, `q`, `r`, and `s` have the following structure:
5563

5664
![graph](https://user-images.githubusercontent.com/25916/26920104-7b9b5fa4-4c55-11e7-97fb-fe5b9e73cae6.png)
5765

@@ -108,15 +116,16 @@ x::DTask
108116
@assert fetch(x) == 3 # fetch the result of `@spawn`
109117
```
110118

111-
This is useful for nested execution, where an `@spawn`'d thunk calls `@spawn`. This is detailed further in [Dynamic Scheduler Control](@ref).
119+
This is useful for nested execution, where an `@spawn`'d task calls `@spawn`.
120+
This is detailed further in [Dynamic Scheduler Control](@ref).
112121

113122
## Options
114123

115124
The [`Options`](@ref Dagger.Options) struct in the second argument position is
116125
optional; if provided, it is passed to the scheduler to control its
117126
behavior. [`Options`](@ref Dagger.Options) contains a `NamedTuple` of option
118127
key-value pairs, which can be any of:
119-
- Any field in [`Sch.ThunkOptions`](@ref) (see [Scheduler and Thunk options](@ref))
128+
- Any field in [`Options`](@ref) (see [Task and Scheduler options](@ref))
120129
- `meta::Bool` -- Pass the input [`Chunk`](@ref) objects themselves to `f` and
121130
not the value contained in them.
122131

@@ -127,19 +136,19 @@ There are also some extra options that can be passed, although they're considere
127136

128137
## Errors
129138

130-
If a thunk errors while running under the eager scheduler, it will be marked as
131-
having failed, all dependent (downstream) thunks will be marked as failed, and
132-
any future thunks that use a failed thunk as input will fail. Failure can be
139+
If a task errors while running under the eager scheduler, it will be marked as
140+
having failed, all dependent (downstream) tasks will be marked as failed, and
141+
any future tasks that use a failed task as input will fail. Failure can be
133142
determined with `fetch`, which will re-throw the error that the
134-
originally-failing thunk threw. `wait` and `isready` will *not* check whether a
135-
thunk or its upstream failed; they only check if the thunk has completed, error
143+
originally-failing task threw. `wait` and `isready` will *not* check whether a
144+
task or its upstream failed; they only check if the task has completed, error
136145
or not.
137146

138147
This failure behavior is not the default for lazy scheduling ([Lazy API](@ref)),
139-
but can be enabled by setting the scheduler/thunk option ([Scheduler and Thunk options](@ref))
148+
but can be enabled by setting the scheduler/task option ([Task and Scheduler options](@ref))
140149
`allow_error` to `true`. However, this option isn't terribly useful for
141-
non-dynamic usecases, since any thunk failure will propagate down to the output
142-
thunk regardless of where it occurs.
150+
non-dynamic usecases, since any task failure will propagate down to the output
151+
task regardless of where it occurs.
143152

144153
## Cancellation
145154

@@ -198,7 +207,7 @@ end
198207
```
199208

200209
Alternatively, if you want to compute but not fetch the result of a lazy
201-
operation, you can call `compute` on the thunk. This will return a `Chunk`
210+
operation, you can call `compute` on the task. This will return a `Chunk`
202211
object which references the result (see [Chunks](@ref) for more details):
203212

204213
```julia
@@ -215,16 +224,14 @@ Note that, as a legacy API, usage of the lazy API is generally discouraged for m
215224
- Distinct schedulers don't share runtime metrics or learned parameters, thus causing the scheduler to act less intelligently
216225
- Distinct schedulers can't share work or data directly
217226

218-
## Scheduler and Thunk options
227+
## Task and Scheduler options
219228

220229
While Dagger generally "just works", sometimes one needs to exert some more
221230
fine-grained control over how the scheduler allocates work. There are two
222-
parallel mechanisms to achieve this: Scheduler options (from
223-
[`Sch.SchedulerOptions`](@ref)) and Thunk options (from
224-
[`Sch.ThunkOptions`](@ref)). These two options structs contain many shared
225-
options, with the difference being that Scheduler options operate
226-
globally across an entire DAG, and Thunk options operate on a thunk-by-thunk
227-
basis.
231+
parallel mechanisms to achieve this: Task options (from [`Options`](@ref)) and
232+
Scheduler options (from [`Sch.SchedulerOptions`](@ref)). Scheduler
233+
options operate globally across an entire DAG, and Task options operate on a
234+
task-by-task basis.
228235

229236
Scheduler options can be constructed and passed to `collect()` or `compute()`
230237
as the keyword argument `options` for lazy API usage:
@@ -238,7 +245,7 @@ compute(t; options=opts)
238245
collect(t; options=opts)
239246
```
240247

241-
Thunk options can be passed to `@spawn/spawn`, `@par`, and `delayed` similarly:
248+
Task options can be passed to `@spawn/spawn`, `@par`, and `delayed` similarly:
242249

243250
```julia
244251
# Execute on worker 1
@@ -251,8 +258,9 @@ delayed(+; single=1)(1, 2)
251258

252259
## Changing the thread occupancy
253260

254-
One of the supported [`Sch.ThunkOptions`](@ref) is the `occupancy` keyword.
255-
This keyword can be used to communicate that a task is not expected to fully saturate a CPU core (e.g. due to being IO-bound).
261+
One of the supported [`Options`](@ref) is the `occupancy` keyword.
262+
This keyword can be used to communicate that a task is not expected to fully
263+
saturate a CPU core (e.g. due to being IO-bound).
256264
The basic usage looks like this:
257265

258266
```julia

ext/GraphVizExt.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,22 @@ Requires the `all_task_deps` event enabled in `enable_logging!`
2121
2222
Options:
2323
- `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies)
24+
- `show_data`: If `true`, show the data dependencies in the graph
2425
- `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor
2526
- `layout_engine`: The layout engine to use for GraphViz rendering
2627
- `times`: If `true`, annotate each task with its start and finish times
2728
- `times_digits`: Number of digits to display in the time annotations
2829
- `colors`: A list of colors to use for coloring tasks
2930
- `name_to_color`: A function that maps task names to colors
3031
"""
31-
function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
32+
function Dagger.render_logs(logs::Dict, ::Val{:graphviz};
33+
disconnected=false, show_data::Bool=true,
3234
color_by=:fn, layout_engine="dot",
3335
times::Bool=true, times_digits::Integer=3,
3436
colors=Dagger.Viz.default_colors,
3537
name_to_color=Dagger.Viz.name_to_color)
36-
dot = Dagger.Viz.logs_to_dot(logs; disconnected, times, times_digits,
38+
dot = Dagger.Viz.logs_to_dot(logs; disconnected, show_data,
39+
times, times_digits,
3740
color_by, colors, name_to_color)
3841
gv = GraphViz.Graph(dot)
3942
GraphViz.layout!(gv; engine=layout_engine)

src/Dagger.jl

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import Random: AbstractRNG
1717
import UUIDs: UUID, uuid4
1818

1919
if !isdefined(Base, :ScopedValues)
20-
import ScopedValues: ScopedValue, with
20+
import ScopedValues: ScopedValue, @with, with
2121
else
22-
import Base.ScopedValues: ScopedValue, with
22+
import Base.ScopedValues: ScopedValue, @with, with
2323
end
2424
import TaskLocalValues: TaskLocalValue
2525

@@ -32,7 +32,6 @@ import TimespanLogging: timespan_start, timespan_finish
3232

3333
import Adapt
3434

35-
# Preferences
3635
import Preferences: @load_preference, @set_preferences!
3736

3837
if @load_preference("distributed-package") == "DistributedNext"
@@ -43,29 +42,35 @@ else
4342
import Distributed: Future, RemoteChannel, myid, workers, nworkers, procs, remotecall, remotecall_wait, remotecall_fetch, check_same_host
4443
end
4544

45+
import MacroTools: @capture, prewalk
46+
4647
include("lib/util.jl")
4748
include("utils/dagdebug.jl")
4849

4950
# Distributed data
5051
include("utils/locked-object.jl")
5152
include("utils/tasks.jl")
52-
53-
import MacroTools: @capture, prewalk
54-
55-
include("options.jl")
53+
include("utils/reuse.jl")
5654
include("processor.jl")
5755
include("threadproc.jl")
56+
include("sch_options.jl")
5857
include("context.jl")
5958
include("utils/processors.jl")
59+
include("scopes.jl")
60+
include("utils/scopes.jl")
61+
include("chunks.jl")
62+
include("utils/signature.jl")
63+
include("options.jl")
6064
include("dtask.jl")
6165
include("cancellation.jl")
6266
include("task-tls.jl")
63-
include("scopes.jl")
64-
include("utils/scopes.jl")
67+
include("argument.jl")
6568
include("queue.jl")
6669
include("thunk.jl")
70+
include("utils/fetch.jl")
71+
include("utils/chunks.jl")
72+
include("utils/logging.jl")
6773
include("submission.jl")
68-
include("chunks.jl")
6974
include("memory-spaces.jl")
7075

7176
# Task scheduling
@@ -85,15 +90,15 @@ include("stream.jl")
8590
include("stream-buffers.jl")
8691
include("stream-transfer.jl")
8792

93+
# File IO
94+
include("file-io.jl")
95+
8896
# Array computations
8997
include("array/darray.jl")
9098
include("array/alloc.jl")
9199
include("array/map-reduce.jl")
92100
include("array/copy.jl")
93-
94-
# File IO
95-
include("file-io.jl")
96-
101+
include("array/random.jl")
97102
include("array/operators.jl")
98103
include("array/indexing.jl")
99104
include("array/setindex.jl")
@@ -104,19 +109,19 @@ include("array/linalg.jl")
104109
include("array/mul.jl")
105110
include("array/cholesky.jl")
106111
include("array/lu.jl")
107-
include("array/random.jl")
108112

109113
import KernelAbstractions, Adapt
110114

111115
# GPU
112116
include("gpu.jl")
113117

114-
# Logging and Visualization
118+
# Logging
119+
include("utils/logging-events.jl")
120+
121+
# Visualization
115122
include("visualization.jl")
116123
include("ui/gantt-common.jl")
117124
include("ui/gantt-text.jl")
118-
include("utils/logging-events.jl")
119-
include("utils/logging.jl")
120125
include("utils/viz.jl")
121126

122127
"""

src/argument.jl

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
mutable struct ArgPosition
2+
positional::Bool
3+
idx::Int
4+
kw::Symbol
5+
end
6+
ArgPosition() = ArgPosition(true, 0, :NULL)
7+
ArgPosition(pos::ArgPosition) = ArgPosition(pos.positional, pos.idx, pos.kw)
8+
ispositional(pos::ArgPosition) = pos.positional
9+
iskw(pos::ArgPosition) = !pos.positional
10+
raw_position(pos::ArgPosition) = ispositional(pos) ? pos.idx : pos.kw
11+
function pos_idx(pos::ArgPosition)
12+
@assert pos.positional
13+
@assert pos.idx > 0
14+
@assert pos.kw == :NULL
15+
return pos.idx
16+
end
17+
function pos_kw(pos::ArgPosition)
18+
@assert !pos.positional
19+
@assert pos.idx == 0
20+
@assert pos.kw != :NULL
21+
return pos.kw
22+
end
23+
mutable struct Argument
24+
pos::ArgPosition
25+
value
26+
end
27+
Argument(pos::Integer, value) = Argument(ArgPosition(true, pos, :NULL), value)
28+
Argument(kw::Symbol, value) = Argument(ArgPosition(false, 0, kw), value)
29+
ispositional(arg::Argument) = ispositional(arg.pos)
30+
iskw(arg::Argument) = iskw(arg.pos)
31+
pos_idx(arg::Argument) = pos_idx(arg.pos)
32+
pos_kw(arg::Argument) = pos_kw(arg.pos)
33+
raw_position(arg::Argument) = raw_position(arg.pos)
34+
value(arg::Argument) = arg.value
35+
valuetype(arg::Argument) = typeof(arg.value)
36+
Base.iterate(arg::Argument) = (arg.pos, true)
37+
function Base.iterate(arg::Argument, state::Bool)
38+
if state
39+
return (arg.value, false)
40+
else
41+
return nothing
42+
end
43+
end
44+
45+
Base.copy(arg::Argument) = Argument(ArgPosition(arg.pos), arg.value)
46+
chunktype(arg::Argument) = chunktype(value(arg))

src/array/alloc.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ function stage(ctx, a::AllocateArray)
8686
args = a.want_index ? (i, size(x)) : (size(x),)
8787

8888
if isnothing(a.procgrid)
89-
scope = get_options(:compute_scope, get_options(:scope, DefaultScope()))
89+
scope = get_compute_scope()
9090
else
9191
scope = ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)])
9292
end

src/array/darray.jl

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ domainchunks(d::DArray) = d.subdomains
173173
size(x::DArray) = size(domain(x))
174174
stage(ctx, c::DArray) = c
175175

176-
function Base.collect(d::DArray; tree=false)
176+
function Base.collect(d::DArray{T,N}; tree=false, copyto=false) where {T,N}
177177
a = fetch(d)
178178
if isempty(d.chunks)
179179
return Array{eltype(d)}(undef, size(d)...)
@@ -183,6 +183,13 @@ function Base.collect(d::DArray; tree=false)
183183
return fetch(a.chunks[1])
184184
end
185185

186+
if copyto
187+
C = Array{T,N}(undef, size(a))
188+
DC = view(C, Blocks(size(a)...))
189+
copyto!(DC, a)
190+
return C
191+
end
192+
186193
dimcatfuncs = [(x...) -> d.concat(x..., dims=i) for i in 1:ndims(d)]
187194
if tree
188195
collect(fetch(treereduce_nd(map(x -> ((args...,) -> Dagger.@spawn x(args...)) , dimcatfuncs), a.chunks)))
@@ -458,7 +465,7 @@ function stage(ctx::Context, d::Distribute)
458465
# TODO: fix hashing
459466
#hash = uhash(idx, Base.hash(Distribute, Base.hash(d.data)))
460467
if isnothing(d.procgrid)
461-
scope = get_options(:compute_scope, get_options(:scope, DefaultScope()))
468+
scope = get_compute_scope()
462469
else
463470
scope = ExactScope(d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)])
464471
end
@@ -478,7 +485,7 @@ function stage(ctx::Context, d::Distribute)
478485
#hash = uhash(c, Base.hash(Distribute, Base.hash(d.data)))
479486
c = d.domainchunks[I]
480487
if isnothing(d.procgrid)
481-
scope = get_options(:compute_scope, get_options(:scope, DefaultScope()))
488+
scope = get_compute_scope()
482489
else
483490
scope = ExactScope(d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)])
484491
end

0 commit comments

Comments
 (0)