Skip to content

Commit 0bdfcc5

Browse files
Small optimizations on Scheduler creation (#148)
* make ChunkingArgs a bitstype * make DynamicScheduler a bitstype * dispatch symbol to its scheduler type by hand to improve performance * fix failed tests * inline tmapreduce * don't run boxing tests on 1 thread * revert make ChunkingArgs a bitstype * revert dispatch symbol to its scheduler type * nothing instead of sentinel values for chunking arguments * update ChunkingArgs docstring --------- Co-authored-by: Mason Protter <[email protected]>
1 parent 2affc74 commit 0bdfcc5

File tree

3 files changed

+153
-158
lines changed

3 files changed

+153
-158
lines changed

src/implementation.jl

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,33 @@
11
module Implementation
22

33
import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect
4-
using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike, allowing_boxed_captures
4+
using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike,
5+
allowing_boxed_captures
56
using OhMyThreads.Tools: nthtid
67
using OhMyThreads: Scheduler,
78
DynamicScheduler, StaticScheduler, GreedyScheduler,
89
SerialScheduler
9-
using OhMyThreads.Schedulers: chunking_enabled,
10+
using OhMyThreads.Schedulers: chunksplitter_mode, chunking_enabled,
1011
nchunks, chunksize, chunksplit, minchunksize, has_chunksplit,
12+
has_minchunksize, chunkingargs_to_kwargs,
1113
chunking_mode, ChunkingMode, NoChunking,
1214
FixedSize, FixedCount, scheduler_from_symbol, NotGiven,
13-
isgiven
15+
isgiven, threadpool as get_threadpool
1416
using Base: @propagate_inbounds
1517
using Base.Threads: nthreads, @threads
1618
using BangBang: append!!
1719
using ChunkSplitters: ChunkSplitters, index_chunks, Consecutive
1820
using ChunkSplitters.Internals: AbstractChunks, IndexChunks
1921

20-
const MaybeScheduler = Union{NotGiven, Scheduler, Symbol}
22+
const MaybeScheduler = Union{NotGiven, Scheduler, Symbol, Val}
2123

2224
include("macro_impl.jl")
2325

24-
function _index_chunks(sched, arg)
26+
@inline function _index_chunks(sched, arg)
2527
C = chunking_mode(sched)
26-
@assert C != NoChunking
27-
if C == FixedCount
28-
msz = isnothing(minchunksize(sched)) ? nothing : min(minchunksize(sched), length(arg))
29-
index_chunks(arg;
30-
n = nchunks(sched),
31-
split = chunksplit(sched),
32-
minsize = msz)::IndexChunks{
33-
typeof(arg), ChunkSplitters.Internals.FixedCount}
34-
elseif C == FixedSize
35-
index_chunks(arg;
36-
size = chunksize(sched),
37-
split = chunksplit(sched),
38-
minsize = minchunksize(sched))::IndexChunks{
39-
typeof(arg), ChunkSplitters.Internals.FixedSize}
40-
end
28+
@assert chunking_enabled(sched)
29+
kwargs = chunkingargs_to_kwargs(sched, arg)
30+
return index_chunks(arg; kwargs...)::IndexChunks{typeof(arg), chunksplitter_mode(C)}
4131
end
4232

4333
function _scheduler_from_userinput(scheduler::MaybeScheduler; kwargs...)
@@ -75,19 +65,23 @@ function has_multiple_chunks(scheduler, coll)
7565
if C == NoChunking || coll isa Union{AbstractChunks, ChunkSplitters.Internals.Enumerate}
7666
length(coll) > 1
7767
elseif C == FixedCount
78-
if isnothing(minchunksize(scheduler))
68+
if !has_minchunksize(scheduler)
7969
mcs = 1
8070
else
8171
mcs = max(min(minchunksize(scheduler), length(coll)), 1)
8272
end
8373
min(length(coll) ÷ mcs, nchunks(scheduler)) > 1
8474
elseif C == FixedSize
8575
length(coll) ÷ chunksize(scheduler) > 1
76+
else
77+
throw(ArgumentError("Unknown chunking mode: $C."))
8678
end
8779
end
8880

89-
90-
function tmapreduce(f, op, Arrs...;
81+
# we can inline this function because we use @noinline on the main function
82+
# it can save some time in cases where we do not hit the main function (e.g. when
83+
# fallback to mapreduce without any threading)
84+
@inline function tmapreduce(f, op, Arrs...;
9185
scheduler::MaybeScheduler = NotGiven(),
9286
outputtype::Type = Any,
9387
init = NotGiven(),
@@ -122,7 +116,7 @@ function _tmapreduce(f,
122116
::Type{OutputType},
123117
scheduler::DynamicScheduler,
124118
mapreduce_kwargs)::OutputType where {OutputType}
125-
(; threadpool) = scheduler
119+
threadpool = get_threadpool(scheduler)
126120
check_all_have_same_indices(Arrs)
127121
throw_if_boxed_captures(f, op)
128122
if chunking_enabled(scheduler)
@@ -151,7 +145,7 @@ function _tmapreduce(f,
151145
::Type{OutputType},
152146
scheduler::DynamicScheduler,
153147
mapreduce_kwargs)::OutputType where {OutputType, T}
154-
(; threadpool) = scheduler
148+
threadpool = get_threadpool(scheduler)
155149
throw_if_boxed_captures(f, op)
156150
tasks = map(only(Arrs)) do idcs
157151
@spawn threadpool promise_task_local(f)(idcs)
@@ -442,7 +436,7 @@ function tmap(f,
442436
if chunking_enabled(_scheduler)
443437
if _scheduler isa DynamicScheduler
444438
_scheduler = DynamicScheduler(;
445-
threadpool = _scheduler.threadpool,
439+
threadpool = threadpool(_scheduler),
446440
chunking = false)
447441
elseif _scheduler isa StaticScheduler
448442
_scheduler = StaticScheduler(; chunking = false)
@@ -468,7 +462,7 @@ function _tmap(scheduler::DynamicScheduler{NoChunking},
468462
f,
469463
A::AbstractArray,
470464
_Arrs::AbstractArray...;)
471-
(; threadpool) = scheduler
465+
threadpool = get_threadpool(scheduler)
472466
Arrs = (A, _Arrs...)
473467
throw_if_boxed_captures(f)
474468
tasks = map(eachindex(A)) do i
@@ -486,7 +480,7 @@ function _tmap(scheduler::DynamicScheduler{NoChunking},
486480
f,
487481
A::Union{AbstractChunks, ChunkSplitters.Internals.Enumerate},
488482
_Arrs::AbstractArray...)
489-
(; threadpool) = scheduler
483+
threadpool = get_threadpool(scheduler)
490484
throw_if_boxed_captures(f)
491485
tasks = map(A) do idcs
492486
@spawn threadpool promise_task_local(f)(idcs)

src/schedulers.jl

Lines changed: 68 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module Schedulers
22

33
using Base.Threads: nthreads
4-
using ChunkSplitters: Split, Consecutive, RoundRobin
4+
using ChunkSplitters: Split, Consecutive, RoundRobin, ChunkSplitters
55

66
# Used to indicate that a keyword argument has not been set by the user.
77
# We don't use Nothing because nothing maybe sometimes be a valid user input (e.g. for init)
@@ -65,70 +65,61 @@ struct NoChunking <: ChunkingMode end
6565
struct FixedCount <: ChunkingMode end
6666
struct FixedSize <: ChunkingMode end
6767

68+
chunksplitter_mode(::Type{FixedCount}) = ChunkSplitters.Internals.FixedCount
69+
chunksplitter_mode(::Type{FixedSize}) = ChunkSplitters.Internals.FixedSize
70+
6871
"""
69-
ChunkingArgs{C, S <: Split}(n::Int, size::Int, split::S)
70-
ChunkingArgs(Sched::Type{<:Scheduler}, n::MaybeInteger, size::MaybeInteger, split::Union{Symbol, Split}; chunking)
72+
ChunkingArgs{C, S <: Split}(n::Union{Int, Nothing}, size::Union{Int, Nothing}, minsize::Union{Int, Nothing}, split::S)
73+
ChunkingArgs(Sched::Type{<:Scheduler}; n = nothing, size = nothing, minsize = nothing, split::Union{Symbol, Split}; chunking)
7174
7275
Stores all the information needed for chunking. The type parameter `C` is the chunking mode
73-
(`NoChunking`, `FixedSize`, or `FixedCount`).
74-
75-
`MaybeInteger` arguments are arguments that can be `NotGiven`. If it is the case, the
76-
constructor automatically throws errors or gives defaults values while taking into account
77-
the kind of scheduler (provided by `Sched`, e.g. `DynamicScheduler`). The `chunking` keyword
78-
argument is a boolean and if true, everything is skipped and `C = NoChunking`.
76+
(`NoChunking`, `FixedSize`, or `FixedCount`). The `chunking` keyword argument is a boolean
77+
and if true, everything is skipped and `C = NoChunking`.
7978
8079
Once the object is created, use the `has_fieldname(object)` function (e.g. `has_size(object)`)
81-
to know if the field is effectively used, since it is no longer
82-
`NotGiven` for type stability.
80+
to know if the field is effectively used.
8381
"""
8482
struct ChunkingArgs{C, S <: Split}
85-
n::Int
86-
size::Int
87-
split::S
83+
n::Union{Int, Nothing}
84+
size::Union{Int, Nothing}
8885
minsize::Union{Int, Nothing}
86+
split::S
87+
end
88+
function ChunkingArgs(::Type{NoChunking})
89+
ChunkingArgs{NoChunking, NoSplit}(nothing, nothing, nothing, NoSplit())
8990
end
90-
ChunkingArgs(::Type{NoChunking}) = ChunkingArgs{NoChunking, NoSplit}(-1, -1, NoSplit(), nothing)
9191
function ChunkingArgs(
92-
Sched::Type{<:Scheduler},
93-
n::MaybeInteger,
94-
size::MaybeInteger,
95-
split::Union{Symbol, Split};
96-
minsize=nothing,
92+
Sched::Type{<:Scheduler};
93+
n = nothing,
94+
size = nothing,
95+
minsize = nothing,
96+
split::Union{Symbol, Split},
9797
chunking
9898
)
9999
chunking || return ChunkingArgs(NoChunking)
100100

101-
if !isgiven(n) && !isgiven(size)
101+
if isnothing(n) && isnothing(size)
102102
n = default_nchunks(Sched)
103-
size = -1
104-
else
105-
n = isgiven(n) ? n : -1
106-
size = isgiven(size) ? size : -1
103+
elseif !isnothing(n) && !isnothing(size)
104+
throw(ArgumentError("nchunks and chunksize are mutually exclusive"))
107105
end
108-
109-
chunking_mode = size > 0 ? FixedSize : FixedCount
106+
chunking_mode = isnothing(n) ? FixedSize : FixedCount
110107
split = _parse_split(split)
111-
result = ChunkingArgs{chunking_mode, typeof(split)}(n, size, split, minsize)
112-
113-
# argument names in error messages are those of the scheduler constructor instead
114-
# of ChunkingArgs because the user should not be aware of the ChunkingArgs type
115-
# (e.g. `nchunks` instead of `n`)
116-
if !(has_n(result) || has_size(result))
117-
throw(ArgumentError("Either `nchunks` or `chunksize` must be a positive integer (or chunking=false)."))
118-
end
119-
if has_n(result) && has_size(result)
120-
throw(ArgumentError("`nchunks` and `chunksize` are mutually exclusive and only one of them may be a positive integer"))
121-
end
122-
return result
108+
return ChunkingArgs{chunking_mode, typeof(split)}(n, size, minsize, split)
123109
end
124110

125111
chunking_mode(::ChunkingArgs{C}) where {C} = C
126-
has_n(ca::ChunkingArgs) = ca.n > 0
127-
has_size(ca::ChunkingArgs) = ca.size > 0
112+
has_n(ca::ChunkingArgs) = !isnothing(ca.n)
113+
has_size(ca::ChunkingArgs) = !isnothing(ca.size)
128114
has_split(::ChunkingArgs{C, S}) where {C, S} = S !== NoSplit
129115
has_minsize(ca::ChunkingArgs) = !isnothing(ca.minsize)
130116
chunking_enabled(ca::ChunkingArgs) = chunking_mode(ca) != NoChunking
131117

118+
function chunkingargs_to_kwargs(ca::ChunkingArgs, arg)
119+
minsize = !has_minsize(ca) ? nothing : min(ca.minsize, length(arg))
120+
return (; ca.n, ca.size, minsize, ca.split)
121+
end
122+
132123
_chunkingstr(ca::ChunkingArgs{NoChunking}) = "none"
133124
function _chunkingstr(ca::ChunkingArgs{FixedCount})
134125
str = "fixed count ($(ca.n)), split :$(_splitid(ca.split))"
@@ -157,6 +148,10 @@ has_chunksize(sched::Scheduler) = has_size(chunking_args(sched))
157148
has_chunksplit(sched::Scheduler) = has_split(chunking_args(sched))
158149
has_minchunksize(sched::Scheduler) = has_minsize(chunking_args(sched))
159150

151+
function chunkingargs_to_kwargs(sched::Scheduler, arg)
152+
chunkingargs_to_kwargs(chunking_args(sched), arg)
153+
end
154+
160155
chunking_mode(sched::Scheduler) = chunking_mode(chunking_args(sched))
161156
chunking_enabled(sched::Scheduler) = chunking_enabled(chunking_args(sched))
162157
_chunkingstr(sched::Scheduler) = _chunkingstr(chunking_args(sched))
@@ -203,43 +198,45 @@ with other multithreaded code.
203198
* Possible options are `:default` and `:interactive`.
204199
* The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes.
205200
"""
206-
struct DynamicScheduler{C <: ChunkingMode, S <: Split} <: Scheduler
207-
threadpool::Symbol
201+
struct DynamicScheduler{C <: ChunkingMode, S <: Split, threadpool} <: Scheduler
208202
chunking_args::ChunkingArgs{C, S}
209203

210204
function DynamicScheduler(threadpool::Symbol, ca::ChunkingArgs)
211205
if !(threadpool in (:default, :interactive))
212206
throw(ArgumentError("threadpool must be either :default or :interactive"))
213207
end
214-
new{chunking_mode(ca), typeof(ca.split)}(threadpool, ca)
208+
new{chunking_mode(ca), typeof(ca.split), threadpool}(ca)
215209
end
216210
end
217211

218212
function DynamicScheduler(;
219213
threadpool::Symbol = :default,
220-
nchunks::MaybeInteger = NotGiven(),
221-
ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
222-
chunksize::MaybeInteger = NotGiven(),
223-
chunking::Bool = true,
214+
nchunks = nothing,
215+
ntasks = nothing, # "alias" for nchunks
216+
chunksize = nothing,
224217
split::Union{Split, Symbol} = Consecutive(),
225-
minchunksize::Union{Nothing, Int}=nothing)
226-
if isgiven(ntasks)
227-
if isgiven(nchunks)
218+
minchunksize = nothing,
219+
chunking::Bool = true
220+
)
221+
if !isnothing(ntasks)
222+
if !isnothing(nchunks)
228223
throw(ArgumentError("For the dynamic scheduler, nchunks and ntasks are aliases and only one may be provided"))
229224
end
230225
nchunks = ntasks
231226
end
232-
ca = ChunkingArgs(DynamicScheduler, nchunks, chunksize, split; chunking, minsize=minchunksize)
227+
ca = ChunkingArgs(DynamicScheduler;
228+
n = nchunks, size = chunksize, minsize = minchunksize, split, chunking)
233229
return DynamicScheduler(threadpool, ca)
234230
end
235231
from_symbol(::Val{:dynamic}) = DynamicScheduler
236232
chunking_args(sched::DynamicScheduler) = sched.chunking_args
233+
threadpool(::DynamicScheduler{C, S, T}) where {C, S, T} = T
237234

238235
function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler)
239236
print(io, "DynamicScheduler", "\n")
240237
cstr = _chunkingstr(s.chunking_args)
241238
println(io, "├ Chunking: ", cstr)
242-
print(io, "└ Threadpool: ", s.threadpool)
239+
print(io, "└ Threadpool: ", threadpool(s))
243240
end
244241

245242
"""
@@ -278,19 +275,21 @@ struct StaticScheduler{C <: ChunkingMode, S <: Split} <: Scheduler
278275
end
279276

280277
function StaticScheduler(;
281-
nchunks::MaybeInteger = NotGiven(),
282-
ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
283-
chunksize::MaybeInteger = NotGiven(),
284-
chunking::Bool = true,
278+
nchunks = nothing,
279+
ntasks = nothing, # "alias" for nchunks
280+
chunksize = nothing,
281+
minchunksize = nothing,
285282
split::Union{Split, Symbol} = Consecutive(),
286-
minchunksize::Union{Nothing, Int} = nothing)
287-
if isgiven(ntasks)
288-
if isgiven(nchunks)
283+
chunking::Bool = true
284+
)
285+
if !isnothing(ntasks)
286+
if !isnothing(nchunks)
289287
throw(ArgumentError("For the static scheduler, nchunks and ntasks are aliases and only one may be provided"))
290288
end
291289
nchunks = ntasks
292290
end
293-
ca = ChunkingArgs(StaticScheduler, nchunks, chunksize, split; chunking, minsize=minchunksize)
291+
ca = ChunkingArgs(StaticScheduler;
292+
n = nchunks, size = chunksize, minsize = minchunksize, split, chunking)
294293
return StaticScheduler(ca)
295294
end
296295
from_symbol(::Val{:static}) = StaticScheduler
@@ -349,15 +348,17 @@ end
349348

350349
function GreedyScheduler(;
351350
ntasks::Integer = nthreads(),
352-
nchunks::MaybeInteger = NotGiven(),
353-
chunksize::MaybeInteger = NotGiven(),
354-
chunking::Bool = false,
351+
nchunks = nothing,
352+
chunksize = nothing,
353+
minchunksize = nothing,
355354
split::Union{Split, Symbol} = RoundRobin(),
356-
minchunksize::Union{Nothing, Int} = nothing)
357-
if isgiven(nchunks) || isgiven(chunksize)
355+
chunking::Bool = false
356+
)
357+
if !(isnothing(nchunks) && isnothing(chunksize))
358358
chunking = true
359359
end
360-
ca = ChunkingArgs(GreedyScheduler, nchunks, chunksize, split; chunking, minsize=minchunksize)
360+
ca = ChunkingArgs(GreedyScheduler;
361+
n = nchunks, size = chunksize, minsize = minchunksize, split, chunking)
361362
return GreedyScheduler(ntasks, ca)
362363
end
363364
from_symbol(::Val{:greedy}) = GreedyScheduler

0 commit comments

Comments
 (0)