Skip to content

Commit 8c14210

Browse files
committed
Upload W.I.P improved mlflow logging data handling
1 parent b51fae6 commit 8c14210

File tree

11 files changed

+180
-113
lines changed

11 files changed

+180
-113
lines changed

src/MLFlowClient.jl

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ using JSON
88
using ShowCases
99
using FilePathsBase: AbstractPath
1010

11+
include("types/mlflow.jl")
12+
export MLFlow
13+
1114
include("types/tag.jl")
1215
export Tag
1316

@@ -45,10 +48,8 @@ export
4548
RunInfo,
4649
RunInputs
4750

48-
include("types/mlflow.jl")
49-
export MLFlow
50-
5151
include("utils.jl")
52+
export refresh
5253
include("api.jl")
5354

5455
include("services/experiment.jl")
@@ -67,4 +68,9 @@ export
6768
deleterun,
6869
restorerun,
6970
getrun
71+
include("services/loggers.jl")
72+
export
73+
logmetric,
74+
logbatch
75+
7076
end

src/services/experiment.jl

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@ The ID of the newly created experiment.
2020
"""
2121
function createexperiment(instance::MLFlow, name::String;
2222
artifact_location::Union{String, Missing}=missing,
23-
tags::Union{Dict{<:Any}, Array{<:Any}}=[])::String
24-
tags = tags |> parsetags
25-
23+
tags::MLFlowUpsertData{Tag}=Tag[])::String
2624
try
2725
result = mlfpost(instance, "experiments/create"; name=name,
28-
artifact_location=artifact_location, tags=tags)
26+
artifact_location=artifact_location, tags=(tags |> parse))
2927
return result["experiment_id"]
3028
catch e
3129
if isa(e, HTTP.ExceptionRequest.StatusError) && e.status == 400
@@ -51,7 +49,7 @@ Get metadata for an experiment. This method works on deleted experiments.
5149
# Returns
5250
An instance of type [`Experiment`](@ref).
5351
"""
54-
function getexperiment(instance::MLFlow, experiment_id::String)
52+
function getexperiment(instance::MLFlow, experiment_id::String)::Experiment
5553
try
5654
arguments = (:experiment_id => experiment_id,)
5755
result = mlfget(instance, "experiments/get"; arguments...)
@@ -60,7 +58,7 @@ function getexperiment(instance::MLFlow, experiment_id::String)
6058
throw(e)
6159
end
6260
end
63-
getexperiment(instance::MLFlow, experiment_id::Integer) =
61+
getexperiment(instance::MLFlow, experiment_id::Integer)::Experiment =
6462
getexperiment(instance, string(experiment_id))
6563

6664
"""
@@ -79,7 +77,8 @@ deleted experiments share the same name, the API will return one of them.
7977
# Returns
8078
An instance of type [`Experiment`](@ref).
8179
"""
82-
function getexperimentbyname(instance::MLFlow, experiment_name::String)
80+
function getexperimentbyname(instance::MLFlow,
81+
experiment_name::String)::Experiment
8382
try
8483
arguments = (:experiment_name => experiment_name,)
8584
result = mlfget(instance, "experiments/get-by-name"; arguments...)
@@ -198,7 +197,7 @@ updateexperiment(instance::MLFlow, experiment::Experiment, new_name::String) =
198197
updateexperiment(instance, experiment.experiment_id, new_name::String)
199198

200199
"""
201-
searchexperiments(instance::MLFlow; max_results::Integer=20000,
200+
searchexperiments(instance::MLFlow; max_results::Int64=20000,
202201
page_token::String="", filter::String="", order_by::Array{String}=[],
203202
view_type::ViewType=ACTIVE_ONLY)
204203
@@ -217,7 +216,7 @@ unspecified, return only active experiments.
217216
# Returns
218217
- A tuple whose first element is the vector of [`Experiment`](@ref) instances that were found in the MLFlow instance, and whose second element is a `String` or `nothing`.
219218
"""
220-
function searchexperiments(instance::MLFlow; max_results::Integer=20000,
219+
function searchexperiments(instance::MLFlow; max_results::Int64=20000,
221220
page_token::String="", filter::String="", order_by::Array{String}=String[],
222221
view_type::ViewType=ACTIVE_ONLY)::Tuple{Array{Experiment}, Union{String, Nothing}}
223222
endpoint = "experiments/search"

src/services/loggers.jl

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
"""
    logmetric(instance::MLFlow, run_id::String, key::String, value::Float64;
        timestamp::Int64=round(Int64, time() * 1000),
        step::Union{Int64, Missing}=missing)
    logmetric(instance::MLFlow, run::Run, key::String, value::Float64;
        timestamp::Int64=round(Int64, time() * 1000),
        step::Union{Int64, Missing}=missing)

Log a metric for a run. A metric is a key-value pair (string key, float value)
with an associated timestamp. Examples include the various metrics that
represent ML model accuracy. A metric can be logged multiple times.

# Arguments
- `instance`: [`MLFlow`](@ref) configuration.
- `run_id`: ID of the run under which to log the metric.
- `run`: a `Run` whose `info.run_id` is used as `run_id`.
- `key`: Name of the metric.
- `value`: Double value of the metric being logged.
- `timestamp`: Unix timestamp in milliseconds at the time metric was logged.
  Defaults to the current system time.
- `step`: Step at which to log the metric.

# Returns
Whatever `mlfpost` returns for the `runs/log-metric` endpoint.
"""
logmetric(instance::MLFlow, run_id::String, key::String, value::Float64;
    # `time()` is seconds since the Unix epoch; scale to milliseconds and round
    # to `Int64` explicitly so the default always satisfies the annotation.
    timestamp::Int64=round(Int64, time() * 1000),
    step::Union{Int64, Missing}=missing) =
    mlfpost(instance, "runs/log-metric"; run_id=run_id, key=key, value=value,
        timestamp=timestamp, step=step)
logmetric(instance::MLFlow, run::Run, key::String, value::Float64;
    timestamp::Int64=round(Int64, time() * 1000),
    step::Union{Int64, Missing}=missing) =
    logmetric(instance, run.info.run_id, key, value; timestamp=timestamp, step=step)
29+
"""
    logbatch(instance::MLFlow, run_id::String;
        metrics::Array{Metric}=Metric[], params::Array{Param}=Param[],
        tags::MLFlowUpsertData{Tag}=Tag[])

Log a batch of metrics, params, and tags for a run. In case of error, partial
data may be written.

# Arguments
- `instance`: [`MLFlow`](@ref) configuration.
- `run_id`: ID of the run to log under.
- `metrics`: Metrics to log.
- `params`: Params to log.
- `tags`: Tags to log; they are passed through `parse` before being sent.

For more information about this function, check [MLFlow official documentation](https://mlflow.org/docs/latest/rest-api.html#log-batch).
"""
function logbatch(instance::MLFlow, run_id::String;
    metrics::Array{Metric}=Metric[], params::Array{Param}=Param[],
    tags::MLFlowUpsertData{Tag}=Tag[])
    parsed_tags = parse(tags)
    return mlfpost(instance, "runs/log-batch"; run_id=run_id, metrics=metrics,
        params=params, tags=parsed_tags)
end

src/services/run.jl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
createrun(instance::MLFlow, experiment_id::String;
33
run_name::Union{String, Missing}=missing,
4-
start_time::Union{Integer, Missing}=missing,
4+
start_time::Union{Int64, Missing}=missing,
55
tags::MLFlowUpsertData{Tag}=Tag[])
66
77
Create a new run within an experiment. A run is usually a single execution of a
@@ -19,13 +19,11 @@ An instance of type [`Run`](@ref).
1919
"""
2020
function createrun(instance::MLFlow, experiment_id::String;
2121
run_name::Union{String, Missing}=missing,
22-
start_time::Union{Integer, Missing}=missing,
23-
tags::Union{Dict{<:Any}, Array{<:Any}}=[])
24-
tags = tags |> parsetags
25-
22+
start_time::Union{Int64, Missing}=missing,
23+
tags::MLFlowUpsertData{Tag}=Tag[])
2624
try
2725
result = mlfpost(instance, "runs/create"; experiment_id=experiment_id,
28-
run_name=run_name, start_time=start_time, tags=tags)
26+
run_name=run_name, start_time=start_time, tags=(tags |> parse))
2927
return result["run"] |> Run
3028
catch e
3129
throw(e)
@@ -34,13 +32,13 @@ end
3432
# Convenience methods: accept the experiment as an integer ID or as an
# `Experiment` and forward to the `String`-ID method. That method annotates
# `start_time` as `Union{Int64, Missing}`, so any other `Integer` subtype is
# converted to `Int64` here instead of failing the keyword type check.
createrun(instance::MLFlow, experiment_id::Integer;
    run_name::Union{String, Missing}=missing,
    start_time::Union{Integer, Missing}=missing,
    tags::MLFlowUpsertData{Tag}=Tag[]) =
    createrun(instance, string(experiment_id); run_name=run_name,
        start_time=(ismissing(start_time) ? missing : Int64(start_time)),
        tags=tags)
createrun(instance::MLFlow, experiment::Experiment;
    run_name::Union{String, Missing}=missing,
    start_time::Union{Integer, Missing}=missing,
    tags::MLFlowUpsertData{Tag}=Tag[]) =
    createrun(instance, string(experiment.experiment_id); run_name=run_name,
        start_time=(ismissing(start_time) ? missing : Int64(start_time)),
        tags=tags)
4644

src/types/mlflow.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ mlf = MLFlow(remote_url, headers=Dict("Authorization" => "Bearer <your-secret-to
2828
"""
2929
struct MLFlow
3030
apiroot::String
31-
apiversion::Union{Integer, AbstractFloat}
31+
apiversion::AbstractFloat
3232
headers::Dict
3333
end
3434
MLFlow(apiroot; apiversion=2.0, headers=Dict()) = MLFlow(apiroot, apiversion, headers)
@@ -40,3 +40,5 @@ function MLFlow()
4040
return MLFlow(apiroot)
4141
end
4242
Base.show(io::IO, t::MLFlow) = show(io, ShowCase(t, [:apiroot,:apiversion], new_lines=true))
43+
44+
abstract type LoggingData end

src/types/run.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ Metric associated with a run, represented as a key-value pair.
99
- `timestamp::Int64`: The timestamp at which this metric was recorded.
1010
- `step::Int64`: Step at which to log the metric.
1111
"""
12-
struct Metric
12+
struct Metric <: LoggingData
1313
key::String
1414
value::Float64
1515
timestamp::Int64
1616
step::Int64
1717
end
18+
Metric(data::Dict{String, Any}) = Metric(data["key"], data["value"],
19+
data["timestamp"], data["step"])
1820
Base.show(io::IO, t::Metric) = show(io, ShowCase(t, new_lines=true))
1921

2022
"""
@@ -26,10 +28,11 @@ Param associated with a run.
2628
- `key::String`: Key identifying this param.
2729
- `value::String`: Value associated with this param.
2830
"""
29-
struct Param
31+
struct Param <: LoggingData
3032
key::String
3133
value::String
3234
end
35+
Param(data::Dict{String, Any}) = Param(data["key"], data["value"])
3336
Base.show(io::IO, t::Param) = show(io, ShowCase(t, new_lines=true))
3437

3538
"""

src/types/tag.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ Generic tag type for MLFlow entities.
77
- `key::String`: The tag key.
88
- `value::String`: The tag value.
99
"""
10-
struct Tag
10+
struct Tag <: LoggingData
1111
key::String
1212
value::String
1313
end
14-
Tag(data::Dict{String, Any}) = Tag(data["key"], data["value"])
14+
Tag(data::Dict{String, Any})::Tag = Tag(data["key"], data["value"] |> string)
1515
Base.show(io::IO, t::Tag) = show(io, ShowCase(t, new_lines=true))

src/utils.jl

Lines changed: 35 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,55 @@
1-
IntOrString = Union{Int, String}
1+
const IntOrString = Union{Int, String}
2+
const MLFlowUpsertData{T} = Union{Array{T}, Dict{String, String},
3+
Array{Pair{String, String}}, Array{Dict{String, String}}}
24

35
const MLFLOW_ERROR_CODES = (;
46
RESOURCE_ALREADY_EXISTS = "RESOURCE_ALREADY_EXISTS",
57
RESOURCE_DOES_NOT_EXIST = "RESOURCE_DOES_NOT_EXIST",
68
)
79

8-
"""
9-
pairtags_to_dictarray(pair_array::Array{Pair{Any, Any}})
10-
11-
Transforms an array of `Pair` tags into an array of MLFlow compatible `Dict`
12-
format tags.
13-
14-
```@example
15-
# Having an array of pairs
16-
["foo" => "bar", "missy" => "gala"]
17-
18-
# Will be transformed into an array of dictionaries
19-
[Dict("key" => "foo", "value" => "bar"), Dict("key" => "missy", "value" => "gala")]
20-
```
21-
"""
22-
function pairtags_to_dictarray(pair_array::Array{<:Pair})::Array{<:Dict}
23-
dict_array = Dict[]
24-
for pair in pair_array
25-
key = string(pair.first)
26-
value = string(pair.second)
27-
push!(dict_array, Dict("key" => key, "value" => value))
10+
"""
    dict_to_array(dict::Dict{String, String})

Build a vector of `Tag`, one per entry of `dict`, using each entry's key and
value as the tag's key and value.
"""
function dict_to_array(dict::Dict{String, String})::MLFlowUpsertData
    return Tag[Tag(name, content) for (name, content) in dict]
end
3218

33-
"""
34-
tagsdict_to_dictarray(dict::Dict{Any, Any})
35-
36-
Transforms a dictionary into an array of `Dict`.
37-
38-
```@example
39-
# Having a dictionary
40-
Dict("foo" => "bar", "missy" => "gala")
41-
42-
# Will be transformed into an array of dictionaries
43-
[Dict("key" => "foo", "value" => "bar"), Dict("key" => "missy", "value" => "gala")]
44-
```
45-
"""
46-
function tagsdict_to_dictarray(dict::Dict{<:Any})::Array{<:Dict}
47-
dict_array = Dict[]
48-
for (key, value) in dict
49-
push!(dict_array, Dict("key" => key |> string,
50-
"value" => value |> string))
19+
"""
    pairsarray_to_array(pair_array::Array{<:Pair})

Build a vector of `Tag` from an array of pairs. Both sides of every pair are
converted with `string`, so `first` becomes the tag key and `second` the tag
value.
"""
function pairsarray_to_array(pair_array::Array{<:Pair})::MLFlowUpsertData
    entity_array = Tag[]
    sizehint!(entity_array, length(pair_array))
    for pair in pair_array
        push!(entity_array, Tag(string(pair.first), string(pair.second)))
    end

    return entity_array
end
5530

56-
"""
57-
tagarray_to_dictarray(tag_array::Array{Tag})
58-
59-
Transforms an array of `Tag` into an array of `Dict`.
60-
61-
```@example
62-
# Having an array of tags
63-
[Tag("foo", "bar"), Tag("missy", "gala")]
64-
65-
# Will be transformed into an array of dictionaries
66-
[Dict("key" => "foo", "value" => "bar"), Dict("key" => "missy", "value" => "gala")]
67-
```
68-
"""
69-
function tagarray_to_dictarray(tag_array::Array{Tag})::Array{<:Dict}
70-
dict_array = Dict[]
71-
for tag in tag_array
72-
push!(dict_array, Dict("key" => tag.key , "value" => tag.value))
31+
"""
    dictarray_to_array(dict_array::Array{Dict{String, String}})

Build a vector of `Tag` from an array of dictionaries, reading the `"key"` and
`"value"` entries of each dictionary.
"""
function dictarray_to_array(dict_array::Array{Dict{String, String}})::MLFlowUpsertData
    # `vec` keeps the result one-dimensional for any input shape, matching an
    # element-by-element `push!` into an empty vector.
    return Tag[Tag(entry["key"], entry["value"]) for entry in vec(dict_array)]
end
7739

78-
function parsetags(tags::Union{Dict{<:Any}, Array{<:Any}})::Array{<:Dict}
79-
parsed_tags = Dict[]
80-
if tags isa Array{Tag}
81-
parsed_tags = tags |> tagarray_to_dictarray
82-
elseif tags isa Array{<:Pair}
83-
parsed_tags = tags |> pairtags_to_dictarray
84-
elseif tags isa Dict{<:Any}
85-
parsed_tags = tags |> tagsdict_to_dictarray
40+
"""
    parse(entities)

Normalize the accepted "upsert data" shapes:

- `Dict{String, String}` is converted with `dict_to_array`.
- `Array{Pair{String, String}}` is converted with `pairsarray_to_array`.
- `Array{Dict{String, String}}` is converted with `dictarray_to_array`.
- An array whose element type is a `LoggingData` subtype is returned unchanged.

NOTE(review): this name coincides with `Base.parse`; confirm that no code in
this module relies on `Base.parse` being reachable as plain `parse`.
"""
parse(entities::Dict{String, String}) = dict_to_array(entities)
parse(entities::Array{Pair{String, String}}) = pairsarray_to_array(entities)
parse(entities::Array{Dict{String, String}}) = dictarray_to_array(entities)
parse(entities::Array{T}) where T<:LoggingData = entities
51+
52+
"""
    refresh(instance::MLFlow, experiment::Experiment)
    refresh(instance::MLFlow, run::Run)

Fetch the entity again from `instance`, looking it up by
`experiment.experiment_id` (via `getexperiment`) or by `run.info.run_id` (via
`getrun`).
"""
function refresh(instance::MLFlow, experiment::Experiment)::Experiment
    return getexperiment(instance, experiment.experiment_id)
end
function refresh(instance::MLFlow, run::Run)::Run
    return getrun(instance, run.info.run_id)
end

test/runtests.jl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,5 @@ end
55
include("base.jl")
66

77
include("services/experiment.jl")
8-
include("services/run.jl")
9-
# include("test_functional.jl")
10-
# include("test_runs.jl")
11-
# include("test_loggers.jl")
8+
# include("services/run.jl")
9+
# include("services/loggers.jl")

0 commit comments

Comments
 (0)