Skip to content

Commit 11f0017

Browse files
authored
Improve download robustness with retry logic for body operations (#9629)
1 parent c36bbbf commit 11f0017

File tree

1 file changed

+97
-51
lines changed

1 file changed

+97
-51
lines changed

pkg/api/helpers/download.go

Lines changed: 97 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"net/http"
88
"os"
99
"path/filepath"
10+
"time"
1011

12+
"github.com/cenkalti/backoff/v4"
1113
"github.com/go-openapi/swag"
1214
"github.com/hashicorp/go-retryablehttp"
1315
"github.com/jedib0t/go-pretty/v6/progress"
@@ -25,6 +27,22 @@ const (
2527
DefaultDownloadConcurrency = 10
2628
)
2729

30+
// Backoff parameters for download retries
31+
const (
32+
DefaultDownloadInitialInterval = 100 * time.Millisecond
33+
DefaultDownloadMaxInterval = 2 * time.Second
34+
DefaultDownloadMaxElapsedTime = 30 * time.Second
35+
)
36+
37+
// newDownloadBackoff creates a backoff strategy for download retries.
38+
func newDownloadBackoff() backoff.BackOff {
39+
return backoff.NewExponentialBackOff(
40+
backoff.WithInitialInterval(DefaultDownloadInitialInterval),
41+
backoff.WithMaxInterval(DefaultDownloadMaxInterval),
42+
backoff.WithMaxElapsedTime(DefaultDownloadMaxElapsedTime),
43+
)
44+
}
45+
2846
type Downloader struct {
2947
Client *apigen.ClientWithResponses
3048
PreSign bool
@@ -155,9 +173,7 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
155173
if err != nil {
156174
return err
157175
}
158-
defer func() {
159-
_ = f.Close()
160-
}()
176+
defer func() { _ = f.Close() }()
161177

162178
// make sure the destination file is in the right size
163179
if err := f.Truncate(size); err != nil {
@@ -218,25 +234,9 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
218234
}
219235

220236
func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress string, rangeStart int64, partSize int64, partNumber int, f *os.File, buf []byte) error {
237+
// set range header
221238
rangeEnd := rangeStart + partSize - 1
222239
rangeHeader := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)
223-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
224-
if err != nil {
225-
return err
226-
}
227-
req.Header.Set("Range", rangeHeader)
228-
resp, err := d.HTTPClient.Do(req)
229-
if err != nil {
230-
return err
231-
}
232-
defer func() { _ = resp.Body.Close() }()
233-
234-
if resp.StatusCode != http.StatusPartialContent {
235-
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
236-
}
237-
if resp.ContentLength != partSize {
238-
return fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
239-
}
240240

241241
// reuse buffer if possible
242242
if buf == nil {
@@ -245,48 +245,94 @@ func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress
245245
buf = buf[:partSize]
246246
}
247247

248-
_, err = io.ReadFull(resp.Body, buf)
249-
if err != nil {
250-
return err
248+
operation := func() error {
249+
// create request with range header
250+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
251+
if err != nil {
252+
return backoff.Permanent(err)
253+
}
254+
req.Header.Set("Range", rangeHeader)
255+
256+
resp, err := d.HTTPClient.Do(req)
257+
if err != nil {
258+
return err
259+
}
260+
defer func() { _ = resp.Body.Close() }()
261+
262+
if resp.StatusCode != http.StatusPartialContent {
263+
err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
264+
return backoff.Permanent(err)
265+
}
266+
if resp.ContentLength != partSize {
267+
err := fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
268+
return backoff.Permanent(err)
269+
}
270+
_, readErr := io.ReadFull(resp.Body, buf)
271+
return readErr
251272
}
252273

253-
_, err = f.WriteAt(buf, rangeStart)
254-
if err != nil {
255-
return err
274+
bo := backoff.WithContext(newDownloadBackoff(), ctx)
275+
notification := func(err error, d time.Duration) {
276+
logging.FromContext(ctx).WithError(err).Warnf("Download of '%s' part %d failed, retrying in %s", f.Name(), partNumber, d)
256277
}
257-
return nil
278+
if err := backoff.RetryNotify(operation, bo, notification); err != nil {
279+
return fmt.Errorf("failed to download '%s' part %d: %w", f.Name(), partNumber, err)
280+
}
281+
282+
_, err := f.WriteAt(buf, rangeStart)
283+
return err
258284
}
259285

260286
func (d *Downloader) downloadObject(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
261-
// get object content
262-
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
263-
Path: *src.Path,
264-
Presign: swag.Bool(d.PreSign),
265-
})
266-
if err != nil {
267-
return err
268-
}
269-
defer func() { _ = resp.Body.Close() }()
270-
if resp.StatusCode != http.StatusOK {
271-
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
272-
}
287+
operation := func() error {
288+
// get object content
289+
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
290+
Path: *src.Path,
291+
Presign: swag.Bool(d.PreSign),
292+
})
293+
if err != nil {
294+
return backoff.Permanent(err)
295+
}
296+
defer func() {
297+
_ = resp.Body.Close()
298+
}()
273299

274-
if tracker != nil && resp.ContentLength != -1 {
275-
tracker.UpdateTotal(resp.ContentLength)
276-
}
300+
if resp.StatusCode != http.StatusOK {
301+
err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
302+
return backoff.Permanent(err)
303+
}
277304

278-
// create and copy object content
279-
f, err := os.Create(dst)
280-
if err != nil {
305+
// create and copy object content
306+
f, err := os.Create(dst)
307+
if err != nil {
308+
return backoff.Permanent(err)
309+
}
310+
defer func() {
311+
_ = f.Close()
312+
}()
313+
314+
// w is used to write the data, it will be wrapped with a tracker if needed
315+
var w io.Writer = f
316+
if tracker != nil {
317+
tracker.Reset()
318+
if resp.ContentLength != -1 {
319+
tracker.UpdateTotal(resp.ContentLength)
320+
}
321+
w = NewTrackerWriter(f, tracker)
322+
}
323+
324+
_, err = io.Copy(w, resp.Body)
281325
return err
282326
}
283-
defer func() { _ = f.Close() }()
284-
var w io.Writer = f
285-
if tracker != nil {
286-
w = NewTrackerWriter(f, tracker)
327+
328+
b := backoff.WithContext(newDownloadBackoff(), ctx)
329+
notification := func(err error, d time.Duration) {
330+
logging.FromContext(ctx).WithError(err).Warnf("Download of object '%s' failed, retrying in %s", dst, d)
287331
}
288-
_, err = io.Copy(w, resp.Body)
289-
return err
332+
if err := backoff.RetryNotify(operation, b, notification); err != nil {
333+
return fmt.Errorf("failed to download '%s' object: %w", dst, err)
334+
}
335+
return nil
290336
}
291337

292338
// Tracker interface for tracking written data.

0 commit comments

Comments
 (0)