@@ -6,16 +6,25 @@ import (
6
6
"errors"
7
7
"fmt"
8
8
"io"
9
+ "math"
9
10
"net/http"
10
11
"strconv"
12
+ "time"
11
13
12
14
rpcStatus "google.golang.org/genproto/googleapis/rpc/status"
13
15
"google.golang.org/protobuf/encoding/protojson"
14
- klog "k8s.io/klog/v2"
15
16
16
17
api "github.com/ray-project/kuberay/proto/go_client"
17
18
)
18
19
20
+ type RetryConfig struct {
21
+ MaxRetry int
22
+ BackoffFactor float64
23
+ InitBackoff time.Duration
24
+ MaxBackoff time.Duration
25
+ OverallTimeout time.Duration
26
+ }
27
+
19
28
type KuberayAPIServerClient struct {
20
29
httpClient * http.Client
21
30
marshaler * protojson.MarshalOptions
@@ -27,6 +36,7 @@ type KuberayAPIServerClient struct {
27
36
// Store http request handling function for unit test purpose.
28
37
executeHttpRequest func (httpRequest * http.Request , URL string ) ([]byte , * rpcStatus.Status , error )
29
38
baseURL string
39
+ retryCfg RetryConfig
30
40
}
31
41
32
42
type KuberayAPIServerClientError struct {
@@ -47,7 +57,7 @@ func IsNotFoundError(err error) bool {
47
57
return false
48
58
}
49
59
50
- func NewKuberayAPIServerClient (baseURL string , httpClient * http.Client ) * KuberayAPIServerClient {
60
+ func NewKuberayAPIServerClient (baseURL string , httpClient * http.Client , retryCfg RetryConfig ) * KuberayAPIServerClient {
51
61
client := & KuberayAPIServerClient {
52
62
httpClient : httpClient ,
53
63
baseURL : baseURL ,
@@ -65,6 +75,7 @@ func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client) *Kuberay
65
75
DiscardUnknown : false ,
66
76
Resolver : nil ,
67
77
},
78
+ retryCfg : retryCfg ,
68
79
}
69
80
client .executeHttpRequest = client .executeRequest
70
81
return client
@@ -637,30 +648,103 @@ func (krc *KuberayAPIServerClient) doDelete(deleteURL string) (*rpcStatus.Status
637
648
return status , err
638
649
}
639
650
651
+ var retryableHTTPStatusCodes = map [int ]struct {}{
652
+ http .StatusRequestTimeout : {}, // 408
653
+ http .StatusTooManyRequests : {}, // 429
654
+ http .StatusInternalServerError : {}, // 500
655
+ http .StatusBadGateway : {}, // 502
656
+ http .StatusServiceUnavailable : {}, // 503
657
+ http .StatusGatewayTimeout : {}, // 504
658
+ }
659
+
640
660
func (krc * KuberayAPIServerClient ) executeRequest (httpRequest * http.Request , URL string ) ([]byte , * rpcStatus.Status , error ) {
641
- response , err := krc .httpClient .Do (httpRequest )
642
- if err != nil {
643
- return nil , nil , fmt .Errorf ("failed to execute http request for url '%s': %w" , URL , err )
661
+ // Set the overall timeout
662
+ ctx , cancel := context .WithTimeout (context .Background (), krc .retryCfg .OverallTimeout )
663
+ defer cancel ()
664
+
665
+ httpRequest = httpRequest .WithContext (ctx )
666
+
667
+ // Record the last error and status got
668
+ var lastErr error
669
+ var lastStatus * rpcStatus.Status
670
+
671
+ var requestBodyBytes []byte
672
+ var err error
673
+ if httpRequest .Body != nil {
674
+ requestBodyBytes , err = io .ReadAll (httpRequest .Body )
675
+ if err != nil {
676
+ return nil , nil , fmt .Errorf ("failed to read request body: %w" , err )
677
+ }
644
678
}
645
- defer func () {
646
- if closeErr := response .Body .Close (); closeErr != nil {
647
- klog .Errorf ("Failed to close http response body because %+v" , closeErr )
679
+
680
+ doReq := func () ([]byte , int , error ) {
681
+ // new ReadCloser for httpRequest body
682
+ if requestBodyBytes != nil {
683
+ httpRequest .Body = io .NopCloser (bytes .NewBuffer (requestBodyBytes ))
648
684
}
649
- }()
650
- bodyBytes , err := io .ReadAll (response .Body )
651
- if err != nil {
652
- return nil , nil , fmt .Errorf ("failed to read response body bytes: %w" , err )
685
+
686
+ response , err := krc .httpClient .Do (httpRequest )
687
+ if err != nil {
688
+ return nil , 0 , fmt .Errorf ("failed to execute http request for url '%s': %w" , URL , err )
689
+ }
690
+ defer response .Body .Close ()
691
+
692
+ bodyBytes , err := io .ReadAll (response .Body )
693
+ // Error in reading response body, treated as non-retryable error
694
+ if err != nil {
695
+ return nil , 0 , fmt .Errorf ("failed to read response body bytes: %w" , err )
696
+ }
697
+
698
+ return bodyBytes , response .StatusCode , nil
653
699
}
654
- if response .StatusCode != http .StatusOK {
700
+
701
+ // Only retry for HTTP status codes defined as retryable in isRetryableHTTPStatus().
702
+ for attempt := 0 ; attempt <= krc .retryCfg .MaxRetry ; attempt ++ {
703
+ bodyBytes , statusCode , err := doReq ()
704
+ // Error in sending the request, treated as non-retryable error
705
+ if err != nil {
706
+ lastStatus = nil
707
+ lastErr = err
708
+ break
709
+ }
710
+
711
+ if statusCode == http .StatusOK {
712
+ return bodyBytes , nil , nil
713
+ }
714
+
655
715
status , err := krc .extractStatus (bodyBytes )
716
+ // Error in extracting status from response body, treated as non-retryable error
656
717
if err != nil {
657
- return nil , nil , err
718
+ lastStatus = nil
719
+ lastErr = err
720
+ break
658
721
}
659
- return nil , status , & KuberayAPIServerClientError {
660
- HTTPStatusCode : response .StatusCode ,
722
+
723
+ lastStatus = status
724
+ lastErr = & KuberayAPIServerClientError {
725
+ HTTPStatusCode : statusCode ,
726
+ }
727
+
728
+ // Retry only for HTTP status in the list
729
+ if _ , retryable := retryableHTTPStatusCodes [statusCode ]; ! retryable {
730
+ break
731
+ }
732
+
733
+ // Backoff before retry
734
+ sleep := krc .retryCfg .InitBackoff * time .Duration (math .Pow (krc .retryCfg .BackoffFactor , float64 (attempt )))
735
+ if sleep > krc .retryCfg .MaxBackoff {
736
+ sleep = krc .retryCfg .MaxBackoff
737
+ }
738
+
739
+ select {
740
+ case <- time .After (sleep ):
741
+ // continue to the next retry after backoff
742
+ case <- ctx .Done ():
743
+ return nil , lastStatus , fmt .Errorf ("overall timeout reached: %w" , ctx .Err ())
661
744
}
745
+
662
746
}
663
- return bodyBytes , nil , nil
747
+ return nil , lastStatus , lastErr
664
748
}
665
749
666
750
func (krc * KuberayAPIServerClient ) extractStatus (bodyBytes []byte ) (* rpcStatus.Status , error ) {
0 commit comments