Skip to content

DTask/Sch: Optimize DTask futures, optimize proc in scope checks #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 96 additions & 13 deletions src/dtask.jl
Original file line number Diff line number Diff line change
@@ -1,19 +1,64 @@
export DTask

"""
LocalFuture

A fast, shared-memory alternative to Distributed's Future.
"""
mutable struct LocalFuture
const ready::Base.Event
errored::Bool
value::Union{Some{Any}, Nothing}

LocalFuture() = new(Base.Event(), false, nothing)
end
Base.isready(f::LocalFuture) = f.ready.set # FIXME: Use isready(f.ready)
function Base.wait(f::LocalFuture)
wait(f.ready)
return
end

"A future holding the result of a `Thunk`."
struct ThunkFuture
future::Future
mutable struct ThunkFuture
const from::Int
local_future::Union{LocalFuture, Nothing}
remote_future::Union{Future, Nothing}
end
function ThunkFuture(from::Int=myid())
if from == myid()
return ThunkFuture(from, LocalFuture(), nothing)
else
return ThunkFuture(from, nothing, Future())
end
end
function Base.isready(t::ThunkFuture)
if t.local_future !== nothing
return isready(t.local_future::LocalFuture)
else
return isready(t.remote_future::Future)::Bool
end
end
ThunkFuture(x::Integer) = ThunkFuture(Future(x))
ThunkFuture() = ThunkFuture(Future())
Base.isready(t::ThunkFuture) = isready(t.future)
Base.wait(t::ThunkFuture) = Dagger.Sch.thunk_yield() do
wait(t.future)
if t.from == myid()
wait(t.local_future)
else
wait(t.remote_future)
end
return
end
function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
error, value = Dagger.Sch.thunk_yield() do
fetch(t.future)
if t.from == myid()
if !isready(t.local_future)
Dagger.Sch.thunk_yield() do
wait(t.local_future)
end
end
value = something(t.local_future.value)
error = t.local_future.errored
else
error, value = Dagger.Sch.thunk_yield() do
fetch(t.remote_future)
end
end
if error
throw(value)
Expand All @@ -24,7 +69,43 @@ function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
return move(proc, value)
end
end
Base.put!(t::ThunkFuture, x; error=false) = put!(t.future, (error, x))
function Base.put!(t::ThunkFuture, x; error=false)
if isready(t)
throw(ConcurrencyViolationError("ThunkFuture can't be set twice"))
end

# Notify either or both futures
if t.local_future !== nothing
t.local_future.value = Some{Any}(x)
t.local_future.errored = error
notify(t.local_future.ready)
end
if t.remote_future !== nothing
put!(t.remote_future, (error, x))
end

return x
end
function Serialization.serialize(io::AbstractSerializer, t::ThunkFuture)
if t.remote_future === nothing
# Add a Future
t.remote_future = Future()
end

# Serialize normally
return invoke(serialize, Tuple{typeof(io), Any}, io, t)
end
function Serialization.deserialize(io::AbstractSerializer, ::Type{ThunkFuture})
# Deserialize normally
t = invoke(deserialize, Tuple{AbstractSerializer, DataType}, io, ThunkFuture)

if t.local_future !== nothing
# Remove the (now useless) LocalFuture
t.local_future = nothing
end

return t
end

"""
DTaskMetadata
Expand All @@ -45,24 +126,26 @@ executing. May be `fetch`'d or `wait`'d on at any time. See `Dagger.@spawn` for
more details.
"""
mutable struct DTask
uid::UInt
const uid::UInt
future::ThunkFuture
metadata::DTaskMetadata
const metadata::DTaskMetadata
thunk_ref::DRef

DTask(uid, future, metadata) = new(uid, future, metadata)
end

const EagerThunk = DTask

Base.isready(t::DTask) = isready(t.future)
Base.isready(t::DTask) = isready(t.future)::Bool
Base.istaskdone(t::DTask) = isready(t.future)
Base.istaskstarted(t::DTask) = isdefined(t, :thunk_ref)
function Base.wait(t::DTask)
if !istaskstarted(t)
throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `DTask`"))
end
wait(t.future)
if !isready(t)
wait(t.future)
end
return
end
function Base.fetch(t::DTask; raw=false)
Expand Down
3 changes: 1 addition & 2 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1101,8 +1101,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
end
task, occupancy = peek(queue)
scope = task.scope
if !isa(constrain(scope, Dagger.ExactScope(to_proc)),
InvalidScope) &&
if Dagger.proc_in_scope(to_proc, scope)
typemax(UInt32) - proc_occupancy_cached >= occupancy
# Compatible, steal this task
return dequeue_pair!(queue)
Expand Down
3 changes: 1 addition & 2 deletions src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,7 @@ function can_use_proc(state, task, gproc, proc, opts, scope)
end

# Check against scope
proc_scope = Dagger.ExactScope(proc)
if constrain(scope, proc_scope) isa Dagger.InvalidScope
if !Dagger.proc_in_scope(proc, scope)
@dagdebug task :scope "Rejected $proc: Not contained in task scope ($scope)"
return false, scope
end
Expand Down
9 changes: 9 additions & 0 deletions src/scopes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ abstract type AbstractScope end

"Widest scope that contains all processors."
struct AnyScope <: AbstractScope end
proc_in_scope(::Processor, ::AnyScope) = true

abstract type AbstractScopeTaint end
proc_in_scope(proc::Processor, scope::AbstractScope) =
!isa(constrain(scope, ExactScope(proc)), InvalidScope)

"Taints a scope for later evaluation."
struct TaintScope <: AbstractScope
Expand Down Expand Up @@ -44,6 +47,8 @@ UnionScope(scopes...) = UnionScope((scopes...,))
UnionScope(scopes::Vector{<:AbstractScope}) = UnionScope((scopes...,))
UnionScope(s::AbstractScope) = UnionScope((s,))
UnionScope() = UnionScope(())
proc_in_scope(proc::Processor, scope::UnionScope) =
any(subscope->proc_in_scope(proc, subscope), scope.scopes)

function Base.:(==)(us1::UnionScope, us2::UnionScope)
if length(us1.scopes) != length(us2.scopes)
Expand Down Expand Up @@ -78,6 +83,8 @@ function ProcessScope(wid::Integer)
end
ProcessScope(p::OSProc) = ProcessScope(p.pid)
ProcessScope() = ProcessScope(myid())
proc_in_scope(proc::Processor, scope::ProcessScope) =
root_worker_id(proc) == scope.wid

struct ProcessorTypeTaint{T} <: AbstractScopeTaint end

Expand All @@ -92,12 +99,14 @@ struct ExactScope <: AbstractScope
processor::Processor
end
ExactScope(proc) = ExactScope(ProcessScope(get_parent(proc).pid), proc)
proc_in_scope(proc::Processor, scope::ExactScope) = proc == scope.processor

"Indicates that the applied scopes `x` and `y` are incompatible."
struct InvalidScope <: AbstractScope
x::AbstractScope
y::AbstractScope
end
proc_in_scope(::Processor, ::InvalidScope) = false

# Show methods

Expand Down
Loading