Skip to content

Commit e571124

Browse files
committed
Sch: Use W1T1 if profitable
1 parent 65bb3fd commit e571124

File tree

2 files changed

+22
-9
lines changed

2 files changed

+22
-9
lines changed

src/sch/Sch.jl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -626,16 +626,17 @@ end
626626
resize!(sorted_procs, length(input_procs))
627627
costs = @reusable_dict :schedule!_costs Processor Float64 OSProc() 0.0 32
628628
costs_cleanup = @reuse_defer_cleanup empty!(costs)
629-
estimate_task_costs!(sorted_procs, costs, state, input_procs, task)
630-
costs_cleanup() # We don't use costs here
629+
estimate_task_costs!(sorted_procs, costs, state, input_procs, task; sig)
631630
input_procs_cleanup()
632631
scheduled = false
633632

634-
# Move our corresponding ThreadProc to be the last considered
633+
# Move our corresponding ThreadProc to be the last considered,
634+
# if the task is expected to run for longer than the time it takes to
635+
# schedule it onto another worker (estimated at 1ms).
635636
if length(sorted_procs) > 1
636637
sch_threadproc = Dagger.ThreadProc(myid(), Threads.threadid())
637638
sch_thread_idx = findfirst(proc->proc==sch_threadproc, sorted_procs)
638-
if sch_thread_idx !== nothing
639+
if sch_thread_idx !== nothing && costs[sch_threadproc] > 1_000_000 # 1ms
639640
deleteat!(sorted_procs, sch_thread_idx)
640641
push!(sorted_procs, sch_threadproc)
641642
end

src/sch/util.jl

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -551,13 +551,13 @@ current estimated per-processor compute pressure, and transfer costs for each
551551
`Chunk` argument to `task`. Returns `(procs, costs)`, with `procs` sorted in
552552
order of ascending cost.
553553
"""
554-
function estimate_task_costs(state, procs, task)
554+
function estimate_task_costs(state, procs, task; sig=nothing)
555555
sorted_procs = Vector{Processor}(undef, length(procs))
556556
costs = Dict{Processor,Float64}()
557-
estimate_task_costs!(sorted_procs, costs, state, procs, task)
557+
estimate_task_costs!(sorted_procs, costs, state, procs, task; sig)
558558
return sorted_procs, costs
559559
end
560-
@reuse_scope function estimate_task_costs!(sorted_procs, costs, state, procs, task)
560+
@reuse_scope function estimate_task_costs!(sorted_procs, costs, state, procs, task; sig=nothing)
561561
tx_rate = state.transfer_rate[]
562562

563563
# Find all Chunks
@@ -569,6 +569,12 @@ end
569569
end
570570
end
571571

572+
# Estimate the cost of executing the task itself
573+
if sig === nothing
574+
sig = signature(task.f, task.inputs)
575+
end
576+
est_time_util = get(state.signature_time_cost, sig, 1000^3)
577+
572578
# Estimate total cost for executing this task on each candidate processor
573579
for proc in procs
574580
gproc = get_parent(proc)
@@ -582,8 +588,14 @@ end
582588
tx_cost = impute_sum(affinity(chunk)[2] for chunk in chunks_filt)
583589

584590
# Estimate total cost to move data and get task running after currently-scheduled tasks
585-
est_time_util = get(state.worker_time_pressure[gproc.pid], proc, 0)
586-
costs[proc] = est_time_util + (tx_cost/tx_rate)
591+
est_business = get(state.worker_time_pressure[get_parent(proc).pid], proc, 0)
592+
593+
# Add fixed cost for cross-worker task transfer (esimated at 1ms)
594+
# TODO: Actually estimate/benchmark this
595+
task_xfer_cost = gproc.pid != myid() ? 1_000_000 : 0 # 1ms
596+
597+
# Compute final cost
598+
costs[proc] = est_time_util + est_business + (tx_cost/tx_rate) + task_xfer_cost
587599
end
588600
chunks_cleanup()
589601

0 commit comments

Comments
 (0)