Skip to content

Commit 7e30787

Browse files
committed
Add deferral for reuseable cleanup
1 parent dbf903d commit 7e30787

File tree

5 files changed

+75
-31
lines changed

5 files changed

+75
-31
lines changed

src/Dagger.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import Random: AbstractRNG
1717
import UUIDs: UUID, uuid4
1818

1919
if !isdefined(Base, :ScopedValues)
20-
import ScopedValues: ScopedValue, with
20+
import ScopedValues: ScopedValue, @with, with
2121
else
22-
import Base.ScopedValues: ScopedValue, with
22+
import Base.ScopedValues: ScopedValue, @with, with
2323
end
2424
import TaskLocalValues: TaskLocalValue
2525

src/sch/Sch.jl

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ import ..Dagger: @dagdebug, @safe_lock_spin1, @maybelog, @take_or_alloc!
2020
import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek
2121

2222
import ..Dagger: ReusableCache, ReusableLinkedList, ReusableDict
23-
import ..Dagger: @reusable, @reusable_dict, @reusable_vector, @reusable_tasks
23+
import ..Dagger: @reusable, @reusable_dict, @reusable_vector, @reusable_tasks, @reuse_scope, @reuse_defer_cleanup
2424

2525
import TimespanLogging
2626

2727
import TaskLocalValues: TaskLocalValue
28+
import ScopedValues: @with
2829

2930
const OneToMany = Dict{Thunk, Set{Thunk}}
3031

@@ -530,7 +531,7 @@ struct ScheduleTaskSpec
530531
est_alloc_util::UInt64
531532
est_occupancy::UInt32
532533
end
533-
function schedule!(ctx, state, sch_options, procs=procs_to_use(ctx, sch_options))
534+
@reuse_scope function schedule!(ctx, state, sch_options, procs=procs_to_use(ctx, sch_options))
534535
lock(state.lock) do
535536
safepoint(state)
536537

@@ -541,8 +542,9 @@ function schedule!(ctx, state, sch_options, procs=procs_to_use(ctx, sch_options)
541542

542543
# Schedule tasks
543544
to_fire = @reusable_dict :schedule!_to_fire ScheduleTaskLocation Vector{ScheduleTaskSpec} ScheduleTaskLocation(OSProc(), OSProc()) ScheduleTaskSpec[] 1024
545+
to_fire_cleanup = @reuse_defer_cleanup empty!(to_fire)
544546
failed_scheduling = @reusable_vector :schedule!_failed_scheduling Union{Thunk,Nothing} nothing 32
545-
547+
failed_scheduling_cleanup = @reuse_defer_cleanup empty!(failed_scheduling)
546548
# Select a new task and get its options
547549
task = nothing
548550
@label pop_task
@@ -611,21 +613,20 @@ function schedule!(ctx, state, sch_options, procs=procs_to_use(ctx, sch_options)
611613

612614
input_procs = @reusable_vector :schedule!_input_procs Processor OSProc() 32
613615
input_procs_cleanup = @reuse_defer_cleanup empty!(input_procs)
614-
for gp in Dagger.compatible_processors(scope, procs)
615-
subprocs = get_processors(gp)
616-
for proc in subprocs
617-
if !(proc in input_procs)
618-
push!(input_procs, proc)
619-
end
616+
for proc in Dagger.compatible_processors(scope, procs)
617+
if !(proc in input_procs)
618+
push!(input_procs, proc)
620619
end
621620
end
622621

623622
sorted_procs = @reusable_vector :schedule!_sorted_procs Processor OSProc() 32
623+
sorted_procs_cleanup = @reuse_defer_cleanup empty!(sorted_procs)
624624
resize!(sorted_procs, length(input_procs))
625625
costs = @reusable_dict :schedule!_costs Processor Float64 OSProc() 0.0 32
626+
costs_cleanup = @reuse_defer_cleanup empty!(costs)
626627
estimate_task_costs!(sorted_procs, costs, state, input_procs, task)
627-
empty!(costs) # We don't use costs here
628-
empty!(input_procs)
628+
costs_cleanup() # We don't use costs here
629+
input_procs_cleanup()
629630
scheduled = false
630631

631632
# Move our corresponding ThreadProc to be the last considered
@@ -655,27 +656,29 @@ function schedule!(ctx, state, sch_options, procs=procs_to_use(ctx, sch_options)
655656
state.worker_time_pressure[gproc.pid][proc] =
656657
get(state.worker_time_pressure[gproc.pid], proc, 0) +
657658
est_time_util
658-
@dagdebug task :schedule "Scheduling to $gproc -> $proc"
659-
empty!(sorted_procs)
659+
@dagdebug task :schedule "Scheduling to $gproc -> $proc (cost: $(costs[proc]), pressure: $(state.worker_time_pressure[gproc.pid][proc]))"
660+
sorted_procs_cleanup()
661+
costs_cleanup()
660662
@goto pop_task
661663
end
662664
end
663665
end
664666
ex = SchedulingException("No processors available, try widening scope")
665667
store_result!(state, task, ex; error=true)
666668
set_failed!(state, task)
667-
empty!(sorted_procs)
669+
sorted_procs_cleanup()
670+
costs_cleanup()
668671
@goto pop_task
669672

670673
# Fire all newly-scheduled tasks
671674
@label fire_tasks
672675
for (task_loc, task_spec) in to_fire
673676
fire_tasks!(ctx, task_loc, task_spec, state)
674677
end
675-
empty!(to_fire)
678+
to_fire_cleanup()
676679

677680
append!(state.ready, failed_scheduling)
678-
empty!(failed_scheduling)
681+
failed_scheduling_cleanup()
679682
end
680683
end
681684

@@ -801,9 +804,10 @@ struct TaskSpec
801804
end
802805
Base.hash(task::TaskSpec, h::UInt) = hash(task.thunk_id, hash(TaskSpec, h))
803806

804-
function fire_tasks!(ctx, task_loc::ScheduleTaskLocation, task_specs::Vector{ScheduleTaskSpec}, state)
807+
@reuse_scope function fire_tasks!(ctx, task_loc::ScheduleTaskLocation, task_specs::Vector{ScheduleTaskSpec}, state)
805808
gproc, proc = task_loc.gproc, task_loc.proc
806809
to_send = @reusable_vector :fire_tasks!_to_send Union{TaskSpec,Nothing} nothing 1024
810+
to_send_cleanup = @reuse_defer_cleanup empty!(to_send)
807811
for task_spec in task_specs
808812
thunk = task_spec.task
809813
push!(state.running, thunk)
@@ -859,7 +863,7 @@ function fire_tasks!(ctx, task_loc::ScheduleTaskLocation, task_specs::Vector{Sch
859863
@reusable_tasks :fire_tasks!_task_cache 32 _->nothing "fire_tasks!" FireTaskSpec(proc, state.chan, task_spec)
860864
end
861865
end
862-
empty!(to_send)
866+
to_send_cleanup()
863867
end
864868

865869
struct FireTaskSpec
@@ -1306,7 +1310,7 @@ end
13061310
13071311
Executes a single task specified by `task` on `to_proc`.
13081312
"""
1309-
function do_task(to_proc, task::TaskSpec)
1313+
@reuse_scope function do_task(to_proc, task::TaskSpec)
13101314
thunk_id = task.thunk_id
13111315

13121316
ctx_vars = task.ctx_vars
@@ -1461,7 +1465,9 @@ function do_task(to_proc, task::TaskSpec)
14611465
f = Dagger.value(first(data))
14621466
@assert !(f isa Chunk) "Failed to unwrap thunk function"
14631467
fetched_args = @reusable_vector :do_task_fetched_args Any nothing 32
1468+
fetched_args_cleanup = @reuse_defer_cleanup empty!(fetched_args)
14641469
fetched_kwargs = @reusable_vector :do_task_fetched_kwargs Pair{Symbol,Any} :NULL=>nothing 32
1470+
fetched_kwargs_cleanup = @reuse_defer_cleanup empty!(fetched_kwargs)
14651471
for idx in 2:length(data)
14661472
arg = data[idx]
14671473
if Dagger.ispositional(arg)
@@ -1535,8 +1541,8 @@ function do_task(to_proc, task::TaskSpec)
15351541
bt = catch_backtrace()
15361542
RemoteException(myid(), CapturedException(ex, bt))
15371543
finally
1538-
empty!(fetched_args)
1539-
empty!(fetched_kwargs)
1544+
fetched_args_cleanup()
1545+
fetched_kwargs_cleanup()
15401546
end
15411547

15421548
threadtime = cputhreadtime() - threadtime_start

src/sch/util.jl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,11 +542,12 @@ function estimate_task_costs(state, procs, task)
542542
estimate_task_costs!(sorted_procs, costs, state, procs, task)
543543
return sorted_procs, costs
544544
end
545-
function estimate_task_costs!(sorted_procs, costs, state, procs, task)
545+
@reuse_scope function estimate_task_costs!(sorted_procs, costs, state, procs, task)
546546
tx_rate = state.transfer_rate[]
547547

548548
# Find all Chunks
549549
chunks = @reusable_vector :estimate_task_costs_chunks Union{Chunk,Nothing} nothing 32
550+
chunks_cleanup = @reuse_defer_cleanup empty!(chunks)
550551
for input in task.inputs
551552
if Dagger.valuetype(input) <: Chunk
552553
push!(chunks, Dagger.value(input)::Chunk)
@@ -568,7 +569,7 @@ function estimate_task_costs!(sorted_procs, costs, state, procs, task)
568569
est_time_util = get(state.worker_time_pressure[gproc.pid], proc, 0)
569570
costs[proc] = est_time_util + (tx_cost/tx_rate)
570571
end
571-
empty!(chunks)
572+
chunks_cleanup()
572573

573574
# Shuffle procs around, so equally-costly procs are equally considered
574575
np = length(procs)

src/submission.jl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ end
5252
eager_submit_internal!(ctx, state, task, tid, payload::Tuple{<:AnyPayload}) =
5353
eager_submit_internal!(ctx, state, task, tid, payload[1])
5454
const UID_TO_TID_CACHE = TaskLocalValue{ReusableCache{Dict{UInt64,Int},Nothing}}(()->ReusableCache(Dict{UInt64,Int}, nothing, 1))
55-
function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_to_tid=nothing)
55+
@reuse_scope function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_to_tid=nothing)
5656
maybe_take_or_alloc!(UID_TO_TID_CACHE[], uid_to_tid) do uid_to_tid
5757
if payload isa PayloadMulti
5858
thunk_ids = Sch.ThunkID[]
@@ -79,8 +79,11 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
7979
@maybelog ctx timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f=fargs[1], args=fargs[2:end], options, uid))
8080

8181
old_fargs = @reusable_vector :eager_submit_internal!_old_fargs Argument Argument(ArgPosition(), nothing) 32
82+
old_fargs_cleanup = @reuse_defer_cleanup empty!(old_fargs)
8283
append!(old_fargs, Iterators.map(copy, fargs))
84+
8385
syncdeps_vec = @reusable_vector :eager_submit_interal!_syncdeps_vec Any nothing 32
86+
syncdeps_vec_cleanup = @reuse_defer_cleanup empty!(syncdeps_vec)
8487
if options.syncdeps !== nothing
8588
append!(syncdeps_vec, options.syncdeps)
8689
end
@@ -159,7 +162,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
159162
end
160163
end
161164
end
162-
empty!(syncdeps_vec)
165+
syncdeps_vec_cleanup()
163166

164167
GC.@preserve old_fargs fargs begin
165168
# Create the `Thunk`
@@ -181,7 +184,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
181184
state.thunk_dict[thunk.id] = WeakThunk(thunk)
182185
#=FIXME:REALLOC=#
183186
Sch.reschedule_syncdeps!(state, thunk)
184-
empty!(old_fargs) # reschedule_syncdeps! preserves all referenced tasks/chunks
187+
old_fargs_cleanup() # reschedule_syncdeps! preserves all referenced tasks/chunks
185188
@dagdebug thunk :submit "Added to scheduler"
186189
if future !== nothing
187190
# Ensure we attach a future before the thunk is scheduled
@@ -205,7 +208,6 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
205208

206209
return thunk_id
207210
end
208-
empty!(equiv_chunks)
209211
end
210212
end
211213
struct UnrefThunk

src/utils/reuse.jl

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,40 @@
1+
struct ReuseCleanup
2+
done::Base.RefValue{Bool}
3+
f::Function
4+
end
5+
const REUSE_SCOPE_DEFERRED = ScopedValue{Union{Vector{ReuseCleanup},Nothing}}(nothing)
6+
macro reuse_scope(ex)
7+
@assert @capture(ex, function f_(args__) body_ end)
8+
esc(quote
9+
function $f($(args...))
10+
@with $REUSE_SCOPE_DEFERRED=>Vector{$ReuseCleanup}() begin
11+
try
12+
$body
13+
finally
14+
deferred = $REUSE_SCOPE_DEFERRED[]
15+
@assert deferred !== nothing
16+
for cleanup in deferred
17+
cleanup.done[] || cleanup()
18+
end
19+
end
20+
end
21+
end
22+
end)
23+
end
24+
macro reuse_defer_cleanup(ex)
25+
@gensym cleanup
26+
quote
27+
let $cleanup = $ReuseCleanup(Base.RefValue(false), ()->$(esc(ex)))
28+
push!($REUSE_SCOPE_DEFERRED[], $cleanup)
29+
$cleanup
30+
end
31+
end
32+
end
33+
function (cleanup::ReuseCleanup)()
34+
cleanup.done[] = true
35+
cleanup.f()
36+
end
37+
138
struct ReusableCache{T,Tnull}
239
cache::Vector{T}
340
used::Vector{Bool}
@@ -140,8 +177,6 @@ end
140177

141178
# FIXME: Provide ReusableObject{T} interface
142179
# FIXME: Allow objects to be GC'd (if lost via throw/unexpected control flow) (provide optional warning mode on finalization)
143-
# FIXME: Add take/replace interface
144-
# FIXME: Add function annotation for multiple reuse points
145180

146181
#= FIXME: UniquingCache
147182
struct UniquingCache{K,V}

0 commit comments

Comments
 (0)