Skip to content

Commit 8b9ebd0

Browse files
authored
Merge pull request #41 from JuliaComputing/tan/retry
fix and improve retries
2 parents 089e02a + 4f4c5bd commit 8b9ebd0

File tree

6 files changed

+93
-74
lines changed

6 files changed

+93
-74
lines changed

Project.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,19 @@ authors = ["Tanmay Mohapatra <[email protected]>"]
44
keywords = ["kubernetes", "client"]
55
license = "MIT"
66
desc = "Julia Kubernetes Client"
7-
version = "0.5.0"
7+
version = "0.5.1"
88

99
[deps]
1010
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
1111
Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
1212
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
1313
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1414
Swagger = "2d69052b-6a58-5cd9-a030-a48559c324ac"
15-
Retry = "20febd7b-183b-5ae2-ac4a-720e7ce64774"
1615

1716
[compat]
1817
Downloads = "1"
1918
Swagger = "0.3"
2019
JSON = "0.21"
21-
Retry = "0.4"
2220
julia = "1"
2321

2422
[extras]

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ end
7575

7676
A Kubernetes context can be manipulated with:
7777

78-
- `set_server`: Set the API server location
79-
- `set_ns`: Set the namespace to deal with
78+
- `set_server`: Set the API server location ("http://localhost:8001" if not set)
79+
- `set_ns`: Set the namespace to deal with (`default` namespace if not set)
80+
- `set_retries`: Set the number of times an API call should be retried on a retriable error (5 if not set)
8081

8182
Other convenience methods:
8283

src/Kuber.jl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module Kuber
33
using JSON
44
using Swagger
55
using Downloads
6-
using Retry
76

87
include("api/Kubernetes.jl")
98
using .Kubernetes
@@ -17,7 +16,7 @@ include("apialiases.jl")
1716
include("helpers.jl")
1817
include("simpleapi.jl")
1918

20-
export KuberContext, set_server, set_ns, get_server, get_ns, kuber_type, kuber_obj, @K_str
19+
export KuberContext, set_server, set_ns, set_retries, get_server, get_ns, kuber_type, kuber_obj, @K_str
2120
export get, list, watch, put!, update!, delete!, sel, get_logs, list_namespaced_custom_metrics, list_custom_metrics
2221

2322
end # module

src/helpers.jl

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,29 @@
11
const DEFAULT_NAMESPACE = "default"
22
const DEFAULT_URI = "http://localhost:8001"
33

4+
"""delay customized by TPS requirement"""
5+
function k8s_delay(tps, max_tries=1)
    # `max_tries` counts total attempts. `Base.retry` runs the call once up front and then
    # once more per delay in the schedule, so the schedule must hold `max_tries - 1`
    # delays; `max_tries <= 1` yields an empty schedule, i.e. no retries at all.
    n_delays = max(max_tries - 1, 0)
    # First wait is `1/tps` seconds; later waits grow by a factor of 1.75 with 10% jitter.
    return ExponentialBackOff(n=n_delays, first_delay=(1/tps), factor=1.75, jitter=0.1)
end
6+
7+
"""
8+
Swagger status codes that can be retried.
9+
0: network error (HTTP was not even attempted)
10+
500-504: unexpected server error
11+
"""
12+
const k8s_retryable_codes = [0; 500:504]  # 0 plus every code from 500 through 504
13+
14+
"""
    k8s_retry_cond(s, e, retryable_codes=k8s_retryable_codes)

Decide whether a failed API call should be retried.

Returns the tuple `(s, retry)`: `s` is handed back unchanged, and `retry` is `true` only
when `e` is a `Swagger.ApiException` whose `status` is one of `retryable_codes`.
"""
function k8s_retry_cond(s, e, retryable_codes=k8s_retryable_codes)
    should_retry = (e isa Swagger.ApiException) && (e.status in retryable_codes)
    return (s, should_retry)
end
20+
21+
"""
22+
Retry api call automatically (if `max_tries > 1`) on certain retryable failures.
23+
Backoff to use when retrying k8s APIs. The default minimum is 2 TPS.
24+
"""
25+
function k8s_retry(f; max_tries=1, tps=2)
    # Backoff schedule sized by `max_tries` and paced by `tps` (see `k8s_delay`).
    backoff = k8s_delay(tps, max_tries)
    # Wrap `f` so that only errors accepted by `k8s_retry_cond` trigger another attempt.
    guarded = retry(f; delays=backoff, check=k8s_retry_cond)
    return guarded()
end
26+
427
const KuberEventStream = Channel{Any}
528

629
struct KApi
@@ -13,6 +36,7 @@ mutable struct KuberContext
1336
apis::Dict{Symbol,Vector{KApi}}
1437
modelapi::Dict{Symbol,KApi}
1538
namespace::String
39+
default_retries::Int
1640
initialized::Bool
1741

1842
function KuberContext()
@@ -26,6 +50,7 @@ mutable struct KuberContext
2650
kctx.apis = Dict{Symbol,Vector}()
2751
kctx.modelapi = Dict{Symbol,KApi}()
2852
kctx.namespace = DEFAULT_NAMESPACE
53+
kctx.default_retries = 5
2954
kctx.initialized = false
3055
return kctx
3156
end
@@ -36,6 +61,12 @@ struct KuberWatchContext
3661
stream::KuberEventStream
3762
end
3863

64+
"""
    set_retries(ctx::KuberContext, n)

Store `n` in `ctx.default_retries`, the value that `retries(ctx)` reads back.
Returns `n`.
"""
set_retries(ctx::KuberContext, n) = (ctx.default_retries = n)
67+
"""
    retries(ctx::KuberContext)
    retries(watch::KuberWatchContext)

Return the default retry count stored on a context. A watch context reports the value of
the `KuberContext` held in its `ctx` field.
"""
function retries(ctx::KuberContext)
    return ctx.default_retries
end

function retries(watch::KuberWatchContext)
    return retries(watch.ctx)
end
69+
3970
convert(::Type{Vector{UInt8}}, s::T) where {T<:AbstractString} = collect(codeunits(s))
4071
convert(::Type{T}, json::String) where {T<:SwaggerModel} = convert(T, JSON.parse(json))
4172
convert(::Type{Dict{String,Any}}, model::T) where {T<:SwaggerModel} = JSON.parse(JSON.json(model))
@@ -87,7 +118,7 @@ show(io::IO, ctx::KuberContext) = print("Kubernetes namespace ", ctx.namespace,
87118
get_server(ctx::KuberContext) = ctx.client.root
88119
get_ns(ctx::KuberContext) = ctx.namespace
89120

90-
function set_server(ctx::KuberContext, uri::String=DEFAULT_URI, reset_api_versions::Bool=false; max_tries=1, kwargs...)
121+
function set_server(ctx::KuberContext, uri::String=DEFAULT_URI, reset_api_versions::Bool=false; max_tries=retries(ctx), kwargs...)
91122
rtfn = (default,data)->kuber_type(ctx, default, data)
92123
ctx.client = Swagger.Client(uri; get_return_type=rtfn, kwargs...)
93124
ctx.client.headers["Connection"] = "close"
@@ -139,15 +170,10 @@ function override_pref(name, server_pref, override)
139170
server_pref
140171
end
141172

142-
function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=1)
173+
function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
143174
apis = ctx.apis
144-
vers = @repeat max_tries try
175+
vers = k8s_retry(; max_tries=max_tries) do
145176
getAPIVersions(ApisApi(ctx.client))
146-
catch e
147-
@retry if isa(e, Base.IOError)
148-
@debug("Retrying getAPIVersions")
149-
sleep(2)
150-
end
151177
end
152178
api_groups = vers.groups
153179
for apigrp in api_groups
@@ -183,15 +209,10 @@ function fetch_misc_apis_versions(ctx::KuberContext; override=nothing, verbose::
183209
apis
184210
end
185211

186-
function fetch_core_version(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=1)
212+
function fetch_core_version(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
187213
apis = ctx.apis
188-
api_vers = @repeat max_tries try
214+
api_vers = k8s_retry(; max_tries=max_tries) do
189215
getCoreAPIVersions(CoreApi(ctx.client))
190-
catch e
191-
@retry if isa(e, Base.IOError)
192-
@debug("Retrying getCoreAPIVersions")
193-
sleep(2)
194-
end
195216
end
196217
name = "Core"
197218
pref_vers = override_pref(name, api_vers.versions[1], override)
@@ -236,7 +257,7 @@ function build_model_api_map(ctx::KuberContext)
236257
modelapi
237258
end
238259

239-
function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=1)
260+
function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=false, max_tries=retries(ctx))
240261
ctx.initialized = false
241262
empty!(ctx.apis)
242263
empty!(ctx.modelapi)
@@ -255,28 +276,3 @@ function set_api_versions!(ctx::KuberContext; override=nothing, verbose::Bool=fa
255276
ctx.initialized = true
256277
nothing
257278
end
258-
259-
"""
260-
Retry function `f` for `max_tries` number of times if function fails with `IOError`
261-
"""
262-
function retry_on_error(f::Function; max_tries=1)
263-
@repeat max_tries try
264-
return f()
265-
catch e
266-
@retry if isa(e, Base.IOError)
267-
@debug("Retrying Kubernetes API call ...")
268-
sleep(2)
269-
end
270-
end
271-
end
272-
273-
"""
274-
Macro to retry an expression on `IOError`. Note that the variable `max_tries` needs to be inscope for this to work.
275-
"""
276-
macro retry_on_error(e)
277-
esc(quote
278-
retry_on_error(;max_tries=(@isdefined max_tries) ? max_tries : 1) do
279-
$(e)
280-
end
281-
end)
282-
end

src/simpleapi.jl

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ _delopts(; kwargs...) = Typedefs.MetaV1.DeleteOptions(; preconditions=Typedefs.M
1212
_kubectx(ctx::KuberContext) = ctx
1313
_kubectx(ctx::KuberWatchContext) = ctx.ctx
1414

15-
function _get_apictx(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, apiversion::Union{String,Nothing}; max_tries::Int=1)
15+
function _get_apictx(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, apiversion::Union{String,Nothing}; max_tries::Int=retries(ctx))
1616
kubectx = _kubectx(ctx)
1717
kubectx.initialized || set_api_versions!(kubectx; max_tries=max_tries)
1818

@@ -50,7 +50,7 @@ end
5050
function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::String;
5151
apiversion::Union{String,Nothing}=nothing,
5252
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
53-
max_tries::Int=1,
53+
max_tries::Int=retries(ctx),
5454
watch=isa(ctx, KuberWatchContext),
5555
resourceVersion=nothing,
5656
kwargs...)
@@ -71,7 +71,9 @@ function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::Strin
7171
end
7272

7373
if !watch || resourceVersion === nothing
74-
result = @retry_on_error apicall(apictx, args...; kwargs...)
74+
result = k8s_retry(; max_tries=max_tries) do
75+
apicall(apictx, args...; kwargs...)
76+
end
7577
end
7678

7779
# if not watching, return the first result
@@ -83,13 +85,15 @@ function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::Strin
8385
end
8486

8587
# start watch and return the HTTP response object on completion
86-
return @retry_on_error apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, kwargs...)
88+
return k8s_retry(; max_tries=max_tries) do
89+
apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, kwargs...)
90+
end
8791
end
8892

8993
function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
9094
apiversion::Union{String,Nothing}=nothing,
9195
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
92-
max_tries::Int=1,
96+
max_tries::Int=retries(ctx),
9397
watch=isa(ctx, KuberWatchContext),
9498
resourceVersion=nothing,
9599
kwargs...)
@@ -111,7 +115,9 @@ function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
111115
end
112116

113117
if !watch || resourceVersion === nothing
114-
result = @retry_on_error apicall(apictx, args...; kwargs...)
118+
result = k8s_retry(; max_tries=max_tries) do
119+
apicall(apictx, args...; kwargs...)
120+
end
115121
end
116122

117123
# if not watching, return the first result
@@ -123,12 +129,14 @@ function list(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
123129
end
124130

125131
# start watch and return the HTTP response object on completion
126-
return @retry_on_error apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, kwargs...)
132+
return k8s_retry(; max_tries=max_tries) do
133+
apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, kwargs...)
134+
end
127135
end
128136

129137
function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::String;
130138
apiversion::Union{String,Nothing}=nothing,
131-
max_tries::Integer=1,
139+
max_tries::Integer=retries(ctx),
132140
watch=isa(ctx, KuberWatchContext),
133141
resourceVersion=nothing,
134142
kwargs...)
@@ -146,7 +154,9 @@ function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::String
146154
end
147155

148156
if !watch || resourceVersion === nothing
149-
result = @retry_on_error apicall(apictx, args...; kwargs...)
157+
result = k8s_retry(; max_tries=max_tries) do
158+
apicall(apictx, args...; kwargs...)
159+
end
150160
end
151161

152162
# if not watching, return the first result
@@ -158,14 +168,16 @@ function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol, name::String
158168
end
159169

160170
# start watch and return the HTTP response object on completion
161-
return @retry_on_error apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, kwargs...)
171+
return k8s_retry(; max_tries=max_tries) do
172+
apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, kwargs...)
173+
end
162174
end
163175

164176
function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
165177
apiversion::Union{String,Nothing}=nothing,
166178
label_selector=nothing,
167179
namespace::Union{String,Nothing}=_kubectx(ctx).namespace,
168-
max_tries::Integer=1,
180+
max_tries::Integer=retries(ctx),
169181
watch=isa(ctx, KuberWatchContext),
170182
resourceVersion=nothing,
171183
kwargs...)
@@ -185,7 +197,9 @@ function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
185197
end
186198

187199
if !watch || resourceVersion === nothing
188-
result = @retry_on_error apicall(apictx, args...; labelSelector=label_selector, kwargs...)
200+
result = k8s_retry(; max_tries=max_tries) do
201+
apicall(apictx, args...; labelSelector=label_selector, kwargs...)
202+
end
189203
end
190204

191205
# if not watching, return the first result
@@ -197,50 +211,62 @@ function get(ctx::Union{KuberContext,KuberWatchContext}, O::Symbol;
197211
end
198212

199213
# start watch and return the HTTP response object on completion
200-
return @retry_on_error apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, labelSelector=label_selector, kwargs...)
214+
return k8s_retry(; max_tries=max_tries) do
215+
apicall(apictx, eventstream, args...; watch=watch, resourceVersion=resourceVersion, labelSelector=label_selector, kwargs...)
216+
end
201217
end
202218

203219
function watch(ctx::KuberContext, O::Symbol, outstream::Channel, name::String;
204220
apiversion::Union{String,Nothing}=nothing,
205221
namespace::Union{String,Nothing}=ctx.namespace,
206-
max_tries::Int=1,
207-
resourceVersion=nothing,
222+
max_tries::Int=retries(ctx),
208223
kwargs...)
209224
apictx = _get_apictx(ctx, O, apiversion; max_tries=max_tries)
210225
namespaced = (namespace !== nothing) && !isempty(namespace)
211226
allnamespaces = namespaced && (namespace == "*")
212227

213228
if allnamespaces
214229
apicall = eval(Symbol("watch$(O)ForAllNamespaces"))
215-
return @retry_on_error apicall(apictx, outstream, name; kwargs...)
230+
return k8s_retry(; max_tries=max_tries) do
231+
apicall(apictx, outstream, name; kwargs...)
232+
end
216233
elseif namespaced
217234
apicall = eval(Symbol("watchNamespaced$O"))
218-
return @retry_on_error apicall(apictx, outstream, name, namespace; kwargs...)
235+
return k8s_retry(; max_tries=max_tries) do
236+
apicall(apictx, outstream, name, namespace; kwargs...)
237+
end
219238
else
220239
apicall = eval(Symbol("watch$O"))
221-
return @retry_on_error apicall(apictx, outstream, name; kwargs...)
240+
return k8s_retry(; max_tries=max_tries) do
241+
apicall(apictx, outstream, name; kwargs...)
242+
end
222243
end
223244
end
224245

225246
function watch(ctx::KuberContext, O::Symbol, outstream::Channel;
226247
apiversion::Union{String,Nothing}=nothing,
227248
namespace::Union{String,Nothing}=ctx.namespace,
228-
max_tries::Int=1,
229-
resourceVersion=nothing,
249+
max_tries::Int=retries(ctx),
230250
kwargs...)
231251
apictx = _get_apictx(ctx, O, apiversion; max_tries=max_tries)
232252
namespaced = (namespace !== nothing) && !isempty(namespace)
233253
allnamespaces = namespaced && (namespace == "*")
234254

235255
if allnamespaces
236256
apicall = eval(Symbol("watch$(O)ForAllNamespaces"))
237-
return @retry_on_error apicall(apictx, outstream; kwargs...)
257+
return k8s_retry(; max_tries=max_tries) do
258+
apicall(apictx, outstream; kwargs...)
259+
end
238260
elseif namespaced
239261
apicall = eval(Symbol("watchNamespaced$O"))
240-
return @retry_on_error apicall(apictx, outstream, namespace; kwargs...)
262+
return k8s_retry(; max_tries=max_tries) do
263+
apicall(apictx, outstream, namespace; kwargs...)
264+
end
241265
else
242266
apicall = eval(Symbol("watch$O"))
243-
return @retry_on_error apicall(apictx, outstream; kwargs...)
267+
return k8s_retry(; max_tries=max_tries) do
268+
apicall(apictx, outstream; kwargs...)
269+
end
244270
end
245271
end
246272

test/runtests.jl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ function init_context(override=nothing, verbose=true)
1515
ctx = KuberContext()
1616
set_server(ctx, "http://localhost:8001")
1717
set_ns(ctx, "default")
18+
set_retries(ctx, 3)
1819
Kuber.set_api_versions!(ctx; override=override, verbose=verbose)
1920
ctx
2021
end
@@ -308,6 +309,4 @@ function test_all()
308309
end
309310
end
310311

311-
#if !parse(Bool, get(ENV, "CI", "false"))
312-
test_all()
313-
#end
312+
test_all()

0 commit comments

Comments
 (0)