Skip to content

Commit 4c46d56

Browse files
kenchung285dentinyrueian
authored
[apiserver] Add retry and timeout to apiserver V2 (#3869)
* feat: add retry and timeout to apiserver V2 Signed-off-by: Cheng-Yeh Chung <[email protected]> * feat: add logs to failure retries Signed-off-by: Cheng-Yeh Chung <[email protected]> * ensure timeout duration and retryable status codes consistent with apiserver V1 Signed-off-by: Cheng-Yeh Chung <[email protected]> * refactor comments Signed-off-by: Cheng-Yeh Chung <[email protected]> * merge body preserving into retryRoundTripper Signed-off-by: Cheng-Yeh Chung <[email protected]> * add todo comment for v1 v2 compatibility Signed-off-by: Cheng-Yeh Chung <[email protected]> * feat: drain response every time before retry Signed-off-by: Cheng-Yeh Chung <[email protected]> * add TODO comment for merging common utils in v1 and v2 Signed-off-by: Cheng-Yeh Chung <[email protected]> * test: add mock test for retry round tripper Signed-off-by: Cheng-Yeh Chung <[email protected]> * Add comment for retryRoundTripper mock unit test Co-authored-by: dentiny <[email protected]> Signed-off-by: Cheng-Yeh Chung <[email protected]> * feat: improve error log Signed-off-by: Cheng-Yeh Chung <[email protected]> * test: add more test for retryRoundTripper && fix minor errors Signed-off-by: Cheng-Yeh Chung <[email protected]> * minor: format Signed-off-by: Cheng-Yeh Chung <[email protected]> * improve error message Signed-off-by: Cheng-Yeh Chung <[email protected]> * renaming function Signed-off-by: Cheng-Yeh Chung <[email protected]> * add todo comment for http util funtions Signed-off-by: Cheng-Yeh Chung <[email protected]> * fix: retry times error && add comments for clarity Signed-off-by: Cheng-Yeh Chung <[email protected]> * return timeout error for remaining time less than the backoff sleep duration Signed-off-by: Cheng-Yeh Chung <[email protected]> * refactor && use select to prevent blocking if the request context being cancelled during backoff Signed-off-by: Cheng-Yeh Chung <[email protected]> * refactor: avoid using req.GetBody in reusing request body Signed-off-by: Cheng-Yeh 
Chung <[email protected]> * update Co-authored-by: Rueian <[email protected]> Signed-off-by: Cheng-Yeh Chung <[email protected]> * Update apiserversdk/proxy.go Co-authored-by: Rueian <[email protected]> Signed-off-by: Cheng-Yeh Chung <[email protected]> * test: add a test for request with body Signed-off-by: Cheng-Yeh Chung <[email protected]> --------- Signed-off-by: Cheng-Yeh Chung <[email protected]> Co-authored-by: dentiny <[email protected]> Co-authored-by: Rueian <[email protected]>
1 parent ef9206c commit 4c46d56

File tree

3 files changed

+290
-3
lines changed

3 files changed

+290
-3
lines changed

apiserversdk/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package apiserversdk
2+
3+
import "time"
4+
5+
// TODO: Make apiserver configs compatible with V1
6+
const (
	// HTTPClientDefaultMaxRetry is the number of retries performed after the
	// initial attempt.
	HTTPClientDefaultMaxRetry = 3

	// Exponential backoff parameters: the wait before a retry starts at
	// HTTPClientDefaultInitBackoff, is multiplied by
	// HTTPClientDefaultBackoffBase on every further retry, and is capped at
	// HTTPClientDefaultMaxBackoff.
	HTTPClientDefaultBackoffBase float64       = 2
	HTTPClientDefaultInitBackoff time.Duration = 500 * time.Millisecond
	HTTPClientDefaultMaxBackoff  time.Duration = 10 * time.Second

	// HTTPClientDefaultOverallTimeout is the overall time budget for a
	// request including all of its retries.
	HTTPClientDefaultOverallTimeout time.Duration = 30 * time.Second
)

apiserversdk/proxy.go

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package apiserversdk
22

33
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"math"
48
"net/http"
59
"net/http/httputil"
610
"net/url"
711
"strings"
12+
"time"
813

914
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1015
"k8s.io/apimachinery/pkg/util/net"
@@ -22,12 +27,14 @@ type MuxConfig struct {
2227
func NewMux(config MuxConfig) (*http.ServeMux, error) {
2328
u, err := url.Parse(config.KubernetesConfig.Host) // parse the K8s API server URL from the KubernetesConfig.
2429
if err != nil {
25-
return nil, err
30+
return nil, fmt.Errorf("failed to parse url %s from config: %w", config.KubernetesConfig.Host, err)
2631
}
2732
proxy := httputil.NewSingleHostReverseProxy(u)
28-
if proxy.Transport, err = rest.TransportFor(config.KubernetesConfig); err != nil { // rest.TransportFor provides the auth to the K8s API server.
29-
return nil, err
33+
baseTransport, err := rest.TransportFor(config.KubernetesConfig) // rest.TransportFor provides the auth to the K8s API server.
34+
if err != nil {
35+
return nil, fmt.Errorf("failed to get transport for config: %w", err)
3036
}
37+
proxy.Transport = newRetryRoundTripper(baseTransport)
3138
var handler http.Handler = proxy
3239
if config.Middleware != nil {
3340
handler = config.Middleware(proxy)
@@ -84,3 +91,112 @@ func requireKubeRayService(handler http.Handler, k8sClient *kubernetes.Clientset
8491
handler.ServeHTTP(w, r)
8592
})
8693
}
94+
95+
// retryRoundTripper is an http.RoundTripper decorator: it delegates every
// request to base and re-sends it, with exponential backoff, while the
// response carries a retryable HTTP status code.
type retryRoundTripper struct {
	// base performs the actual HTTP exchange for every attempt.
	base http.RoundTripper

	// maxRetries is the number of retries allowed after the initial attempt,
	// so a request is sent at most maxRetries+1 times.
	maxRetries int
}
103+
104+
func newRetryRoundTripper(base http.RoundTripper) http.RoundTripper {
105+
return &retryRoundTripper{base: base, maxRetries: HTTPClientDefaultMaxRetry}
106+
}
107+
108+
func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
109+
ctx := req.Context()
110+
111+
var bodyBytes []byte
112+
var resp *http.Response
113+
var err error
114+
115+
if req.Body != nil {
116+
/* Reuse request body in each attempt */
117+
bodyBytes, err = io.ReadAll(req.Body)
118+
if err != nil {
119+
return nil, fmt.Errorf("failed to read request body for retry support: %w", err)
120+
}
121+
err = req.Body.Close()
122+
if err != nil {
123+
return nil, fmt.Errorf("failed to close request body: %w", err)
124+
}
125+
}
126+
127+
for attempt := 0; attempt <= rrt.maxRetries; attempt++ {
128+
/* Try up to (rrt.maxRetries + 1) times: initial attempt + retries */
129+
130+
if bodyBytes != nil {
131+
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
132+
}
133+
134+
resp, err = rrt.base.RoundTrip(req)
135+
if err != nil {
136+
return resp, fmt.Errorf("request to %s %s failed with error: %w", req.Method, req.URL.String(), err)
137+
}
138+
139+
if isSuccessfulStatusCode(resp.StatusCode) {
140+
return resp, nil
141+
}
142+
143+
if !isRetryableHTTPStatusCodes(resp.StatusCode) {
144+
return resp, nil
145+
}
146+
147+
if attempt == rrt.maxRetries {
148+
return resp, nil
149+
}
150+
151+
if resp.Body != nil {
152+
/* If not last attempt, drain response body */
153+
if _, err = io.Copy(io.Discard, resp.Body); err != nil {
154+
return nil, fmt.Errorf("retryRoundTripper internal failure to drain response body: %w", err)
155+
}
156+
if err = resp.Body.Close(); err != nil {
157+
return nil, fmt.Errorf("retryRoundTripper internal failure to close response body: %w", err)
158+
}
159+
}
160+
161+
// TODO: move to HTTP util function in independent util file
162+
sleepDuration := HTTPClientDefaultInitBackoff * time.Duration(math.Pow(HTTPClientDefaultBackoffBase, float64(attempt)))
163+
if sleepDuration > HTTPClientDefaultMaxBackoff {
164+
sleepDuration = HTTPClientDefaultMaxBackoff
165+
}
166+
167+
// TODO: merge common utils for apiserver v1 and v2
168+
if deadline, ok := ctx.Deadline(); ok {
169+
remaining := time.Until(deadline)
170+
if sleepDuration > remaining {
171+
return resp, fmt.Errorf("retry timeout exceeded context deadline")
172+
}
173+
}
174+
175+
select {
176+
case <-time.After(sleepDuration):
177+
case <-ctx.Done():
178+
return resp, fmt.Errorf("retry canceled during backoff: %w", ctx.Err())
179+
}
180+
}
181+
return resp, err
182+
}
183+
184+
// TODO: move HTTP util function into independent util file / folder
185+
// isSuccessfulStatusCode reports whether statusCode lies in the 2xx range,
// i.e. 200 <= statusCode <= 299.
func isSuccessfulStatusCode(statusCode int) bool {
	if statusCode < 200 {
		return false
	}
	return statusCode <= 299
}
188+
189+
// TODO: merge common utils for apiserver v1 and v2
190+
// isRetryableHTTPStatusCodes reports whether a response with statusCode
// should be retried. The retryable codes are 408, 429, 500, 502, 503 and 504.
func isRetryableHTTPStatusCodes(statusCode int) bool {
	retryable := statusCode == http.StatusRequestTimeout || // 408
		statusCode == http.StatusTooManyRequests || // 429
		statusCode == http.StatusInternalServerError || // 500
		statusCode == http.StatusBadGateway || // 502
		statusCode == http.StatusServiceUnavailable || // 503
		statusCode == http.StatusGatewayTimeout // 504
	return retryable
}

apiserversdk/proxy_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package apiserversdk
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
7+
"io"
68
"net"
79
"net/http"
810
"path/filepath"
11+
"strings"
912
"sync/atomic"
1013
"testing"
1114
"time"
@@ -325,3 +328,154 @@ var _ = Describe("kuberay service", Ordered, func() {
325328
})
326329
})
327330
})
331+
332+
var _ = Describe("retryRoundTripper", func() {
333+
It("should not retry on successful status OK", func() {
334+
var attempts int32
335+
mock := &mockRoundTripper{
336+
fn: func(_ *http.Request) (*http.Response, error) {
337+
atomic.AddInt32(&attempts, 1)
338+
return &http.Response{ /* Always return OK status */
339+
StatusCode: http.StatusOK,
340+
Body: io.NopCloser(strings.NewReader("OK")),
341+
}, nil
342+
},
343+
}
344+
retrier := newRetryRoundTripper(mock)
345+
req, err := http.NewRequest(http.MethodGet, "http://test", nil)
346+
Expect(err).ToNot(HaveOccurred())
347+
resp, err := retrier.RoundTrip(req)
348+
Expect(err).ToNot(HaveOccurred())
349+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
350+
Expect(attempts).To(Equal(int32(1)))
351+
})
352+
353+
It("should retry failed requests and eventually succeed", func() {
354+
const maxFailure = 2
355+
var attempts int32
356+
mock := &mockRoundTripper{
357+
fn: func(_ *http.Request) (*http.Response, error) {
358+
count := atomic.AddInt32(&attempts, 1)
359+
if count <= maxFailure {
360+
return &http.Response{
361+
StatusCode: http.StatusInternalServerError,
362+
Body: io.NopCloser(strings.NewReader("internal error")),
363+
}, nil
364+
}
365+
return &http.Response{
366+
StatusCode: http.StatusOK,
367+
Body: io.NopCloser(strings.NewReader("ok")),
368+
}, nil
369+
},
370+
}
371+
retrier := newRetryRoundTripper(mock)
372+
req, err := http.NewRequest(http.MethodGet, "http://test", nil)
373+
Expect(err).ToNot(HaveOccurred())
374+
resp, err := retrier.RoundTrip(req)
375+
Expect(err).ToNot(HaveOccurred())
376+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
377+
Expect(attempts).To(Equal(int32(maxFailure + 1)))
378+
})
379+
380+
It("Retries exceed maximum retry counts", func() {
381+
var attempts int32
382+
mock := &mockRoundTripper{
383+
fn: func(_ *http.Request) (*http.Response, error) {
384+
atomic.AddInt32(&attempts, 1)
385+
return &http.Response{ /* Always return retriable status */
386+
StatusCode: http.StatusInternalServerError,
387+
Body: io.NopCloser(strings.NewReader("internal error")),
388+
}, nil
389+
},
390+
}
391+
retrier := newRetryRoundTripper(mock)
392+
req, err := http.NewRequest(http.MethodGet, "http://test", nil)
393+
Expect(err).ToNot(HaveOccurred())
394+
resp, err := retrier.RoundTrip(req)
395+
Expect(err).ToNot(HaveOccurred())
396+
Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError))
397+
Expect(attempts).To(Equal(int32(HTTPClientDefaultMaxRetry + 1)))
398+
})
399+
400+
It("Retries on request with body", func() {
401+
const testBody = "test-body"
402+
const maxFailure = 2
403+
var attempts int32
404+
mock := &mockRoundTripper{
405+
fn: func(req *http.Request) (*http.Response, error) {
406+
count := atomic.AddInt32(&attempts, 1)
407+
reqBody, err := io.ReadAll(req.Body)
408+
Expect(err).ToNot(HaveOccurred())
409+
Expect(string(reqBody)).To(Equal(testBody))
410+
411+
if count <= maxFailure {
412+
return &http.Response{
413+
StatusCode: http.StatusInternalServerError,
414+
Body: io.NopCloser(strings.NewReader("internal error")),
415+
}, nil
416+
}
417+
return &http.Response{
418+
StatusCode: http.StatusOK,
419+
Body: io.NopCloser(strings.NewReader("ok")),
420+
}, nil
421+
},
422+
}
423+
retrier := newRetryRoundTripper(mock)
424+
body := bytes.NewBufferString(testBody)
425+
req, err := http.NewRequest(http.MethodPost, "http://test", body)
426+
Expect(err).ToNot(HaveOccurred())
427+
resp, err := retrier.RoundTrip(req)
428+
Expect(err).ToNot(HaveOccurred())
429+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
430+
Expect(attempts).To(Equal(int32(maxFailure + 1)))
431+
})
432+
433+
It("should not retry on non-retriable status", func() {
434+
var attempts int32
435+
mock := &mockRoundTripper{
436+
fn: func(_ *http.Request) (*http.Response, error) {
437+
atomic.AddInt32(&attempts, 1)
438+
return &http.Response{ /* Always return non-retriable status */
439+
StatusCode: http.StatusNotFound,
440+
Body: io.NopCloser(strings.NewReader("Not Found")),
441+
}, nil
442+
},
443+
}
444+
retrier := newRetryRoundTripper(mock)
445+
req, err := http.NewRequest(http.MethodGet, "http://test", nil)
446+
Expect(err).ToNot(HaveOccurred())
447+
resp, err := retrier.RoundTrip(req)
448+
Expect(err).ToNot(HaveOccurred())
449+
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
450+
Expect(attempts).To(Equal(int32(1)))
451+
})
452+
453+
It("should respect context timeout and stop retrying", func() {
454+
mock := &mockRoundTripper{
455+
fn: func(_ *http.Request) (*http.Response, error) {
456+
time.Sleep(100 * time.Millisecond)
457+
return &http.Response{
458+
StatusCode: http.StatusInternalServerError,
459+
Body: io.NopCloser(strings.NewReader("internal error")),
460+
}, nil
461+
},
462+
}
463+
retrier := newRetryRoundTripper(mock)
464+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
465+
defer cancel()
466+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://test", nil)
467+
Expect(err).ToNot(HaveOccurred())
468+
resp, err := retrier.RoundTrip(req)
469+
Expect(err).To(HaveOccurred())
470+
Expect(err.Error()).To(ContainSubstring("retry timeout exceeded context deadline"))
471+
Expect(resp).ToNot(BeNil())
472+
})
473+
})
474+
475+
// mockRoundTripper is a test double for http.RoundTripper whose behavior is
// supplied by the fn callback.
type mockRoundTripper struct {
	// fn is invoked for every request; its results are returned unchanged.
	fn func(*http.Request) (*http.Response, error)
}

// Compile-time check that the mock satisfies http.RoundTripper.
var _ http.RoundTripper = (*mockRoundTripper)(nil)

// RoundTrip delegates the request to the configured fn callback.
func (mrt *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := mrt.fn(req)
	return resp, err
}

0 commit comments

Comments
 (0)