Skip to content

Commit c4bd0be

Browse files
committed
Finishing with logbatch functionality
1 parent 8c14210 commit c4bd0be

File tree

10 files changed

+235
-56
lines changed

10 files changed

+235
-56
lines changed

src/api.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ MLFlowClient.uri(mlf, "experiments/get", Dict(:experiment_id=>10))
99
```
1010
"""
1111
uri(mlf::MLFlow, endpoint::String;
12-
parameters::Dict{Symbol, <:Any}=Dict{Symbol, IntOrString}()) =
12+
parameters::Dict{Symbol, <:Any}=Dict{Symbol, NumberOrString}()) =
1313
URI("$(mlf.apiroot)/$(mlf.apiversion)/mlflow/$(endpoint)";
1414
query=parameters)
1515

src/services/experiment.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ function createexperiment(instance::MLFlow, name::String;
2323
tags::MLFlowUpsertData{Tag}=Tag[])::String
2424
try
2525
result = mlfpost(instance, "experiments/create"; name=name,
26-
artifact_location=artifact_location, tags=(tags |> parse))
26+
artifact_location=artifact_location,
27+
tags=parse(Tag, tags))
2728
return result["experiment_id"]
2829
catch e
2930
if isa(e, HTTP.ExceptionRequest.StatusError) && e.status == 400

src/services/loggers.jl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ logmetric(instance::MLFlow, run::Run, key::String, value::Float64;
2828
logmetric(instance, run.info.run_id, key, value; timestamp=timestamp, step=step)
2929

3030
"""
31-
logbatch(instance::MLFlow, run_id::String, metrics::Array{Metric},
31+
logbatch(instance::MLFlow, run_id::String; metrics::Array{Metric},
32+
params::Array{Param}, tags::Array{Tag})
33+
logbatch(instance::MLFlow, run::Run; metrics::Array{Metric},
3234
params::Array{Param}, tags::Array{Tag})
3335
3436
Log a batch of metrics, params, and tags for a run. In case of error, partial
@@ -37,8 +39,15 @@ data may be written.
3739
For more information about this function, check [MLFlow official documentation](https://mlflow.org/docs/latest/rest-api.html#log-batch).
3840
"""
3941
function logbatch(instance::MLFlow, run_id::String;
40-
metrics::Array{Metric}=Metric[], params::Array{Param}=Param[],
41-
tags::MLFlowUpsertData{Tag}=Tag[])
42-
mlfpost(instance, "runs/log-batch"; run_id=run_id, metrics=metrics,
43-
params=params, tags=(tags |> parse))
42+
metrics::MLFlowUpsertData{Metric}=Metric[],
43+
params::MLFlowUpsertData{Param}=Param[], tags::MLFlowUpsertData{Tag}=Tag[])
44+
mlfpost(instance, "runs/log-batch"; run_id=run_id,
45+
metrics=parse(Metric, metrics), params=parse(Param, params),
46+
tags=parse(Tag, tags))
4447
end
48+
logbatch(instance::MLFlow, run::Run;
49+
metrics::MLFlowUpsertData{Metric}=Metric[],
50+
params::MLFlowUpsertData{Param}=Param[],
51+
tags::MLFlowUpsertData{Tag}=Tag[]) =
52+
logbatch(instance, run.info.run_id; metrics=metrics, params=params,
53+
tags=tags)

src/services/run.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ function createrun(instance::MLFlow, experiment_id::String;
2323
tags::MLFlowUpsertData{Tag}=Tag[])
2424
try
2525
result = mlfpost(instance, "runs/create"; experiment_id=experiment_id,
26-
run_name=run_name, start_time=start_time, tags=(tags |> parse))
26+
run_name=run_name, start_time=start_time, tags=parse(Tag, tags))
2727
return result["run"] |> Run
2828
catch e
2929
throw(e)

src/types/run.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
"""
2-
Metric
2+
Metric <: LoggingData
33
44
Metric associated with a run, represented as a key-value pair.
55
66
# Fields
77
- `key::String`: Key identifying this metric.
88
- `value::Float64`: Value associated with this metric.
99
- `timestamp::Int64`: The timestamp at which this metric was recorded.
10-
- `step::Int64`: Step at which to log the metric.
10+
- `step::Union{Int64, Missing}`: Step at which to log the metric.
1111
"""
1212
struct Metric <: LoggingData
1313
key::String
1414
value::Float64
1515
timestamp::Int64
16-
step::Int64
16+
step::Union{Int64, Missing}
1717
end
1818
Metric(data::Dict{String, Any}) = Metric(data["key"], data["value"],
1919
data["timestamp"], data["step"])
2020
Base.show(io::IO, t::Metric) = show(io, ShowCase(t, new_lines=true))
2121

2222
"""
23-
Param
23+
Param <: LoggingData
2424
2525
Param associated with a run.
2626

src/types/tag.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
Tag
2+
Tag <: LoggingData
33
44
Generic tag type for MLFlow entities.
55

src/utils.jl

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,101 @@
1-
const IntOrString = Union{Int, String}
2-
const MLFlowUpsertData{T} = Union{Array{T}, Dict{String, String},
3-
Array{Pair{String, String}}, Array{Dict{String, String}}}
1+
const NumberOrString = Union{Number, String}
2+
const MLFlowUpsertData{T} = Union{
3+
Array{T},
4+
Array{<:Dict{String, <:Any}},
5+
Dict{String, <:NumberOrString},
6+
Array{<:Pair{String, <:NumberOrString}},
7+
Array{<:Tuple{String, <:NumberOrString}}
8+
}
49

510
const MLFLOW_ERROR_CODES = (;
611
RESOURCE_ALREADY_EXISTS = "RESOURCE_ALREADY_EXISTS",
712
RESOURCE_DOES_NOT_EXIST = "RESOURCE_DOES_NOT_EXIST",
813
)
914

10-
function dict_to_array(dict::Dict{String, String})::MLFlowUpsertData
11-
tags = Tag[]
15+
function dict_to_T_array(::Type{T}, dict::Dict{String, <:NumberOrString}) where T<:LoggingData
16+
entities = T[]
1217
for (key, value) in dict
13-
push!(tags, Tag(key, value))
18+
if T<:Metric
19+
push!(entities, Metric(key, Float64(value),
20+
round(Int, now() |> datetime2unix), missing))
21+
else
22+
push!(entities, T(key, value |> string))
23+
end
1424
end
1525

16-
return tags
26+
return entities
1727
end
1828

19-
function pairsarray_to_array(pair_array::Array{<:Pair})::MLFlowUpsertData
20-
entity_array = Tag[]
29+
function pairarray_to_T_array(::Type{T}, pair_array::Array{<:Pair}) where T<:LoggingData
30+
entities = T[]
2131
for pair in pair_array
22-
println(pair)
2332
key = pair.first |> string
24-
value = pair.second |> string
25-
push!(entity_array, Tag(key, value))
33+
if T<:Metric
34+
value = pair.second
35+
push!(entities, Metric(key, Float64(value),
36+
round(Int, now() |> datetime2unix), missing))
37+
else
38+
value = pair.second |> string
39+
push!(entities, T(key, value))
40+
end
2641
end
2742

28-
return entity_array
43+
return entities
2944
end
3045

31-
function dictarray_to_array(dict_array::Array{Dict{String, String}})::MLFlowUpsertData
32-
tags = Tag[]
46+
function tuplearray_to_T_array(::Type{T},
47+
tuple_array::Array{<:Tuple{String, <:NumberOrString}}) where T<:LoggingData
48+
entities = T[]
49+
for tuple in tuple_array
50+
if length(tuple) != 2
51+
error("Tuple must have exactly two elements (format: (key, value))")
52+
end
53+
54+
key = tuple |> first |> string
55+
if T<: Metric
56+
value = tuple |> last
57+
push!(entities, Metric(key, Float64(value),
58+
round(Int, now() |> datetime2unix), missing))
59+
else
60+
value = tuple |> last |> string
61+
push!(entities, T(key, value))
62+
end
63+
end
64+
65+
return entities
66+
end
67+
68+
function dictarray_to_T_array(::Type{T},
69+
dict_array::Array{<:Dict{String, <:Any}}) where T<:LoggingData
70+
entities = T[]
3371
for dict in dict_array
34-
push!(tags, Tag(dict["key"], dict["value"]))
72+
key = dict["key"] |> string
73+
if T<:Metric
74+
value = Float64(dict["value"])
75+
if haskey(dict, "timestamp")
76+
timestamp = dict["timestamp"]
77+
else
78+
timestamp = round(Int, now() |> datetime2unix)
79+
end
80+
push!(entities, Metric(key, value, timestamp, missing))
81+
else
82+
value = dict["value"] |> string
83+
push!(entities, T(key, value))
84+
end
3585
end
3686

37-
return tags
87+
return entities
3888
end
3989

40-
function parse(entities::MLFlowUpsertData{T}) where T<:LoggingData
41-
println(typeof(entities))
42-
if entities isa Dict{String, String}
43-
return entities |> dict_to_array
44-
elseif entities isa Array{Pair{String, String}}
45-
return entities |> pairsarray_to_array
46-
elseif entities isa Array{Dict{String, String}}
47-
return entities |> dictarray_to_array
90+
function parse(::Type{T}, entities::MLFlowUpsertData{T}) where T<:LoggingData
91+
if entities isa Dict{String, <:NumberOrString}
92+
return dict_to_T_array(T, entities)
93+
elseif entities isa Array{<:Dict{String, <:Any}}
94+
return dictarray_to_T_array(T, entities)
95+
elseif entities isa Array{<:Pair{String, <:NumberOrString}}
96+
return pairarray_to_T_array(T, entities)
97+
elseif entities isa Array{<:Tuple{String, <:NumberOrString}}
98+
return tuplearray_to_T_array(T, entities)
4899
end
49100
return entities
50101
end

test/runtests.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ end
55
include("base.jl")
66

77
include("services/experiment.jl")
8-
# include("services/run.jl")
9-
# include("services/loggers.jl")
8+
include("services/run.jl")
9+
include("services/loggers.jl")

test/services/experiment.jl

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,29 @@
1414
deleteexperiment(mlf, experiment.experiment_id)
1515
end
1616

17-
# @testset "with tags as array of tags" begin
18-
# experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
19-
# tags=[Tag("test_key", "test_value")])
20-
# deleteexperiment(mlf, experiment_id)
21-
# end
17+
@testset "with tags as array of tags" begin
18+
experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
19+
tags=[Tag("test_key", "test_value")])
20+
deleteexperiment(mlf, experiment_id)
21+
end
2222

2323
@testset "with tags as array of pairs" begin
2424
experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
2525
tags=["test_key" => "test_value"])
2626
deleteexperiment(mlf, experiment_id)
2727
end
2828

29-
# @testset "with tags as array of dicts" begin
30-
# experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
31-
# tags=[Dict("key" => "test_key", "value" => "test_value")])
32-
# deleteexperiment(mlf, experiment_id)
33-
# end
29+
@testset "with tags as array of dicts" begin
30+
experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
31+
tags=[Dict("key" => "test_key", "value" => "test_value")])
32+
deleteexperiment(mlf, experiment_id)
33+
end
3434

35-
# @testset "with tags as dict" begin
36-
# experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
37-
# tags=Dict("test_key" => "test_value"))
38-
# deleteexperiment(mlf, experiment_id)
39-
# end
35+
@testset "with tags as dict" begin
36+
experiment_id = createexperiment(mlf, UUIDs.uuid4() |> string;
37+
tags=Dict("test_key" => "test_value"))
38+
deleteexperiment(mlf, experiment_id)
39+
end
4040
end
4141

4242
@testset verbose = true "get experiment" begin

0 commit comments

Comments
 (0)