Skip to content

Commit 81eea2b

Browse files
committed
estimate_task_costs: Reduce allocations
1 parent c59e4cc commit 81eea2b

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

src/sch/util.jl

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,7 @@ end
368368

369369
"Like `sum`, but replaces `nothing` entries with the average of non-`nothing` entries."
370370
function impute_sum(xs)
371-
length(xs) == 0 && return 0
372-
373-
total = zero(eltype(xs))
371+
total = 0
374372
nothing_count = 0
375373
something_count = 0
376374
for x in xs
@@ -382,7 +380,8 @@ function impute_sum(xs)
382380
end
383381
end
384382

385-
total + nothing_count * total / something_count
383+
something_count == 0 && return 0
384+
return total + nothing_count * total / something_count
386385
end
387386

388387
"Collects all arguments for `task`, converting Thunk inputs to Chunks."
@@ -412,15 +411,20 @@ function estimate_task_costs(state, procs, task, inputs)
412411
end
413412
end
414413

415-
# Estimate network transfer costs based on data size
416-
# N.B. `affinity(x)` really means "data size of `x`"
417-
# N.B. We treat same-worker transfers as having zero transfer cost
418-
# TODO: For non-Chunk, model cost from scheduler to worker
419-
# TODO: Measure and model processor move overhead
420-
transfer_costs = Dict(proc=>impute_sum([affinity(chunk)[2] for chunk in filter(c->get_parent(processor(c))!=get_parent(proc), chunks)]) for proc in procs)
414+
costs = Dict{Processor,Float64}()
415+
for proc in procs
416+
chunks_filt = Iterators.filter(c->get_parent(processor(c))!=get_parent(proc), chunks)
417+
418+
# Estimate network transfer costs based on data size
419+
# N.B. `affinity(x)` really means "data size of `x`"
420+
# N.B. We treat same-worker transfers as having zero transfer cost
421+
# TODO: For non-Chunk, model cost from scheduler to worker
422+
# TODO: Measure and model processor move overhead
423+
tx_cost = impute_sum(affinity(chunk)[2] for chunk in chunks_filt)
421424

422-
# Estimate total cost to move data and get task running after currently-scheduled tasks
423-
costs = Dict(proc=>state.worker_time_pressure[get_parent(proc).pid][proc]+(tx_cost/tx_rate) for (proc, tx_cost) in transfer_costs)
425+
# Estimate total cost to move data and get task running after currently-scheduled tasks
426+
costs[proc] = state.worker_time_pressure[get_parent(proc).pid][proc] + (tx_cost/tx_rate)
427+
end
424428

425429
# Shuffle procs around, so equally-costly procs are equally considered
426430
P = randperm(length(procs))

0 commit comments

Comments
 (0)