Skip to content

Commit 96aa4dc

Browse files
committed
Add scope for union of processor type
Adds ProcessorTypeScope(T), which matches processors that are a subtype of T. In the process, also expands the scope system to support lazily-evaluated scoping behavior, such as doing a subtype check or checking for `default_enabled`, via "taints".
1 parent 8d6b234 commit 96aa4dc

File tree

6 files changed

+115
-14
lines changed

6 files changed

+115
-14
lines changed

src/sch/Sch.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import Random: randperm
99
import Base: @invokelatest
1010

1111
import ..Dagger
12-
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope
12+
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, DefaultScope
1313
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
1414

1515
const OneToMany = Dict{Thunk, Set{Thunk}}
@@ -634,7 +634,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
634634
scope = if task.f isa Chunk
635635
task.f.scope
636636
else
637-
AnyScope()
637+
DefaultScope()
638638
end
639639
for input in task.inputs
640640
input = unwrap_weak_checked(input)
@@ -693,7 +693,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
693693
end
694694
end
695695
end
696-
state.cache[task] = SchedulingException("No processors available, try making proclist more liberal")
696+
state.cache[task] = SchedulingException("No processors available, try widening scope")
697697
state.errored[task] = true
698698
set_failed!(state, task)
699699
@goto pop_task
@@ -723,7 +723,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
723723
if procs_found
724724
push!(failed_scheduling, task)
725725
else
726-
state.cache[task] = SchedulingException("No processors available, try making proclist more liberal")
726+
state.cache[task] = SchedulingException("No processors available, try widening scope")
727727
state.errored[task] = true
728728
set_failed!(state, task)
729729
end

src/sch/util.jl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ function get_propagated_options(thunk)
1414
nt = NamedTuple()
1515
for key in thunk.propagates
1616
value = if key == :scope
17-
isa(thunk.f, Chunk) ? thunk.f.scope : AnyScope()
17+
isa(thunk.f, Chunk) ? thunk.f.scope : DefaultScope()
1818
elseif key == :processor
1919
isa(thunk.f, Chunk) ? thunk.f.processor : OSProc()
2020
elseif key in fieldnames(Thunk)
@@ -285,18 +285,20 @@ function can_use_proc(task, gproc, proc, opts, scope)
285285
# Check against single
286286
if opts.single !== nothing
287287
if gproc.pid != opts.single
288-
@debug "Rejected $proc: gproc.pid != single"
288+
@debug "[$(task.id)] Rejected $proc: gproc.pid ($(gproc.pid)) != single ($(opts.single))"
289289
return false
290290
end
291291
end
292292

293293
# Check scope
294294
proc_scope = Dagger.ExactScope(proc)
295295
if constrain(scope, proc_scope) isa Dagger.InvalidScope
296-
@debug "Rejected $proc: Task scope ($scope) vs. processor scope ($proc_scope)"
296+
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
297297
return false
298298
end
299299

300+
@debug "[$(task.id)] Accepted $proc"
301+
300302
return true
301303
end
302304

src/scopes.jl

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
1-
export AnyScope, UnionScope, NodeScope, ProcessScope, ExactScope
1+
export AnyScope, DefaultScope, UnionScope, NodeScope, ProcessScope, ExactScope, ProcessorTypeScope
22

33
abstract type AbstractScope end
44

5-
"Default scope that is unconstrained."
5+
"Widest scope that contains all processors."
66
struct AnyScope <: AbstractScope end
77

8+
abstract type AbstractScopeTaint end
9+
10+
"Taints a scope for later evaluation."
11+
struct TaintScope <: AbstractScope
12+
scope::AbstractScope
13+
taints::Set{AbstractScopeTaint}
14+
end
15+
Base.:(==)(ts1::TaintScope, ts2::TaintScope) =
16+
ts1.scope == ts2.scope &&
17+
length(ts1.taints) == length(ts2.taints) &&
18+
all(collect(ts1.taints) .== collect(ts2.taints))
19+
20+
struct DefaultEnabledTaint <: AbstractScopeTaint end
21+
22+
"Default scope that contains the set of `default_enabled` processors."
23+
DefaultScope() = TaintScope(AnyScope(),
24+
Set{AbstractScopeTaint}([DefaultEnabledTaint()]))
25+
826
"Union of two or more scopes."
927
struct UnionScope <: AbstractScope
1028
scopes::Tuple
@@ -35,6 +53,13 @@ end
3553
ProcessScope(p::OSProc) = ProcessScope(p.pid)
3654
ProcessScope() = ProcessScope(myid())
3755

56+
"Scoped to any processor with a given supertype."
57+
struct ProcessorTypeTaint{T} <: AbstractScopeTaint end
58+
59+
ProcessorTypeScope(T) =
60+
TaintScope(AnyScope(),
61+
Set{AbstractScopeTaint}([ProcessorTypeTaint{T}()]))
62+
3863
"Scoped to a specific processor."
3964
struct ExactScope <: AbstractScope
4065
parent::ProcessScope
@@ -58,7 +83,46 @@ Base.isless(x, ::AnyScope) = true
5883
constrain(::AnyScope, ::AnyScope) = AnyScope()
5984
constrain(::AnyScope, y) = y
6085

86+
# N.B. TaintScope taints constraining (until encountering an `ExactScope`) to
87+
# allow lazy evaluation of the taint matching on the final processor
88+
taint_match(::DefaultEnabledTaint, x::Processor) = default_enabled(x)
89+
taint_match(::ProcessorTypeTaint{T}, x::Processor) where T = x isa T
90+
Base.isless(::TaintScope, ::TaintScope) = false
91+
Base.isless(::TaintScope, ::AnyScope) = true
92+
Base.isless(::TaintScope, x) = false
93+
Base.isless(x, ::TaintScope) = true
94+
function constrain(x::TaintScope, y::TaintScope)
95+
scope = constrain(x.scope, y.scope)
96+
if scope isa InvalidScope
97+
return scope
98+
end
99+
taints = Set{AbstractScopeTaint}()
100+
for tx in x.taints
101+
push!(taints, tx)
102+
end
103+
for ty in y.taints
104+
push!(taints, ty)
105+
end
106+
return TaintScope(scope, taints)
107+
end
108+
function constrain(x::TaintScope, y)
109+
scope = constrain(x.scope, y)
110+
if scope isa InvalidScope
111+
return scope
112+
end
113+
return TaintScope(scope, x.taints)
114+
end
115+
function constrain(x::TaintScope, y::ExactScope)
116+
for taint in x.taints
117+
if !taint_match(taint, y.processor)
118+
return InvalidScope(x, y)
119+
end
120+
end
121+
return constrain(x.scope, y)
122+
end
123+
61124
Base.isless(::UnionScope, ::UnionScope) = false
125+
Base.isless(::UnionScope, ::TaintScope) = true
62126
Base.isless(::UnionScope, ::AnyScope) = true
63127
function constrain(x::UnionScope, y::UnionScope)
64128
zs = Vector{AbstractScope}()
@@ -75,12 +139,14 @@ end
75139
constrain(x::UnionScope, y) = constrain(x, UnionScope((y,)))
76140

77141
Base.isless(::NodeScope, ::NodeScope) = false
142+
Base.isless(::NodeScope, ::TaintScope) = true
78143
Base.isless(::NodeScope, ::AnyScope) = true
79144
constrain(x::NodeScope, y::NodeScope) =
80145
x == y ? y : InvalidScope(x, y)
81146

82147
Base.isless(::ProcessScope, ::ProcessScope) = false
83148
Base.isless(::ProcessScope, ::NodeScope) = true
149+
Base.isless(::ProcessScope, ::TaintScope) = true
84150
Base.isless(::ProcessScope, ::AnyScope) = true
85151
constrain(x::ProcessScope, y::ProcessScope) =
86152
x == y ? y : InvalidScope(x, y)
@@ -90,6 +156,7 @@ constrain(x::NodeScope, y::ProcessScope) =
90156
Base.isless(::ExactScope, ::ExactScope) = false
91157
Base.isless(::ExactScope, ::ProcessScope) = true
92158
Base.isless(::ExactScope, ::NodeScope) = true
159+
Base.isless(::ExactScope, ::TaintScope) = true
93160
Base.isless(::ExactScope, ::AnyScope) = true
94161
constrain(x::ExactScope, y::ExactScope) =
95162
x == y ? y : InvalidScope(x, y)

src/thunk.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ arguments are still passed as-is.
3939
- `processor::Processor=OSProc()` - The processor associated with `f`. Useful if
4040
`f` is a callable struct that exists on a given processor and should be
4141
transferred appropriately.
42-
- `scope::Dagger.AbstractScope=AnyScope()` - The scope associated with `f`.
42+
- `scope::Dagger.AbstractScope=DefaultScope()` - The scope associated with `f`.
4343
Useful if `f` is a function or callable struct that may only be transferred to,
4444
and executed within, the specified scope.
4545
@@ -80,7 +80,7 @@ mutable struct Thunk
8080
if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
8181
f = tochunk(f,
8282
something(processor, OSProc()),
83-
something(scope, AnyScope()))
83+
something(scope, DefaultScope()))
8484
end
8585
xs = Any[xs...]
8686
if options !== nothing
@@ -315,7 +315,7 @@ function spawn(f, args...; processor=nothing, scope=nothing, kwargs...)
315315
if !isnothing(processor) || !isnothing(scope)
316316
f = tochunk(f,
317317
something(processor, get_options(:processor, OSProc())),
318-
something(scope, get_options(:scope, AnyScope())))
318+
something(scope, get_options(:scope, DefaultScope())))
319319
end
320320
uid, future, finalizer_ref, thunk_ref = if myid() == 1
321321
_spawn(f, args...; options, kwargs...)

test/scopes.jl

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@
3131
es1_ch = Dagger.tochunk(nothing, OSProc(), es1)
3232
es2_ch = Dagger.tochunk(nothing, OSProc(), es2)
3333

34+
os1 = ExactScope(OSProc(1))
35+
36+
@testset "Default Scope" begin
37+
ds = DefaultScope()
38+
for (s1, s2) in ((ds, es1), (es1, ds))
39+
@test Dagger.constrain(s1, s2) == es1
40+
end
41+
for (s1, s2) in ((ds, os1), (os1, ds))
42+
@test Dagger.constrain(s1, s2) isa Dagger.InvalidScope
43+
end
44+
end
3445
@testset "Node Scope" begin
3546
@everywhere node_scope_test(ch...) = Dagger.system_uuid()
3647

@@ -128,6 +139,26 @@
128139
@test es1 in us_res.scopes
129140
@test !(es2 in us_res.scopes)
130141
end
142+
@testset "Processor Type Scope" begin
143+
pts_th = ProcessorTypeScope(Dagger.ThreadProc)
144+
pts_os = ProcessorTypeScope(Dagger.OSProc)
145+
146+
@test Dagger.constrain(pts_th, es1) == es1
147+
@test Dagger.constrain(pts_th, os1) isa Dagger.InvalidScope
148+
149+
@test Dagger.constrain(pts_os, es1) isa Dagger.InvalidScope
150+
@test Dagger.constrain(pts_os, os1) == os1
151+
152+
# Duplicate
153+
pts_th_dup = Dagger.constrain(pts_th, pts_th)
154+
@test Dagger.constrain(pts_th_dup, es1) == es1
155+
@test Dagger.constrain(pts_th_dup, os1) isa Dagger.InvalidScope
156+
157+
# Empty intersection
158+
pts_all = Dagger.constrain(pts_th, pts_os)
159+
@test Dagger.constrain(pts_all, es1) isa Dagger.InvalidScope
160+
@test Dagger.constrain(pts_all, os1) isa Dagger.InvalidScope
161+
end
131162
# TODO: Test scope propagation
132163

133164
rmprocs([wid1, wid2])

test/thunk.jl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Dagger: Chunk
2121
end
2222
MulProc() = MulProc(myid())
2323
Dagger.get_parent(mp::MulProc) = OSProc(mp.owner)
24+
Dagger.move(src::MulProc, dest::Dagger.OSProc, x::Function) = Base.:*
2425
Dagger.move(src::MulProc, dest::Dagger.ThreadProc, x::Function) = Base.:*
2526
end
2627

@@ -202,7 +203,7 @@ end
202203
a = delayed(+; processor=Dagger.ThreadProc(1,1))(1,2)
203204
@test a.f isa Chunk
204205
@test a.f.processor isa Dagger.ThreadProc
205-
@test a.f.scope isa AnyScope
206+
@test a.f.scope == DefaultScope()
206207

207208
a = delayed(+; processor=Dagger.ThreadProc(1,1), scope=NodeScope())(1,2)
208209
@test a.f isa Chunk
@@ -236,7 +237,7 @@ end
236237
a = Dagger.Sch._find_thunk(_a)
237238
@test a.f isa Chunk
238239
@test a.f.processor isa Dagger.ThreadProc
239-
@test a.f.scope isa AnyScope
240+
@test a.f.scope == DefaultScope()
240241

241242
_a = Dagger.spawn(+, 1, 2; processor=Dagger.ThreadProc(1,1), scope=NodeScope())
242243
a = Dagger.Sch._find_thunk(_a)

0 commit comments

Comments
 (0)