Skip to content

Commit a2c9cc8

Browse files
committed
backup: add retry loop to doDownloadFiles
Previously, `doDownloadFiles` would immediately fail the download job upon encountering an error. This commit teaches the download workers to retry the download requests before giving up and failing. Epic: CRDB-51394 Fixes: #148081 Release note: Download phase of fast restore now will retry downloads before giving up.
1 parent 9015a1c commit a2c9cc8

File tree

4 files changed

+123
-7
lines changed

4 files changed

+123
-7
lines changed

pkg/backup/restore_online.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ var onlineRestoreLayerLimit = settings.RegisterIntSetting(
6464
)
6565

6666
const linkCompleteKey = "link_complete"
67+
const maxDownloadAttempts = 5
6768

6869
// splitAndScatter runs through all entries produced by genSpans splitting and
6970
// scattering the key-space designated by the passed rewriter such that if all
@@ -556,15 +557,34 @@ func (r *restoreResumer) sendDownloadWorker(
556557
ctx, tsp := tracing.ChildSpan(ctx, "backup.sendDownloadWorker")
557558
defer tsp.Finish()
558559

559-
for rt := retry.StartWithCtx(
560-
ctx, retry.Options{InitialBackoff: time.Millisecond * 100, MaxBackoff: time.Second * 10},
561-
); ; rt.Next() {
560+
testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
561+
for {
562562
if err := ctx.Err(); err != nil {
563563
return err
564564
}
565565

566-
if err := sendDownloadSpan(ctx, execCtx, spans); err != nil {
567-
return err
566+
var err error
567+
for r := retry.StartWithCtx(ctx, retry.Options{
568+
InitialBackoff: time.Millisecond * 100,
569+
MaxBackoff: time.Second,
570+
MaxRetries: maxDownloadAttempts - 1,
571+
}); r.Next(); {
572+
err = func() error {
573+
if testingKnobs != nil && testingKnobs.RunBeforeSendingDownloadSpan != nil {
574+
if err := testingKnobs.RunBeforeSendingDownloadSpan(); err != nil {
575+
return err
576+
}
577+
}
578+
return sendDownloadSpan(ctx, execCtx, spans)
579+
}()
580+
if err == nil {
581+
break
582+
}
583+
log.VInfof(ctx, 1, "attempt %d failed to download spans: %v", r.CurrentAttempt(), err)
584+
}
585+
586+
if err != nil {
587+
return errors.Wrapf(err, "retries exhausted for sending download spans")
568588
}
569589

570590
// Wait for the completion poller to signal that it has checked our work.
@@ -576,6 +596,11 @@ func (r *restoreResumer) sendDownloadWorker(
576596
case <-ctx.Done():
577597
return ctx.Err()
578598
}
599+
600+
// Sleep a bit before sending download requests again to avoid a hot loop.
601+
// This will only be hit if after a successful download request, there are
602+
// still spans to download (e.g. because of a rabalancing).
603+
time.Sleep(10 * time.Second)
579604
}
580605
}
581606
}

pkg/backup/restore_online_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"path/filepath"
1515
"reflect"
1616
"strings"
17+
"sync/atomic"
1718
"testing"
1819

1920
"github.com/cockroachdb/cockroach/pkg/base"
@@ -32,6 +33,7 @@ import (
3233
"github.com/cockroachdb/cockroach/pkg/util/log"
3334
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3435
"github.com/cockroachdb/cockroach/pkg/util/stop"
36+
"github.com/cockroachdb/errors"
3537
"github.com/kr/pretty"
3638
"github.com/stretchr/testify/require"
3739
)
@@ -581,6 +583,65 @@ func TestOnlineRestoreErrors(t *testing.T) {
581583
})
582584
}
583585

586+
func TestOnlineRestoreRetryingDownloadRequests(t *testing.T) {
587+
defer leaktest.AfterTest(t)()
588+
defer log.Scope(t).Close(t)
589+
590+
defer nodelocal.ReplaceNodeLocalForTesting(t.TempDir())()
591+
592+
rng, seed := randutil.NewPseudoRand()
593+
t.Logf("random seed: %d", seed)
594+
595+
alwaysFail := rng.Intn(2) == 0
596+
t.Logf("always fail download requests: %t", alwaysFail)
597+
totalFailures := int32(rng.Intn(maxDownloadAttempts-1) + 1)
598+
var currentFailures atomic.Int32
599+
600+
clusterArgs := base.TestClusterArgs{
601+
ServerArgs: base.TestServerArgs{
602+
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
603+
Knobs: base.TestingKnobs{
604+
BackupRestore: &sql.BackupRestoreTestingKnobs{
605+
RunBeforeSendingDownloadSpan: func() error {
606+
if alwaysFail {
607+
return errors.Newf("always fail download request")
608+
}
609+
if currentFailures.Load() >= totalFailures {
610+
return nil
611+
}
612+
currentFailures.Add(1)
613+
return errors.Newf("injected download request failure")
614+
},
615+
},
616+
},
617+
},
618+
}
619+
620+
const numAccounts = 1
621+
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(
622+
t, singleNode, numAccounts, InitManualReplication, clusterArgs,
623+
)
624+
defer cleanupFn()
625+
626+
externalStorage := "nodelocal://1/backup"
627+
sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
628+
sqlDB.Exec(
629+
t,
630+
fmt.Sprintf(`
631+
RESTORE DATABASE data FROM LATEST IN '%s'
632+
WITH EXPERIMENTAL DEFERRED COPY, new_db_name=data2
633+
`, externalStorage),
634+
)
635+
636+
var downloadJobID jobspb.JobID
637+
sqlDB.QueryRow(t, latestDownloadJobIDQuery).Scan(&downloadJobID)
638+
if alwaysFail {
639+
jobutils.WaitForJobToFail(t, sqlDB, downloadJobID)
640+
} else {
641+
jobutils.WaitForJobToSucceed(t, sqlDB, downloadJobID)
642+
}
643+
}
644+
584645
func bankOnlineRestore(
585646
t *testing.T,
586647
sqlDB *sqlutils.SQLRunner,

pkg/server/span_download.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/storage"
1919
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2020
"github.com/cockroachdb/cockroach/pkg/util/log"
21+
"github.com/cockroachdb/cockroach/pkg/util/retry"
2122
"github.com/cockroachdb/errors"
2223
"github.com/cockroachdb/errors/errorspb"
2324
"google.golang.org/grpc/codes"
2425
"google.golang.org/grpc/status"
2526
)
2627

28+
var perNodeMaxDownloadAttempts = 5
29+
2730
func (t *statusServer) DownloadSpan(
2831
ctx context.Context, req *serverpb.DownloadSpanRequest,
2932
) (*serverpb.DownloadSpanResponse, error) {
@@ -62,8 +65,31 @@ func (s *systemStatusServer) DownloadSpan(
6265
// Send DownloadSpan request to all stores on all nodes.
6366
remoteRequest := *req
6467
remoteRequest.NodeID = "local"
65-
nodeFn := func(ctx context.Context, status serverpb.RPCStatusClient, _ roachpb.NodeID) (*serverpb.DownloadSpanResponse, error) {
66-
return status.DownloadSpan(ctx, &remoteRequest)
68+
nodeFn := func(
69+
ctx context.Context, status serverpb.RPCStatusClient, _ roachpb.NodeID,
70+
) (*serverpb.DownloadSpanResponse, error) {
71+
var nodeResp *serverpb.DownloadSpanResponse
72+
var err error
73+
for r := retry.StartWithCtx(ctx, retry.Options{
74+
InitialBackoff: time.Millisecond * 100,
75+
MaxBackoff: time.Second,
76+
MaxRetries: perNodeMaxDownloadAttempts - 1,
77+
}); r.Next(); {
78+
nodeResp, err = status.DownloadSpan(ctx, &remoteRequest)
79+
for node, encodedErr := range resp.Errors {
80+
err = errors.Wrapf(
81+
errors.DecodeError(ctx, encodedErr),
82+
"download span request failed with error on node n%d",
83+
node,
84+
)
85+
break
86+
}
87+
if err == nil {
88+
break
89+
}
90+
log.VInfof(ctx, 1, "attempt %d failed to download span: %v", r.CurrentAttempt(), err)
91+
}
92+
return nodeResp, err
6793
}
6894
responseFn := func(nodeID roachpb.NodeID, downloadSpanResp *serverpb.DownloadSpanResponse) {
6995
for i, e := range downloadSpanResp.Errors {

pkg/sql/exec_util_backup.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ type BackupRestoreTestingKnobs struct {
6969
RunAfterRestoreProcDrains func()
7070

7171
RunBeforeResolvingCompactionDest func() error
72+
73+
// RunBeforeSendingDownloadSpan is called within the retry loop of the
74+
// download span worker before sending the download span request.
75+
RunBeforeSendingDownloadSpan func() error
7276
}
7377

7478
var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}

0 commit comments

Comments
 (0)