Skip to content

Commit 4121fb5

Browse files
committed
Use RetryClient in lakectl login
Use the same RetryClient type as the rest of lakeFS, only with a different retry policy - one that retries status code 404. This involves refactoring getClient... so do that.
1 parent 736e300 commit 4121fb5

File tree

8 files changed

+72
-61
lines changed

8 files changed

+72
-61
lines changed

cmd/lakectl/cmd/fs_upload.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ var fsUploadCmd = &cobra.Command{
5858
}
5959
}()
6060

61-
s := local.NewSyncManager(ctx, client, getHTTPClient(), local.Config{
61+
s := local.NewSyncManager(ctx, client, getHTTPClient(lakectlRetryPolicy), local.Config{
6262
SyncFlags: syncFlags,
6363
SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles,
6464
IncludePerm: false,
@@ -100,7 +100,7 @@ func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sou
100100
}()
101101
objectPath := apiutil.Value(destURI.Path)
102102
if syncFlags.Presign {
103-
return helpers.ClientUploadPreSign(ctx, client, getHTTPClient(), destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp, syncFlags.PresignMultipart)
103+
return helpers.ClientUploadPreSign(ctx, client, getHTTPClient(lakectlRetryPolicy), destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp, syncFlags.PresignMultipart)
104104
}
105105
return helpers.ClientUpload(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
106106
}

cmd/lakectl/cmd/local_checkout.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, co
5757
currentBase := remote.WithRef(idx.AtHead)
5858
diffs := local.Undo(localDiff(cmd.Context(), client, currentBase, idx.LocalPath()))
5959
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(checkoutOperation))
60-
syncMgr := local.NewSyncManager(sigCtx, client, getHTTPClient(), buildLocalConfig(syncFlags, cfg))
60+
syncMgr := local.NewSyncManager(sigCtx, client, getHTTPClient(lakectlRetryPolicy), buildLocalConfig(syncFlags, cfg))
6161
// confirm on local changes
6262
if confirmByFlag && len(diffs) > 0 {
6363
fmt.Println("Uncommitted changes exist, the operation will revert all changes on local directory.")

cmd/lakectl/cmd/local_clone.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ var localCloneCmd = &cobra.Command{
8686
}
8787
sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation))
8888
syncFlags := getSyncFlags(cmd, client, remote.Repository)
89-
s := local.NewSyncManager(sigCtx, client, getHTTPClient(), buildLocalConfig(syncFlags, cfg))
89+
s := local.NewSyncManager(sigCtx, client, getHTTPClient(lakectlRetryPolicy), buildLocalConfig(syncFlags, cfg))
9090
err = s.Sync(localPath, stableRemote, ch)
9191
if err != nil {
9292
DieErr(err)

cmd/lakectl/cmd/local_commit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ var localCommitCmd = &cobra.Command{
172172
}
173173

174174
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(commitOperation))
175-
s := local.NewSyncManager(sigCtx, client, getHTTPClient(), buildLocalConfig(syncFlags, cfg))
175+
s := local.NewSyncManager(sigCtx, client, getHTTPClient(lakectlRetryPolicy), buildLocalConfig(syncFlags, cfg))
176176

177177
err = s.Sync(idx.LocalPath(), remote, c)
178178
if err != nil {

cmd/lakectl/cmd/local_pull.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ var localPullCmd = &cobra.Command{
6666
return nil
6767
})
6868
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(pullOperation))
69-
s := local.NewSyncManager(sigCtx, client, getHTTPClient(), buildLocalConfig(syncFlags, cfg))
69+
s := local.NewSyncManager(sigCtx, client, getHTTPClient(lakectlRetryPolicy), buildLocalConfig(syncFlags, cfg))
7070

7171
err = s.Sync(idx.LocalPath(), newBase, c)
7272
if err != nil {

cmd/lakectl/cmd/login.go

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package cmd
22

33
import (
4-
"errors"
4+
"context"
55
"fmt"
66
"net/http"
77
"net/url"
8+
"slices"
89
"time"
910

10-
"github.com/cenkalti/backoff/v4"
1111
"github.com/skratchdot/open-golang/open"
1212
"github.com/spf13/cobra"
1313
"github.com/treeverse/lakefs/pkg/api/apigen"
@@ -28,11 +28,15 @@ type webLoginParams struct {
2828
}
2929

3030
var (
31-
errTryAgain = errors.New("HTTP request failed; retry")
32-
errFailedToGetToken = errors.New("failed to get token")
33-
errFailedToGetTokenDontTryAgain = backoff.Permanent(errFailedToGetToken)
31+
loginRetryStatuses = slices.Concat(lakectlDefaultRetryStatuses,
32+
[]int{http.StatusNotFound},
33+
)
3434
)
3535

36+
func loginRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
37+
return CheckRetry(ctx, resp, err, loginRetryStatuses)
38+
}
39+
3640
var loginCmd = &cobra.Command{
3741
Use: "login",
3842
Short: "Use a web browser to log into lakeFS",
@@ -44,7 +48,7 @@ var loginCmd = &cobra.Command{
4448
DieErr(fmt.Errorf("get server URL %s: %w", cfg.Server.EndpointURL, err))
4549
}
4650

47-
client := getClient()
51+
client := getClient(apigen.WithHTTPClient(getHTTPClient(loginRetryPolicy)))
4852
tokenRedirect, err := client.GetTokenRedirectWithResponse(cmd.Context())
4953
// TODO(ariels): Change back to http.StatusSeeOther after fixing lakeFS server!
5054
DieOnErrorOrUnexpectedStatusCode(tokenRedirect, err, http.StatusOK)
@@ -64,34 +68,18 @@ var loginCmd = &cobra.Command{
6468
// Keep going, user can manually use the URL.
6569
}
6670

67-
loginToken, err := backoff.RetryWithData(
68-
func() (*apigen.AuthenticationToken, error) {
69-
resp, err := client.GetTokenFromMailboxWithResponse(cmd.Context(), mailbox)
70-
if err != nil {
71-
return nil, err
72-
}
73-
if resp.JSON404 != nil {
74-
return nil, errTryAgain
75-
}
76-
if resp.JSON200 == nil {
77-
return nil, errFailedToGetTokenDontTryAgain
78-
}
79-
return resp.JSON200, nil
80-
},
81-
// Initial backoff is rapid, in case user is already logged in. Then
82-
// slow down considerably so user can log in!
83-
backoff.NewExponentialBackOff(backoff.WithInitialInterval(80*time.Millisecond),
84-
backoff.WithMultiplier(1.5),
85-
backoff.WithMaxInterval(time.Second),
86-
backoff.WithMaxElapsedTime(20*time.Second),
87-
),
88-
)
89-
if err != nil {
90-
DieErr(fmt.Errorf("get login token: %w", err))
91-
}
71+
// client will retry; use this to wait for login.
72+
//
73+
// TODO(ariels): The timeouts on some lakectl configurations may be too low for
74+
// convenient login. Consider using a RetryClient based on a different
75+
// configuration here..
76+
resp, err := client.GetTokenFromMailboxWithResponse(cmd.Context(), mailbox)
77+
78+
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
9279

80+
loginToken := resp.JSON200
9381
if loginToken == nil {
94-
Die("nil login token", 1)
82+
Die("No login token", 1)
9583
}
9684

9785
cache := getTokenCacheOnce()

cmd/lakectl/cmd/retry_client.go

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"net/url"
99
"regexp"
10+
"slices"
1011

1112
"github.com/hashicorp/go-retryablehttp"
1213
)
@@ -18,7 +19,7 @@ var (
1819
notTrustedErrorRe = regexp.MustCompile(`certificate is not trusted`)
1920
)
2021

21-
func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *http.Client {
22+
func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport, checkRetry func(ctx context.Context, resp *http.Response, err error) (bool, error)) *http.Client {
2223
retryClient := retryablehttp.NewClient()
2324
if transport != nil {
2425
retryClient.HTTPClient.Transport = transport
@@ -27,19 +28,29 @@ func NewRetryClient(retriesCfg RetriesCfg, transport *http.Transport) *http.Clie
2728
retryClient.RetryMax = int(retriesCfg.MaxAttempts)
2829
retryClient.RetryWaitMin = retriesCfg.MinWaitInterval
2930
retryClient.RetryWaitMax = retriesCfg.MaxWaitInterval
30-
retryClient.CheckRetry = lakectlRetryPolicy
31+
retryClient.CheckRetry = checkRetry
3132
return retryClient.StandardClient()
3233
}
3334

34-
// lakectl retry policy - we retry in the following cases:
35-
// HTTP status 429 - too many requests
36-
// HTTP status 500 - internal server error - could be recoverable
37-
// HTTP status 503 - service unavailable
38-
// We retry on all client transport errors except for:
39-
// - too many redirects
35+
// ShouldRetry checks if we should retry on this (HTTP) status.
36+
type ShouldRetryer []int
37+
38+
func (s ShouldRetryer) Retry(status int) bool {
39+
return slices.Contains(s, status)
40+
}
41+
42+
// MakeRetryPolicy makes a retry policy.
43+
//
44+
// - It will _never_ retry on these unrecoverable errors:
45+
//
46+
// - a context error (typically canceled or deadline exceeded);
4047
// - invalid http scheme/protocol
41-
// - TLS cert verification failure
42-
func lakectlRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
48+
// - TLS cert validation failure
49+
//
50+
// - Any other error is retriable.
51+
//
52+
// - When there is no error, it will retry if should says to retry this status.
53+
func CheckRetry(ctx context.Context, resp *http.Response, err error, should ShouldRetryer) (bool, error) {
4354
// do not retry on context.Canceled or context.DeadlineExceeded
4455
if ctx.Err() != nil {
4556
return false, ctx.Err()
@@ -58,16 +69,29 @@ func lakectlRetryPolicy(ctx context.Context, resp *http.Response, err error) (bo
5869
return false, errors.Unwrap(v)
5970
}
6071
}
61-
// The stblib http.Client wraps the above errors in a url.Error
72+
// The standard http.Client wraps the above errors in a url.Error
6273
// They aren't retryable. Other errors are retryable.
6374
return true, nil
6475
}
6576

6677
// handle HTTP response status code
67-
if resp.StatusCode == http.StatusTooManyRequests ||
68-
resp.StatusCode == http.StatusInternalServerError ||
69-
resp.StatusCode == http.StatusServiceUnavailable {
70-
return true, nil
71-
}
72-
return false, nil
78+
return should.Retry(resp.StatusCode), nil
79+
}
80+
81+
// lakectl retry policy - we retry in the following cases:
82+
// HTTP status 429 - too many requests
83+
// HTTP status 500 - internal server error - could be recoverable
84+
// HTTP status 503 - service unavailable
85+
// We retry on all client transport errors except for:
86+
// - too many redirects
87+
// - invalid http scheme/protocol
88+
// - TLS cert verification failure
89+
func lakectlRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
90+
return CheckRetry(ctx, resp, err, lakectlDefaultRetryStatuses)
91+
}
92+
93+
var lakectlDefaultRetryStatuses = ShouldRetryer{
94+
http.StatusTooManyRequests,
95+
http.StatusInternalServerError,
96+
http.StatusServiceUnavailable,
7397
}

cmd/lakectl/cmd/root.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ func sendStats(cmd *cobra.Command, cmdSuffix string) {
557557
}
558558
}
559559

560-
func getHTTPClient() *http.Client {
560+
func getHTTPClient(checkRetry func(ctx context.Context, resp *http.Response, err error) (bool, error)) *http.Client {
561561
// Override MaxIdleConnsPerHost to allow highly concurrent access to our API client.
562562
// This is done to avoid accumulating many sockets in `TIME_WAIT` status that were closed
563563
// only to be immediately reopened.
@@ -571,7 +571,7 @@ func getHTTPClient() *http.Client {
571571
if !cfg.Server.Retries.Enabled {
572572
return &http.Client{Transport: transport}
573573
}
574-
return NewRetryClient(cfg.Server.Retries, transport)
574+
return NewRetryClient(cfg.Server.Retries, transport, checkRetry)
575575
}
576576

577577
func newAWSIAMAuthProviderConfig() (*awsiam.IAMAuthParams, error) {
@@ -610,9 +610,8 @@ func newAWSIAMAuthProviderConfig() (*awsiam.IAMAuthParams, error) {
610610
return awsiam.NewIAMAuthParams(host, opts...), nil
611611
}
612612

613-
func getClient() *apigen.ClientWithResponses {
614-
opts := []apigen.ClientOption{}
615-
httpClient := getHTTPClient()
613+
func getClient(opts ...apigen.ClientOption) *apigen.ClientWithResponses {
614+
httpClient := getHTTPClient(lakectlRetryPolicy)
616615
accessKeyID := cfg.Credentials.AccessKeyID
617616
secretAccessKey := cfg.Credentials.SecretAccessKey
618617
basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth(string(accessKeyID), string(secretAccessKey))
@@ -630,7 +629,7 @@ func getClient() *apigen.ClientWithResponses {
630629

631630
useJWTAuth := accessKeyID == "" && secretAccessKey == ""
632631
if useJWTAuth {
633-
opts = getClientOptions(awsIAMparams, serverEndpoint)
632+
opts = append(getClientOptions(awsIAMparams, serverEndpoint), opts...)
634633
}
635634

636635
oss := osinfo.GetOSInfo()

0 commit comments

Comments
 (0)