Skip to content

Commit db5eecd

Browse files
committed
Sch: Use MetricsTracker
1 parent f1617f4 commit db5eecd

File tree

4 files changed

+96
-7
lines changed

4 files changed

+96
-7
lines changed

Project.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
1111
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
1212
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
1313
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
14+
MetricsTracker = "9a9c6fec-044d-4a27-aa18-2b01ca4026eb"
1415
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
1516
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
1617
Preferences = "21216c6a-2e73-6563-6e65-726566657250"

src/Dagger.jl

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ else
4343
import Distributed: Future, RemoteChannel, myid, workers, nworkers, procs, remotecall, remotecall_wait, remotecall_fetch, check_same_host
4444
end
4545

46+
import MetricsTracker as MT
47+
# Value of the "reuse-metrics" preference (default `false`). When `true`,
# `__init__` loads metrics from `metrics_path` and registers an exit hook to save them.
const reuse_metrics = @load_preference("reuse-metrics", false)
48+
# Value of the "metrics-path" preference (default "metrics.json"): the file that
# metrics are loaded from and saved to when `reuse_metrics` is enabled.
const metrics_path = @load_preference("metrics-path", "metrics.json")
49+
4650
include("lib/util.jl")
4751
include("utils/dagdebug.jl")
4852

@@ -67,6 +71,9 @@ include("submission.jl")
6771
include("chunks.jl")
6872
include("memory-spaces.jl")
6973

74+
# Metrics
75+
include("utils/metrics.jl")
76+
7077
# Task scheduling
7178
include("compute.jl")
7279
include("utils/clock.jl")
@@ -126,6 +133,30 @@ function set_distributed_package!(value)
126133
@info "Dagger.jl preference has been set, restart your Julia session for this change to take effect!"
127134
end
128135

"""
    set_reuse_metrics!(value::Bool)

Store the `"reuse-metrics"` [preference](https://github.com/JuliaPackaging/Preferences.jl),
which enables (`true`) or disables (`false`) the reuse of collected metrics
across Julia sessions.

The new value only takes effect after Julia has been restarted.
"""
function set_reuse_metrics!(value::Bool)
    # Build the key/value pair first, then hand it to Preferences.
    pref = "reuse-metrics" => value
    @set_preferences!(pref)
    @info "Dagger.jl preference has been set, restart your Julia session for this change to take effect!"
    return nothing
end
147+
"""
    set_metrics_path!(value::AbstractString)

Set a [preference](https://github.com/JuliaPackaging/Preferences.jl) for
the path used to save and load metrics. Any string type is accepted (for
example a `SubString`); it is stored as a plain `String`.

You will need to restart Julia after setting a new preference.
"""
function set_metrics_path!(value::AbstractString)
    # Normalize to `String` so every string-like input is stored the same way
    # a plain `String` argument would be.
    @set_preferences!("metrics-path" => String(value))
    @info "Dagger.jl preference has been set, restart your Julia session for this change to take effect!"
    return nothing
end
159+
129160
# Precompilation
130161
import PrecompileTools: @compile_workload
131162
include("precompile.jl")
@@ -189,6 +220,26 @@ function __init__()
189220
catch err
190221
@warn "Error parsing JULIA_DAGGER_DEBUG" exception=err
191222
end
223+
224+
if reuse_metrics
    # Restore metrics from `metrics_path` when that file exists.
    if isfile(metrics_path)
        @dagdebug nothing :metrics "Loading metrics"
        try
            MT.load_metrics!(metrics_path)
        catch err
            # Load failures are reported as a warning rather than rethrown.
            @warn "Error loading metrics" exception=(err, catch_backtrace())
        end
    else
        @dagdebug nothing :metrics "Metrics file not found"
    end

    # Save metrics to `metrics_path` when Julia exits.
    atexit() do
        @dagdebug nothing :metrics "Saving metrics"
        try
            MT.save_metrics(metrics_path)
        catch err
            # Mirror the load path: report the failure as a warning instead of
            # letting the exception escape from the exit hook.
            @warn "Error saving metrics" exception=(err, catch_backtrace())
        end
    end
end
192243
end
193244

194245
end # module

src/sch/Sch.jl

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ import Base: @invokelatest
1515

1616
import ..Dagger
1717
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
18-
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime
18+
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain
1919
import ..Dagger: @dagdebug, @safe_lock_spin1
2020
import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek
21+
import ScopedValues: @with
22+
23+
import MetricsTracker as MT
2124

2225
import ..Dagger
2326

@@ -1648,6 +1651,12 @@ function do_task(to_proc, task_desc)
16481651
end
16491652
end
16501653

1654+
# Compute signature
1655+
@warn "Fix kwargs" maxlog=1
1656+
sig = DataType[Tf, map(fetched_args) do x
1657+
chunktype(x)
1658+
end...]
1659+
16511660
#= FIXME: If MaxUtilization, stop processors and wait
16521661
if (est_time_util isa MaxUtilization) && (real_time_util > 0)
16531662
# FIXME: Stop processors
@@ -1660,8 +1669,11 @@ function do_task(to_proc, task_desc)
16601669
timespan_start(ctx, :compute, (;thunk_id, processor=to_proc), (;f))
16611670
res = nothing
16621671

1663-
# Start counting time and GC allocations
1664-
threadtime_start = cputhreadtime()
1672+
# Setup metrics for time monitoring
1673+
mspec = MT.MetricsSpec(MT.TimeMetric(), Dagger.SignatureMetric(), Dagger.ProcessorMetric())
1674+
local_cache = MT.MetricsCache()
1675+
1676+
# Start counting GC allocations
16651677
# FIXME
16661678
#gcnum_start = Base.gc_num()
16671679

@@ -1677,9 +1689,13 @@ function do_task(to_proc, task_desc)
16771689
cancel_token=Dagger.DTASK_CANCEL_TOKEN[],
16781690
))
16791691

1692+
# Execute
16801693
res = Dagger.with_options(propagated) do
1681-
# Execute
1682-
execute!(to_proc, f, fetched_args...; fetched_kwargs...)
1694+
@with Dagger.TASK_SIGNATURE=>sig Dagger.TASK_PROCESSOR=>to_proc begin
1695+
MT.@with_metrics mspec Dagger :execute! thunk_id MT.SyncInto(local_cache) begin
1696+
execute!(to_proc, f, fetched_args...; fetched_kwargs...)
1697+
end
1698+
end
16831699
end
16841700

16851701
# Check if result is safe to store
@@ -1705,10 +1721,16 @@ function do_task(to_proc, task_desc)
17051721
RemoteException(myid(), CapturedException(ex, bt))
17061722
end
17071723

1708-
threadtime = cputhreadtime() - threadtime_start
1724+
lock(MT.GLOBAL_METRICS_CACHE) do global_cache
1725+
MT.sync_results_into!(global_cache, local_cache)
1726+
end
1727+
17091728
# FIXME: This is not a realistic measure of max. required memory
17101729
#gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4))
17111730
timespan_finish(ctx, :compute, (;thunk_id, processor=to_proc), (;f, result=result_meta))
1731+
1732+
threadtime = MT.cache_lookup(local_cache, Dagger, :execute!, thunk_id, MT.TimeMetric())
1733+
17121734
lock(TASK_SYNC) do
17131735
real_time_util[] -= est_time_util
17141736
pop!(TASKS_RUNNING, thunk_id)
@@ -1723,7 +1745,7 @@ function do_task(to_proc, task_desc)
17231745
storage_pressure=real_alloc_util,
17241746
storage_capacity=storage_cap,
17251747
loadavg=((Sys.loadavg()...,) ./ Sys.CPU_THREADS),
1726-
threadtime=threadtime,
1748+
threadtime,
17271749
# FIXME: Add runtime allocation tracking
17281750
gc_allocd=(isa(result_meta, Chunk) ? result_meta.handle.size : 0),
17291751
transfer_rate=(transfer_size[] > 0 && transfer_time[] > 0) ? round(UInt64, transfer_size[] / (transfer_time[] / 10^9)) : nothing,

src/utils/metrics.jl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Scoped value that holds a task signature as a `Vector{DataType}`, or
# `nothing` when no signature has been set in the current scope.
const TASK_SIGNATURE = ScopedValue{Union{Vector{DataType}, Nothing}}(nothing)

# Metric whose recorded value is whatever `TASK_SIGNATURE` holds when the
# metric is stopped.
struct SignatureMetric <: MT.AbstractMetric end

# The metric applies to the `:execute!` context.
function MT.metric_applies(::SignatureMetric, ::Val{:execute!})
    return true
end

# Type of the value produced by `stop_metric`.
function MT.metric_type(::SignatureMetric)
    return Union{Vector{DataType}, Nothing}
end

# No state is captured when the metric starts.
function MT.start_metric(::SignatureMetric)
    return nothing
end

# The second argument is ignored; the result is read from the scoped value.
function MT.stop_metric(::SignatureMetric, ::Any)
    return TASK_SIGNATURE[]
end
7+
8+
# Scoped value that holds a `Processor`, or `nothing` when no processor has
# been set in the current scope.
const TASK_PROCESSOR = ScopedValue{Union{Processor, Nothing}}(nothing)

# Metric whose recorded value is whatever `TASK_PROCESSOR` holds when the
# metric is stopped.
struct ProcessorMetric <: MT.AbstractMetric end

# The metric applies to the `:execute!` context.
function MT.metric_applies(::ProcessorMetric, ::Val{:execute!})
    return true
end

# Type of the value produced by `stop_metric`.
function MT.metric_type(::ProcessorMetric)
    return Union{Processor, Nothing}
end

# No state is captured when the metric starts.
function MT.start_metric(::ProcessorMetric)
    return nothing
end

# The second argument is ignored; the result is read from the scoped value.
function MT.stop_metric(::ProcessorMetric, ::Any)
    return TASK_PROCESSOR[]
end
14+
15+
# FIXME: struct TransferTimeMetric <: MT.AbstractMetric end

0 commit comments

Comments
 (0)