Skip to content

Commit ad3c040

Browse files
authored
Merge pull request #42 from JuliaComputing/tan/retry
also allow retries of mutating APIs
2 parents 8b9ebd0 + 56c6b22 commit ad3c040

File tree

4 files changed

+81
-32
lines changed

4 files changed

+81
-32
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ A Kubernetes context can be manipulated with:
7777

7878
- `set_server`: Set the API server location ("http://localhost:8001" if not set)
7979
- `set_ns`: Set the namespace to deal with (`default` namespace is not set)
80-
- `set_retries`: Set the number of times an API call should be retried on a retriable error (5 if not set)
80+
- `set_retries`: Set the number of times an API call should be retried on a retriable error (5 if not set) and whether all APIs should be retried (only non mutating APIs are retried by default)
8181

8282
Other convenience methods:
8383

src/helpers.jl

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mutable struct KuberContext
3737
modelapi::Dict{Symbol,KApi}
3838
namespace::String
3939
default_retries::Int
40+
retry_all_apis::Bool
4041
initialized::Bool
4142

4243
function KuberContext()
@@ -51,6 +52,7 @@ mutable struct KuberContext
5152
kctx.modelapi = Dict{Symbol,KApi}()
5253
kctx.namespace = DEFAULT_NAMESPACE
5354
kctx.default_retries = 5
55+
kctx.retry_all_apis = false
5456
kctx.initialized = false
5557
return kctx
5658
end
@@ -61,11 +63,23 @@ struct KuberWatchContext
6163
stream::KuberEventStream
6264
end
6365

64-
function set_retries(ctx::KuberContext, n)
65-
ctx.default_retries = n
66+
"""
67+
set_retries(ctx; count=5, all_apis=false)
68+
69+
Args:
70+
- ctx: the context to set the options for
71+
72+
Keyword Args:
73+
- count: how many times to retry (default 5)
74+
- all_apis: whether to retry even mutating APIs e.g. `put!` (default false)
75+
"""
76+
function set_retries(ctx::KuberContext; count::Int=ctx.default_retries, all_apis::Bool=ctx.all_apis)
77+
ctx.default_retries = count
78+
ctx.retry_all_apis = all_apis
79+
ctx
6680
end
67-
retries(ctx::KuberContext) = ctx.default_retries
68-
retries(watch::KuberWatchContext) = retries(watch.ctx)
81+
retries(ctx::KuberContext, mutating::Bool=true) = (mutating && !ctx.retry_all_apis) ? 1 : ctx.default_retries
82+
retries(watch::KuberWatchContext, mutating::Bool=true) = retries(watch.ctx, mutating)
6983

7084
convert(::Type{Vector{UInt8}}, s::T) where {T<:AbstractString} = collect(codeunits(s))
7185
convert(::Type{T}, json::String) where {T<:SwaggerModel} = convert(T, JSON.parse(json))
@@ -118,14 +132,37 @@ show(io::IO, ctx::KuberContext) = print("Kubernetes namespace ", ctx.namespace,
118132
get_server(ctx::KuberContext) = ctx.client.root
119133
get_ns(ctx::KuberContext) = ctx.namespace
120134

121-
function set_server(ctx::KuberContext, uri::String=DEFAULT_URI, reset_api_versions::Bool=false; max_tries=retries(ctx), kwargs...)
135+
"""
136+
set_server(ctx, uri, reset_api_versions=false; max_tries=5, kwargs...)
137+
138+
Set the Kubernetes API server endpoint for a context.
139+
140+
Args:
141+
- ctx: the context for which to set the API server endpoint
142+
- uri: the API server endpoint uri
143+
- reset_api_versions: whether to probe the server again for API versions supported (false by default)
144+
145+
Keyword Args:
146+
- max_tries: retries allowed while probing API versions from server
147+
- kwargs: other keyword args to pass on while constructing the client for API server (see Swagger.jl - https://github.com/JuliaComputing/Swagger.jl#readme)
148+
"""
149+
function set_server(ctx::KuberContext, uri::String=DEFAULT_URI, reset_api_versions::Bool=false; max_tries=retries(ctx, false), kwargs...)
122150
rtfn = (default,data)->kuber_type(ctx, default, data)
123151
ctx.client = Swagger.Client(uri; get_return_type=rtfn, kwargs...)
124152
ctx.client.headers["Connection"] = "close"
125153
reset_api_versions && set_api_versions!(ctx; max_tries=max_tries)
126154
ctx.client
127155
end
128156

157+
"""
158+
set_ns(ctx, namespace)
159+
160+
Set the namespace this context should operate with.
161+
162+
Args:
163+
- ctx: the context to set the namespace for
164+
- namespace: the namespace to set (String)
165+
"""
129166
set_ns(ctx::KuberContext, namespace::String) = (ctx.namespace = namespace)
130167

131168
camel(a) = string(uppercase(a[1])) * (a[2:end])
@@ -170,7 +207,7 @@ function override_pref(name, server_pref, override)
170207
server_pref
171208
end
172209

173-
function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
210+
function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx, false))
174211
apis = ctx.apis
175212
vers = k8s_retry(; max_tries=max_tries) do
176213
getAPIVersions(ApisApi(ctx.client))
@@ -209,7 +246,7 @@ function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::
209246
apis
210247
end
211248

212-
function fetch_core_version(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
249+
function fetch_core_version(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx, false))
213250
apis = ctx.apis
214251
api_vers = k8s_retry(; max_tries=max_tries) do
215252
getCoreAPIVersions(CoreApi(ctx.client))
@@ -257,7 +294,7 @@ function build_model_api_map(ctx::KuberContext)
257294
modelapi
258295
end
259296

260-
function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
297+
function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx, false))
261298
ctx.initialized = false
262299
empty!(ctx.apis)
263300
empty!(ctx.modelapi)

src/simpleapi.jl

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ _delopts(; kwargs...) = Typedefs.MetaV1.DeleteOptions(; preconditions=Typedefs.M
1212
_kubectx(ctx::KuberContext) = ctx
1313
_kubectx(ctx::KuberWatchContext) = ctx.ctx
1414

15-
function _get_apictx(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, apiversion::Union{String,Nothing}; max_tries::Int=retries(ctx))
15+
function _get_apictx(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, apiversion::Union{String,Nothing}; max_tries::Int=retries(ctx, false))
1616
kubectx = _kubectx(ctx)
1717
kubectx.initialized || set_api_versions!(kubectx; max_tries=max_tries)
1818

@@ -50,7 +50,7 @@ end
5050
function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::String;
5151
apiversion::Union{String,Nothing}=nothing,
5252
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
53-
max_tries::Int=retries(ctx),
53+
max_tries::Int=retries(ctx, false),
5454
watch=isa(ctx, KuberWatchContext),
5555
resourceVersion=nothing,
5656
kwargs...)
@@ -76,7 +76,7 @@ function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::Strin
7676
end
7777
end
7878

79-
# if not watching, retuen the first result
79+
# if not watching, return the first result
8080
watch || (return result)
8181
if result !== nothing
8282
resourceVersion = result.metadata.resourceVersion
@@ -93,7 +93,7 @@ end
9393
function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
9494
apiversion::Union{String,Nothing}=nothing,
9595
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
96-
max_tries::Int=retries(ctx),
96+
max_tries::Int=retries(ctx, false),
9797
watch=isa(ctx, KuberWatchContext),
9898
resourceVersion=nothing,
9999
kwargs...)
@@ -177,7 +177,7 @@ function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
177177
apiversion::Union{String,Nothing}=nothing,
178178
label_selector=nothing,
179179
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
180-
max_tries::Integer=retries(ctx),
180+
max_tries::Integer=retries(ctx, false),
181181
watch=isa(ctx, KuberWatchContext),
182182
resourceVersion=nothing,
183183
kwargs...)
@@ -219,7 +219,7 @@ end
219219
function watch(ctx::KuberContext, O::Symbol, outstream::Channel, name::String;
220220
apiversion::Union{String,Nothing}=nothing,
221221
namespace::Union{String,Nothing}=ctx.namespace,
222-
max_tries::Int=retries(ctx),
222+
max_tries::Int=retries(ctx, false),
223223
kwargs...)
224224
apictx = _get_apictx(ctx, O, apiversion; max_tries=max_tries)
225225
namespaced = (namespace !== nothing) && !isempty(namespace)
@@ -246,7 +246,7 @@ end
246246
function watch(ctx::KuberContext, O::Symbol, outstream::Channel;
247247
apiversion::Union{String,Nothing}=nothing,
248248
namespace::Union{String,Nothing}=ctx.namespace,
249-
max_tries::Int=retries(ctx),
249+
max_tries::Int=retries(ctx, false),
250250
kwargs...)
251251
apictx = _get_apictx(ctx, O, apiversion; max_tries=max_tries)
252252
namespaced = (namespace !== nothing) && !isempty(namespace)
@@ -270,58 +270,70 @@ function watch(ctx::KuberContext, O::Symbol, outstream::Channel;
270270
end
271271
end
272272

273-
function put!(ctx::KuberContext, v::T) where {T<:SwaggerModel}
273+
function put!(ctx::KuberContext, v::T; max_tries::Int=retries(ctx, true)) where {T<:SwaggerModel}
274274
vjson = convert(Dict{String,Any}, v)
275-
put!(ctx, Symbol(vjson["kind"]), vjson)
275+
put!(ctx, Symbol(vjson["kind"]), vjson; max_tries=max_tries)
276276
end
277277

278-
function put!(ctx::KuberContext, O::Symbol, d::Dict{String,Any})
278+
function put!(ctx::KuberContext, O::Symbol, d::Dict{String,Any}; max_tries::Int=retries(ctx, true))
279279
apictx = _get_apictx(ctx, O, get(d, "apiVersion", nothing))
280280
if (apicall = _api_function("create$O")) !== nothing
281-
return apicall(apictx, d)
281+
return k8s_retry(; max_tries=max_tries) do
282+
apicall(apictx, d)
283+
end
282284
elseif (apicall = _api_function("createNamespaced$O")) !== nothing
283-
return apicall(apictx, ctx.namespace, d)
285+
return k8s_retry(; max_tries=max_tries) do
286+
apicall(apictx, ctx.namespace, d)
287+
end
284288
else
285289
throw(ArgumentError("No API functions could be located using :$O"))
286290
end
287291
end
288292

289-
function delete!(ctx::KuberContext, v::T; kwargs...) where {T<:SwaggerModel}
293+
function delete!(ctx::KuberContext, v::T; max_tries::Int=retries(ctx, true), kwargs...) where {T<:SwaggerModel}
290294
vjson = convert(Dict{String,Any}, v)
291295
kind = vjson["kind"]
292296
name = vjson["metadata"]["name"]
293-
delete!(ctx, Symbol(kind), name; apiversion=get(vjson, "apiVersion", nothing), kwargs...)
297+
delete!(ctx, Symbol(kind), name; apiversion=get(vjson, "apiVersion", nothing), max_tries=max_tries, kwargs...)
294298
end
295299

296-
function delete!(ctx::KuberContext, O::Symbol, name::String; apiversion::Union{String,Nothing}=nothing, kwargs...)
300+
function delete!(ctx::KuberContext, O::Symbol, name::String; apiversion::Union{String,Nothing}=nothing, max_tries::Int=retries(ctx, true), kwargs...)
297301
apictx = _get_apictx(ctx, O, apiversion)
298302

299303
params = [apictx, name]
300304

301305
if (apicall = _api_function("delete$O")) !== nothing
302-
return apicall(params...; kwargs...)
306+
return k8s_retry(; max_tries=max_tries) do
307+
apicall(params...; kwargs...)
308+
end
303309
elseif (apicall = _api_function("deleteNamespaced$O")) !== nothing
304310
push!(params, ctx.namespace)
305-
return apicall(params...; kwargs...)
311+
return k8s_retry(; max_tries=max_tries) do
312+
apicall(params...; kwargs...)
313+
end
306314
else
307315
throw(ArgumentError("No API functions could be located using :$O"))
308316
end
309317
end
310318

311-
function update!(ctx::KuberContext, v::T, patch, patch_type) where {T<:SwaggerModel}
319+
function update!(ctx::KuberContext, v::T, patch, patch_type; max_tries::Int=retries(ctx, true)) where {T<:SwaggerModel}
312320
vjson = convert(Dict{String,Any}, v)
313321
kind = vjson["kind"]
314322
name = vjson["metadata"]["name"]
315-
update!(ctx, Symbol(kind), name, patch, patch_type; apiversion=get(vjson, "apiVersion", nothing))
323+
update!(ctx, Symbol(kind), name, patch, patch_type; apiversion=get(vjson, "apiVersion", nothing), max_tries=max_tries)
316324
end
317325

318-
function update!(ctx::KuberContext, O::Symbol, name::String, patch, patch_type; apiversion::Union{String,Nothing}=nothing)
326+
function update!(ctx::KuberContext, O::Symbol, name::String, patch, patch_type; apiversion::Union{String,Nothing}=nothing, max_tries::Int=retries(ctx, true))
319327
apictx = _get_apictx(ctx, O, apiversion)
320328

321329
if (apicall = _api_function("patch$O")) !== nothing
322-
return apicall(apictx, name, patch; _mediaType=patch_type)
330+
return k8s_retry(; max_tries=max_tries) do
331+
apicall(apictx, name, patch; _mediaType=patch_type)
332+
end
323333
elseif (apicall = _api_function("patchNamespaced$O")) !== nothing
324-
return apicall(apictx, name, ctx.namespace, patch; _mediaType=patch_type)
334+
return k8s_retry(; max_tries=max_tries) do
335+
apicall(apictx, name, ctx.namespace, patch; _mediaType=patch_type)
336+
end
325337
else
326338
throw(ArgumentError("No API functions could be located using :$O"))
327339
end

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function init_context(override=nothing, verbose=true)
1515
ctx = KuberContext()
1616
set_server(ctx, "http://localhost:8001")
1717
set_ns(ctx, "default")
18-
set_retries(ctx, 3)
18+
set_retries(ctx; count=3, all_apis=false)
1919
Kuber.set_api_versions!(ctx; override=override, verbose=verbose)
2020
ctx
2121
end

0 commit comments

Comments
 (0)