Skip to content

Commit 775b111

Browse files
kenchung285rueian
authored andcommitted
[apiserver]: merge http utils of apiserver v1/v2 (ray-project#3924)
* feat: merge http utils of apiserver v1/v2 Signed-off-by: Cheng-Yeh Chung <[email protected]> * fix naming error in package alias Co-authored-by: Rueian <[email protected]> Signed-off-by: Cheng-Yeh Chung <[email protected]> * fix naming error Signed-off-by: Cheng-Yeh Chung <[email protected]> * refactor function naming Signed-off-by: Cheng-Yeh Chung <[email protected]> * remove TODO comments solved by this PR Signed-off-by: Cheng-Yeh Chung <[email protected]> * merge retry config to RetryRoundTripper. fix v1 merge error Signed-off-by: Cheng-Yeh Chung <[email protected]> --------- Signed-off-by: Cheng-Yeh Chung <[email protected]> Co-authored-by: Rueian <[email protected]>
1 parent 8e60f05 commit 775b111

File tree

8 files changed

+65
-66
lines changed

8 files changed

+65
-66
lines changed

apiserver/pkg/http/client.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ import (
66
"errors"
77
"fmt"
88
"io"
9-
"math"
109
"net/http"
1110
"strconv"
1211
"time"
1312

1413
rpcStatus "google.golang.org/genproto/googleapis/rpc/status"
1514
"google.golang.org/protobuf/encoding/protojson"
1615

16+
apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
1717
api "github.com/ray-project/kuberay/proto/go_client"
1818
)
1919

@@ -653,15 +653,6 @@ func (krc *KuberayAPIServerClient) doDelete(deleteURL string) (*rpcStatus.Status
653653
return status, err
654654
}
655655

656-
var retryableHTTPStatusCodes = map[int]struct{}{
657-
http.StatusRequestTimeout: {}, // 408
658-
http.StatusTooManyRequests: {}, // 429
659-
http.StatusInternalServerError: {}, // 500
660-
http.StatusBadGateway: {}, // 502
661-
http.StatusServiceUnavailable: {}, // 503
662-
http.StatusGatewayTimeout: {}, // 504
663-
}
664-
665656
func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error) {
666657
// Set the overall timeout
667658
ctx, cancel := context.WithTimeout(context.Background(), krc.retryCfg.OverallTimeout)
@@ -713,7 +704,7 @@ func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL
713704
break
714705
}
715706

716-
if statusCode == http.StatusOK {
707+
if apiserverutil.IsSuccessfulStatusCode(statusCode) {
717708
return bodyBytes, nil, nil
718709
}
719710

@@ -730,16 +721,15 @@ func (krc *KuberayAPIServerClient) executeRequest(httpRequest *http.Request, URL
730721
HTTPStatusCode: statusCode,
731722
}
732723

733-
// Retry only for HTTP status in the list
734-
if _, retryable := retryableHTTPStatusCodes[statusCode]; !retryable {
724+
if !apiserverutil.IsRetryableHTTPStatusCodes(statusCode) {
735725
break
736726
}
737727

738728
// Backoff before retry
739-
sleep := krc.retryCfg.InitBackoff * time.Duration(math.Pow(krc.retryCfg.BackoffFactor, float64(attempt)))
740-
if sleep > krc.retryCfg.MaxBackoff {
741-
sleep = krc.retryCfg.MaxBackoff
742-
}
729+
sleep := apiserverutil.GetRetryBackoff(attempt,
730+
krc.retryCfg.InitBackoff,
731+
krc.retryCfg.BackoffFactor,
732+
krc.retryCfg.MaxBackoff)
743733

744734
select {
745735
case <-time.After(sleep):

apiserver/pkg/http/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/stretchr/testify/require"
1414
rpcStatus "google.golang.org/genproto/googleapis/rpc/status"
1515

16-
"github.com/ray-project/kuberay/apiserver/pkg/util"
16+
"github.com/ray-project/kuberay/apiserversdk/util"
1717
api "github.com/ray-project/kuberay/proto/go_client"
1818
)
1919

apiserver/pkg/util/config.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,6 @@ const (
2525
RayClusterImageAnnotationKey = "ray.io/compute-image"
2626

2727
RayClusterDefaultImageRepository = "rayproject/ray"
28-
29-
// Max retry times for HTTP Client
30-
HTTPClientDefaultMaxRetry = 3
31-
32-
// Retry backoff settings
33-
HTTPClientDefaultBackoffBase = float64(2)
34-
HTTPClientDefaultInitBackoff = 500 * time.Millisecond
35-
HTTPClientDefaultMaxBackoff = 10 * time.Second
36-
37-
// Overall timeout for retries
38-
HTTPClientDefaultOverallTimeout = 30 * time.Second
3928
)
4029

4130
const (

apiserver/test/e2e/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"sigs.k8s.io/controller-runtime/pkg/client/config"
2424

2525
kuberayHTTP "github.com/ray-project/kuberay/apiserver/pkg/http"
26-
"github.com/ray-project/kuberay/apiserver/pkg/util"
26+
"github.com/ray-project/kuberay/apiserversdk/util"
2727
api "github.com/ray-project/kuberay/proto/go_client"
2828
rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2929
rayv1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1"

apiserversdk/proxy.go

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"fmt"
66
"io"
7-
"math"
87
"net/http"
98
"net/http/httputil"
109
"net/url"
@@ -16,7 +15,8 @@ import (
1615
"k8s.io/client-go/kubernetes"
1716
"k8s.io/client-go/rest"
1817

19-
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
18+
apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
19+
rayutil "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2020
)
2121

2222
type MuxConfig struct {
@@ -78,7 +78,7 @@ func requireKubeRayService(handler http.Handler, k8sClient *kubernetes.Clientset
7878
}
7979
services, err := k8sClient.CoreV1().Services(namespace).List(r.Context(), metav1.ListOptions{
8080
FieldSelector: "metadata.name=" + serviceName,
81-
LabelSelector: "app.kubernetes.io/name=" + utils.ApplicationName,
81+
LabelSelector: "app.kubernetes.io/name=" + rayutil.ApplicationName,
8282
})
8383
if err != nil {
8484
http.Error(w, "failed to list kuberay services", http.StatusInternalServerError)
@@ -99,10 +99,21 @@ type retryRoundTripper struct {
9999

100100
// Num of retries after the initial attempt
101101
maxRetries int
102+
103+
// Retry backoff settings
104+
initBackoff time.Duration
105+
backoffBase float64
106+
maxBackoff time.Duration
102107
}
103108

104109
func newRetryRoundTripper(base http.RoundTripper) http.RoundTripper {
105-
return &retryRoundTripper{base: base, maxRetries: HTTPClientDefaultMaxRetry}
110+
return &retryRoundTripper{
111+
base: base,
112+
maxRetries: apiserverutil.HTTPClientDefaultMaxRetry,
113+
initBackoff: apiserverutil.HTTPClientDefaultInitBackoff,
114+
backoffBase: apiserverutil.HTTPClientDefaultBackoffBase,
115+
maxBackoff: apiserverutil.HTTPClientDefaultMaxBackoff,
116+
}
106117
}
107118

108119
func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
@@ -136,11 +147,11 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
136147
return resp, fmt.Errorf("request to %s %s failed with error: %w", req.Method, req.URL.String(), err)
137148
}
138149

139-
if isSuccessfulStatusCode(resp.StatusCode) {
150+
if apiserverutil.IsSuccessfulStatusCode(resp.StatusCode) {
140151
return resp, nil
141152
}
142153

143-
if !isRetryableHTTPStatusCodes(resp.StatusCode) {
154+
if !apiserverutil.IsRetryableHTTPStatusCodes(resp.StatusCode) {
144155
return resp, nil
145156
}
146157

@@ -158,11 +169,7 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
158169
}
159170
}
160171

161-
// TODO: move to HTTP util function in independent util file
162-
sleepDuration := HTTPClientDefaultInitBackoff * time.Duration(math.Pow(HTTPClientDefaultBackoffBase, float64(attempt)))
163-
if sleepDuration > HTTPClientDefaultMaxBackoff {
164-
sleepDuration = HTTPClientDefaultMaxBackoff
165-
}
172+
sleepDuration := apiserverutil.GetRetryBackoff(attempt, rrt.initBackoff, rrt.backoffBase, rrt.maxBackoff)
166173

167174
// TODO: merge common utils for apiserver v1 and v2
168175
if deadline, ok := ctx.Deadline(); ok {
@@ -180,23 +187,3 @@ func (rrt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
180187
}
181188
return resp, err
182189
}
183-
184-
// TODO: move HTTP util function into independent util file / folder
185-
func isSuccessfulStatusCode(statusCode int) bool {
186-
return 200 <= statusCode && statusCode < 300
187-
}
188-
189-
// TODO: merge common utils for apiserver v1 and v2
190-
func isRetryableHTTPStatusCodes(statusCode int) bool {
191-
switch statusCode {
192-
case http.StatusRequestTimeout, // 408
193-
http.StatusTooManyRequests, // 429
194-
http.StatusInternalServerError, // 500
195-
http.StatusBadGateway, // 502
196-
http.StatusServiceUnavailable, // 503
197-
http.StatusGatewayTimeout: // 504
198-
return true
199-
default:
200-
return false
201-
}
202-
}

apiserversdk/proxy_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ import (
2323
"k8s.io/client-go/rest"
2424
"sigs.k8s.io/controller-runtime/pkg/envtest"
2525

26+
apiserverutil "github.com/ray-project/kuberay/apiserversdk/util"
2627
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
27-
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
28+
rayutil "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2829
rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1"
2930
)
3031

@@ -319,7 +320,7 @@ var _ = Describe("kuberay service", Ordered, func() {
319320
ObjectMeta: metav1.ObjectMeta{
320321
Name: svcName,
321322
Labels: map[string]string{
322-
utils.KubernetesApplicationNameLabelKey: utils.ApplicationName,
323+
rayutil.KubernetesApplicationNameLabelKey: rayutil.ApplicationName,
323324
},
324325
},
325326
Spec: corev1.ServiceSpec{
@@ -454,7 +455,7 @@ var _ = Describe("retryRoundTripper", func() {
454455
resp, err := retrier.RoundTrip(req)
455456
Expect(err).ToNot(HaveOccurred())
456457
Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError))
457-
Expect(attempts).To(Equal(int32(HTTPClientDefaultMaxRetry + 1)))
458+
Expect(attempts).To(Equal(int32(apiserverutil.HTTPClientDefaultMaxRetry + 1)))
458459
})
459460

460461
It("Retries on request with body", func() {

apiserversdk/config.go renamed to apiserversdk/util/config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
package apiserversdk
1+
package util
22

33
import "time"
44

5-
// TODO: Make apiserver configs compatible with V1
65
const (
76
// Max retry times for HTTP Client
87
HTTPClientDefaultMaxRetry = 3

apiserversdk/util/http.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package util
2+
3+
import (
4+
"math"
5+
"net/http"
6+
"time"
7+
)
8+
9+
func GetRetryBackoff(attempt int, initBackoff time.Duration, backoffBase float64, maxBackoff time.Duration) time.Duration {
10+
sleepDuration := initBackoff * time.Duration(math.Pow(backoffBase, float64(attempt)))
11+
if sleepDuration > maxBackoff {
12+
sleepDuration = maxBackoff
13+
}
14+
return sleepDuration
15+
}
16+
17+
func IsSuccessfulStatusCode(statusCode int) bool {
18+
return 200 <= statusCode && statusCode < 300
19+
}
20+
21+
func IsRetryableHTTPStatusCodes(statusCode int) bool {
22+
switch statusCode {
23+
case http.StatusRequestTimeout, // 408
24+
http.StatusTooManyRequests, // 429
25+
http.StatusInternalServerError, // 500
26+
http.StatusBadGateway, // 502
27+
http.StatusServiceUnavailable, // 503
28+
http.StatusGatewayTimeout: // 504
29+
return true
30+
default:
31+
return false
32+
}
33+
}

0 commit comments

Comments
 (0)