Skip to content

Commit 56c6b22

Browse files
committed
also allow retries of mutating APIs
State changing APIs are not retried by default. But added an option to switch that on when the caller desires. Should be safe when retried only on 50x server errors or connect errors. The option can be switched on for the KuberContext by calling the `set_retries` method. Alternatively each API call also accepts a `max_retries` keywork argument to override the default set in the context.
1 parent 8b9ebd0 commit 56c6b22

File tree

4 files changed

+81
-32
lines changed

4 files changed

+81
-32
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ A Kubernetes context can be manipulated with:
7777

7878
- `set_server`: Set the API server location ("http://localhost:8001" if not set)
7979
- `set_ns`: Set the namespace to deal with (`default` namespace is not set)
80-
- `set_retries`: Set the number of times an API call should be retried on a retriable error (5 if not set)
80+
- `set_retries`: Set the number of times an API call should be retried on a retriable error (5 if not set) and whether all APIs should be retried (only non mutating APIs are retried by default)
8181

8282
Other convenience methods:
8383

src/helpers.jl

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mutable struct KuberContext
3737
modelapi::Dict{Symbol,KApi}
3838
namespace::String
3939
default_retries::Int
40+
retry_all_apis::Bool
4041
initialized::Bool
4142

4243
function KuberContext()
@@ -51,6 +52,7 @@ mutable struct KuberContext
5152
kctx.modelapi = Dict{Symbol,KApi}()
5253
kctx.namespace = DEFAULT_NAMESPACE
5354
kctx.default_retries = 5
55+
kctx.retry_all_apis = false
5456
kctx.initialized = false
5557
return kctx
5658
end
@@ -61,11 +63,23 @@ struct KuberWatchContext
6163
stream::KuberEventStream
6264
end
6365

64-
function set_retries(ctx::KuberContext, n)
65-
ctx.default_retries = n
66+
"""
67+
set_retries(ctx; count=5, all_apis=false)
68+
69+
Args:
70+
- ctx: the context to set the options for
71+
72+
Keyword Args:
73+
- count: how many times to retry (default 5)
74+
- all_apis: whether to retry even mutating APIs e.g. `put!` (default false)
75+
"""
76+
function set_retries(ctx::KuberContext; count::Int=ctx.default_retries, all_apis::Bool=ctx.all_apis)
77+
ctx.default_retries = count
78+
ctx.retry_all_apis = all_apis
79+
ctx
6680
end
67-
retries(ctx::KuberContext) = ctx.default_retries
68-
retries(watch::KuberWatchContext) = retries(watch.ctx)
81+
retries(ctx::KuberContext, mutating::Bool=true) = (mutating && !ctx.retry_all_apis) ? 1 : ctx.default_retries
82+
retries(watch::KuberWatchContext, mutating::Bool=true) = retries(watch.ctx, mutating)
6983

7084
convert(::Type{Vector{UInt8}}, s::T) where {T<:AbstractString} = collect(codeunits(s))
7185
convert(::Type{T}, json::String) where {T<:SwaggerModel} = convert(T, JSON.parse(json))
@@ -118,14 +132,37 @@ show(io::IO, ctx::KuberContext) = print("Kubernetes namespace ", ctx.namespace,
118132
get_server(ctx::KuberContext) = ctx.client.root
119133
get_ns(ctx::KuberContext) = ctx.namespace
120134

121-
function set_server(ctx::KuberContext, uri::String=DEFAULT_URI, reset_api_versions::Bool=false; max_tries=retries(ctx), kwargs...)
135+
"""
136+
set_server(ctx, uri, reset_api_versions=false; max_tries=5, kwargs...)
137+
138+
Set the Kubernetes API server endpoint for a context.
139+
140+
Args:
141+
- ctx: the context for which to set the API server endpoint
142+
- uri: the API server endpoint uri
143+
- reset_api_versions: whether to probe the server again for API versions supported (false by default)
144+
145+
Keyword Args:
146+
- max_tries: retries allowed while probing API versions from server
147+
- kwargs: other keyword args to pass on while constructing the client for API server (see Swagger.jl - https://github.com/JuliaComputing/Swagger.jl#readme)
148+
"""
149+
function set_server(ctx::KuberContext, uri::String=DEFAULT_URI, reset_api_versions::Bool=false; max_tries=retries(ctx, false), kwargs...)
122150
rtfn = (default,data)->kuber_type(ctx, default, data)
123151
ctx.client = Swagger.Client(uri; get_return_type=rtfn, kwargs...)
124152
ctx.client.headers["Connection"] = "close"
125153
reset_api_versions && set_api_versions!(ctx; max_tries=max_tries)
126154
ctx.client
127155
end
128156

157+
"""
158+
set_ns(ctx, namespace)
159+
160+
Set the namespace this context should operate with.
161+
162+
Args:
163+
- ctx: the context to set the namespace for
164+
- namespace: the namespace to set (String)
165+
"""
129166
set_ns(ctx::KuberContext, namespace::String) = (ctx.namespace = namespace)
130167

131168
camel(a) = string(uppercase(a[1])) * (a[2:end])
@@ -170,7 +207,7 @@ function override_pref(name, server_pref, override)
170207
server_pref
171208
end
172209

173-
function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
210+
function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx, false))
174211
apis = ctx.apis
175212
vers = k8s_retry(; max_tries=max_tries) do
176213
getAPIVersions(ApisApi(ctx.client))
@@ -209,7 +246,7 @@ function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::
209246
apis
210247
end
211248

212-
function fetch_core_version(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
249+
function fetch_core_version(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx, false))
213250
apis = ctx.apis
214251
api_vers = k8s_retry(; max_tries=max_tries) do
215252
getCoreAPIVersions(CoreApi(ctx.client))
@@ -257,7 +294,7 @@ function build_model_api_map(ctx::KuberContext)
257294
modelapi
258295
end
259296

260-
function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
297+
function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx, false))
261298
ctx.initialized = false
262299
empty!(ctx.apis)
263300
empty!(ctx.modelapi)

src/simpleapi.jl

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ _delopts(; kwargs...) = Typedefs.MetaV1.DeleteOptions(; preconditions=Typedefs.M
1212
_kubectx(ctx::KuberContext) = ctx
1313
_kubectx(ctx::KuberWatchContext) = ctx.ctx
1414

15-
function _get_apictx(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, apiversion::Union{String,Nothing}; max_tries::Int=retries(ctx))
15+
function _get_apictx(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, apiversion::Union{String,Nothing}; max_tries::Int=retries(ctx, false))
1616
kubectx = _kubectx(ctx)
1717
kubectx.initialized || set_api_versions!(kubectx; max_tries=max_tries)
1818

@@ -50,7 +50,7 @@ end
5050
function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::String;
5151
apiversion::Union{String,Nothing}=nothing,
5252
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
53-
max_tries::Int=retries(ctx),
53+
max_tries::Int=retries(ctx, false),
5454
watch=isa(ctx, KuberWatchContext),
5555
resourceVersion=nothing,
5656
kwargs...)
@@ -76,7 +76,7 @@ function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::Strin
7676
end
7777
end
7878

79-
# if not watching, retuen the first result
79+
# if not watching, return the first result
8080
watch || (return result)
8181
if result !== nothing
8282
resourceVersion = result.metadata.resourceVersion
@@ -93,7 +93,7 @@ end
9393
function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
9494
apiversion::Union{String,Nothing}=nothing,
9595
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
96-
max_tries::Int=retries(ctx),
96+
max_tries::Int=retries(ctx, false),
9797
watch=isa(ctx, KuberWatchContext),
9898
resourceVersion=nothing,
9999
kwargs...)
@@ -177,7 +177,7 @@ function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
177177
apiversion::Union{String,Nothing}=nothing,
178178
label_selector=nothing,
179179
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
180-
max_tries::Integer=retries(ctx),
180+
max_tries::Integer=retries(ctx, false),
181181
watch=isa(ctx, KuberWatchContext),
182182
resourceVersion=nothing,
183183
kwargs...)
@@ -219,7 +219,7 @@ end
219219
function watch(ctx::KuberContext, O::Symbol, outstream::Channel, name::String;
220220
apiversion::Union{String,Nothing}=nothing,
221221
namespace::Union{String,Nothing}=ctx.namespace,
222-
max_tries::Int=retries(ctx),
222+
max_tries::Int=retries(ctx, false),
223223
kwargs...)
224224
apictx = _get_apictx(ctx, O, apiversion; max_tries=max_tries)
225225
namespaced = (namespace !== nothing) && !isempty(namespace)
@@ -246,7 +246,7 @@ end
246246
function watch(ctx::KuberContext, O::Symbol, outstream::Channel;
247247
apiversion::Union{String,Nothing}=nothing,
248248
namespace::Union{String,Nothing}=ctx.namespace,
249-
max_tries::Int=retries(ctx),
249+
max_tries::Int=retries(ctx, false),
250250
kwargs...)
251251
apictx = _get_apictx(ctx, O, apiversion; max_tries=max_tries)
252252
namespaced = (namespace !== nothing) && !isempty(namespace)
@@ -270,58 +270,70 @@ function watch(ctx::KuberContext, O::Symbol, outstream::Channel;
270270
end
271271
end
272272

273-
function put!(ctx::KuberContext, v::T) where {T<:SwaggerModel}
273+
function put!(ctx::KuberContext, v::T; max_tries::Int=retries(ctx, true)) where {T<:SwaggerModel}
274274
vjson = convert(Dict{String,Any}, v)
275-
put!(ctx, Symbol(vjson["kind"]), vjson)
275+
put!(ctx, Symbol(vjson["kind"]), vjson; max_tries=max_tries)
276276
end
277277

278-
function put!(ctx::KuberContext, O::Symbol, d::Dict{String,Any})
278+
function put!(ctx::KuberContext, O::Symbol, d::Dict{String,Any}; max_tries::Int=retries(ctx, true))
279279
apictx = _get_apictx(ctx, O, get(d, "apiVersion", nothing))
280280
if (apicall = _api_function("create$O")) !== nothing
281-
return apicall(apictx, d)
281+
return k8s_retry(; max_tries=max_tries) do
282+
apicall(apictx, d)
283+
end
282284
elseif (apicall = _api_function("createNamespaced$O")) !== nothing
283-
return apicall(apictx, ctx.namespace, d)
285+
return k8s_retry(; max_tries=max_tries) do
286+
apicall(apictx, ctx.namespace, d)
287+
end
284288
else
285289
throw(ArgumentError("No API functions could be located using :$O"))
286290
end
287291
end
288292

289-
function delete!(ctx::KuberContext, v::T; kwargs...) where {T<:SwaggerModel}
293+
function delete!(ctx::KuberContext, v::T; max_tries::Int=retries(ctx, true), kwargs...) where {T<:SwaggerModel}
290294
vjson = convert(Dict{String,Any}, v)
291295
kind = vjson["kind"]
292296
name = vjson["metadata"]["name"]
293-
delete!(ctx, Symbol(kind), name; apiversion=get(vjson, "apiVersion", nothing), kwargs...)
297+
delete!(ctx, Symbol(kind), name; apiversion=get(vjson, "apiVersion", nothing), max_tries=max_tries, kwargs...)
294298
end
295299

296-
function delete!(ctx::KuberContext, O::Symbol, name::String; apiversion::Union{String,Nothing}=nothing, kwargs...)
300+
function delete!(ctx::KuberContext, O::Symbol, name::String; apiversion::Union{String,Nothing}=nothing, max_tries::Int=retries(ctx, true), kwargs...)
297301
apictx = _get_apictx(ctx, O, apiversion)
298302

299303
params = [apictx, name]
300304

301305
if (apicall = _api_function("delete$O")) !== nothing
302-
return apicall(params...; kwargs...)
306+
return k8s_retry(; max_tries=max_tries) do
307+
apicall(params...; kwargs...)
308+
end
303309
elseif (apicall = _api_function("deleteNamespaced$O")) !== nothing
304310
push!(params, ctx.namespace)
305-
return apicall(params...; kwargs...)
311+
return k8s_retry(; max_tries=max_tries) do
312+
apicall(params...; kwargs...)
313+
end
306314
else
307315
throw(ArgumentError("No API functions could be located using :$O"))
308316
end
309317
end
310318

311-
function update!(ctx::KuberContext, v::T, patch, patch_type) where {T<:SwaggerModel}
319+
function update!(ctx::KuberContext, v::T, patch, patch_type; max_tries::Int=retries(ctx, true)) where {T<:SwaggerModel}
312320
vjson = convert(Dict{String,Any}, v)
313321
kind = vjson["kind"]
314322
name = vjson["metadata"]["name"]
315-
update!(ctx, Symbol(kind), name, patch, patch_type; apiversion=get(vjson, "apiVersion", nothing))
323+
update!(ctx, Symbol(kind), name, patch, patch_type; apiversion=get(vjson, "apiVersion", nothing), max_tries=max_tries)
316324
end
317325

318-
function update!(ctx::KuberContext, O::Symbol, name::String, patch, patch_type; apiversion::Union{String,Nothing}=nothing)
326+
function update!(ctx::KuberContext, O::Symbol, name::String, patch, patch_type; apiversion::Union{String,Nothing}=nothing, max_tries::Int=retries(ctx, true))
319327
apictx = _get_apictx(ctx, O, apiversion)
320328

321329
if (apicall = _api_function("patch$O")) !== nothing
322-
return apicall(apictx, name, patch; _mediaType=patch_type)
330+
return k8s_retry(; max_tries=max_tries) do
331+
apicall(apictx, name, patch; _mediaType=patch_type)
332+
end
323333
elseif (apicall = _api_function("patchNamespaced$O")) !== nothing
324-
return apicall(apictx, name, ctx.namespace, patch; _mediaType=patch_type)
334+
return k8s_retry(; max_tries=max_tries) do
335+
apicall(apictx, name, ctx.namespace, patch; _mediaType=patch_type)
336+
end
325337
else
326338
throw(ArgumentError("No API functions could be located using :$O"))
327339
end

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function init_context(override=nothing, verbose=true)
1515
ctx = KuberContext()
1616
set_server(ctx, "http://localhost:8001")
1717
set_ns(ctx, "default")
18-
set_retries(ctx, 3)
18+
set_retries(ctx; count=3, all_apis=false)
1919
Kuber.set_api_versions!(ctx; override=override, verbose=verbose)
2020
ctx
2121
end

0 commit comments

Comments
 (0)