
Commit 6f79180

craig[bot] and kev-cao committed
Merge #149440
149440: restore: add secondary retry policy to restore r=dt a=kev-cao

This commit teaches restore to use a more dynamic retry policy that depends on the progress of the restore. Previously, restore would attempt a retry a set number of times, with a small maximum backoff, before failing. That approach works relatively well when a restore fails early in a job: presumably some issue is blocking the restore, and we want to fail early. However, for a restore that has been running and is near completion, it becomes increasingly likely that the error is transient and due to some temporary cluster unhealthiness. In these cases, it is beneficial to retry with a longer backoff and for a longer period of time before reporting a failure.

We now update restore such that, once restore progress has exceeded some predefined threshold, we switch from an attempt-limited retry mechanism to a duration-limited mechanism with larger backoffs.

Epic: CRDB-51362

Fixes: #148028

Release note: None

Co-authored-by: Kevin Cao <[email protected]>
2 parents 2f72cf4 + 3416430 commit 6f79180
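For orientation before the diff, the snippet below is a minimal, stdlib-only Go sketch of the retry shape this change introduces: one long, duration-limited retry loop with a growing backoff, plus a fast-fail escape that fires only while the job has made essentially no progress. It is an illustration of the idea, not the CockroachDB implementation; the names runWithRetry, attempt, and progress, as well as the initial backoff, are invented for the sketch, while the constants mirror values that appear in the diff below.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	maxFastFailAttempts = 5                      // mirrors maxRestoreRetryFastFail in the diff
	progressThreshold   = 0.0                    // progress fraction that must be exceeded to skip the fast-fail
	maxRetryDuration    = 72 * time.Hour         // mirrors the restore.retry_max_duration default
	maxBackoff          = 5 * time.Minute        // mirrors the MaxBackoff of the new policy
	initialBackoff      = 100 * time.Millisecond // invented for the sketch
)

// runWithRetry retries attempt until it succeeds, the wall-clock budget is
// exhausted, or it fast-fails because several early attempts saw no progress.
// progress reports the job's completed fraction in [0, 1].
func runWithRetry(
	ctx context.Context,
	attempt func(context.Context) error,
	progress func() float64,
) error {
	deadline := time.Now().Add(maxRetryDuration)
	backoff := initialBackoff
	var err error
	for attempts := 0; time.Now().Before(deadline); attempts++ {
		if err = attempt(ctx); err == nil {
			return nil
		}
		// Fast-fail: repeated failures with (almost) no progress suggest a
		// persistent problem rather than transient cluster unhealthiness.
		if attempts+1 >= maxFastFailAttempts && progress() <= progressThreshold {
			return fmt.Errorf("exhausted retries without making progress: %w", err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	// Out of wall-clock budget; the real job pauses here instead of failing.
	return fmt.Errorf("retry duration exhausted: %w", err)
}

func main() {
	calls := 0
	err := runWithRetry(context.Background(),
		func(context.Context) error {
			calls++
			if calls < 3 {
				return errors.New("transient: connection reset") // simulate a flaky attempt
			}
			return nil
		},
		func() float64 { return 0.9 }, // pretend the job is 90% complete
	)
	fmt.Println("attempts:", calls, "err:", err)
}

In the actual change, the corresponding pieces are the duration-limited retry.Options built in getRetryOptionsAndProgressThreshold, the restore.retry_max_duration cluster setting, and the maxRestoreRetryFastFail and restoreRetryProgressThreshold constants; when the duration budget runs out after progress has been made, the job is paused rather than failed.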

File tree

7 files changed: +225 −78 lines


pkg/backup/backup_test.go

Lines changed: 12 additions & 11 deletions
@@ -1404,26 +1404,27 @@ func TestRestoreJobRetryReset(t *testing.T) {
   }{}
   waitForProgress := make(chan struct{})
 
-  maxRetries := 4
-
   params := base.TestClusterArgs{}
   knobs := base.TestingKnobs{
     BackupRestore: &sql.BackupRestoreTestingKnobs{
       RestoreDistSQLRetryPolicy: &retry.Options{
         InitialBackoff: time.Microsecond,
         Multiplier:     2,
         MaxBackoff:     2 * time.Microsecond,
-        MaxRetries:     maxRetries,
+        MaxDuration:    time.Second,
       },
+      // Disable switching to the secondary retry policy for this test since it
+      // is not relevant to the test. Set to an unachievable value.
+      RestoreRetryProgressThreshold: 1.1,
       RunBeforeRestoreFlow: func() error {
         mu.Lock()
         defer mu.Unlock()
-        if mu.retryCount >= maxRetries-1 {
-          return nil
+        if mu.retryCount < maxRestoreRetryFastFail {
+          mu.retryCount++
+          // Send a retryable error
+          return syscall.ECONNRESET
         }
-        mu.retryCount++
-        // Send a retryable error
-        return syscall.ECONNRESET
+        return nil
       },
       RunAfterRestoreFlow: func() error {
         mu.Lock()
@@ -1454,9 +1455,9 @@ func TestRestoreJobRetryReset(t *testing.T) {
   })
   close(waitForProgress)
 
-  jobutils.WaitForJobToPause(t, sqlDB, restoreJobId)
+  jobutils.WaitForJobToFail(t, sqlDB, restoreJobId)
 
-  require.Greater(t, mu.retryCount, maxRetries+2)
+  require.Greater(t, mu.retryCount, maxRestoreRetryFastFail+2)
 }
 
 // TestRestoreRetryProcErr tests that the restore data processor will mark
@@ -1493,7 +1494,7 @@ func TestRestoreRetryProcErr(t *testing.T) {
         InitialBackoff: time.Microsecond,
         Multiplier:     2,
         MaxBackoff:     2 * time.Microsecond,
-        MaxRetries:     4,
+        MaxDuration:    time.Second,
       },
       RunBeforeRestoreFlow: func() error {
         mu.Lock()

pkg/backup/full_cluster_backup_restore_test.go

Lines changed: 2 additions & 8 deletions
@@ -689,19 +689,13 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
   sqlDB.Exec(t, `BACKUP data.bank INTO 'nodelocal://1/throwawayjob'`)
   sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)
 
-  waitForJobPauseCancel := func(db *sqlutils.SQLRunner, jobID jobspb.JobID) {
-    db.Exec(t, `USE system;`)
-    jobutils.WaitForJobToPause(t, db, jobID)
-    db.Exec(t, `CANCEL JOB $1`, jobID)
-    jobutils.WaitForJobToCancel(t, db, jobID)
-  }
-
   t.Run("during restoration of data", func(t *testing.T) {
     _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
     defer cleanupEmptyCluster()
     var jobID jobspb.JobID
     sqlDBRestore.QueryRow(t, `RESTORE FROM LATEST IN 'nodelocal://1/missing-ssts' WITH detached`).Scan(&jobID)
-    waitForJobPauseCancel(sqlDBRestore, jobID)
+    sqlDBRestore.Exec(t, "USE system")
+    jobutils.WaitForJobToFail(t, sqlDBRestore, jobID)
     // Verify the failed RESTORE added some DROP tables.
     // Note that the system tables here correspond to the temporary tables
     // imported, not the system tables themselves.

pkg/backup/restore_data_processor.go

Lines changed: 0 additions & 7 deletions
@@ -119,8 +119,6 @@ var defaultNumWorkers = metamorphic.ConstantWithTestRange(
   1, /* metamorphic min */
   8, /* metamorphic max */
 )
-var retryableRestoreProcError = errors.New("restore processor error after forward progress")
-var restoreProcError = errors.New("restore processor error without forward progress")
 
 // TODO(pbardea): It may be worthwhile to combine this setting with the one that
 // controls the number of concurrent AddSSTable requests if each restore worker
@@ -647,11 +645,6 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
   if !ok {
     // Done. Check if any phase exited early with an error.
     err := rd.phaseGroup.Wait()
-    if rd.progressMade {
-      err = errors.Mark(err, retryableRestoreProcError)
-    } else {
-      err = errors.Mark(err, restoreProcError)
-    }
     rd.MoveToDraining(err)
     return nil, rd.DrainHelper()
   }

pkg/backup/restore_job.go

Lines changed: 79 additions & 45 deletions
@@ -22,7 +22,6 @@ import (
   "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
   "github.com/cockroachdb/cockroach/pkg/clusterversion"
   "github.com/cockroachdb/cockroach/pkg/jobs"
-  "github.com/cockroachdb/cockroach/pkg/jobs/joberror"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
   "github.com/cockroachdb/cockroach/pkg/keys"
@@ -81,10 +80,30 @@ import (
   "github.com/cockroachdb/errors"
 )
 
+var (
+  restoreRetryMaxDuration = settings.RegisterDurationSetting(
+    settings.ApplicationLevel,
+    "restore.retry_max_duration",
+    "maximum duration a restore job will retry before terminating",
+    72*time.Hour,
+    settings.WithVisibility(settings.Reserved),
+    settings.PositiveDuration,
+  )
+)
+
 // restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
 // tables we process in a single txn when restoring their table statistics.
 const restoreStatsInsertBatchSize = 10
 
+// maxRestoreRetryFastFail is the maximum number of times we will retry without
+// seeing any progress before fast-failing the restore job.
+const maxRestoreRetryFastFail = 5
+
+// restoreRetryProgressThreshold is the fraction of the job that must
+// be _exceeded_ before we no longer fast fail the restore job after hitting the
+// maxRestoreRetryFastFail threshold.
+const restoreRetryProgressThreshold = 0
+
 var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
   settings.ApplicationLevel,
   "bulkio.restore.insert_stats_workers",
@@ -154,42 +173,30 @@ func rewriteBackupSpanKey(
   return newKey, nil
 }
 
+// restoreWithRetry attempts to run restore with retry logic and logs retries
+// accordingly.
 func restoreWithRetry(
-  restoreCtx context.Context,
+  ctx context.Context,
   execCtx sql.JobExecContext,
+  resumer *restoreResumer,
   backupManifests []backuppb.BackupManifest,
   backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
   endTime hlc.Timestamp,
   dataToRestore restorationData,
-  resumer *restoreResumer,
   encryption *jobspb.BackupEncryptionOptions,
   kmsEnv cloud.KMSEnv,
 ) (roachpb.RowCount, error) {
-
-  // We retry on pretty generic failures -- any rpc error. If a worker node were
-  // to restart, it would produce this kind of error, but there may be other
-  // errors that are also rpc errors. Don't retry to aggressively.
-  retryOpts := retry.Options{
-    MaxBackoff: 1 * time.Second,
-    MaxRetries: 5,
-  }
-  if execCtx.ExecCfg().BackupRestoreTestingKnobs != nil &&
-    execCtx.ExecCfg().BackupRestoreTestingKnobs.RestoreDistSQLRetryPolicy != nil {
-    retryOpts = *execCtx.ExecCfg().BackupRestoreTestingKnobs.RestoreDistSQLRetryPolicy
-  }
-
   // We want to retry a restore if there are transient failures (i.e. worker nodes
-  // dying), so if we receive a retryable error, re-plan and retry the backup.
+  // dying), so if we receive a retryable error, re-plan and retry the restore.
+  retryOpts, progThreshold := getRetryOptionsAndProgressThreshold(execCtx)
   var (
-    res                    roachpb.RowCount
-    err                    error
-    previousPersistedSpans jobspb.RestoreFrontierEntries
-    currentPersistedSpans  jobspb.RestoreFrontierEntries
+    res                                    roachpb.RowCount
+    err                                    error
+    currPersistedSpans, prevPersistedSpans jobspb.RestoreFrontierEntries
   )
-
-  for r := retry.StartWithCtx(restoreCtx, retryOpts); r.Next(); {
+  for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
     res, err = restore(
-      restoreCtx,
+      ctx,
       execCtx,
       backupManifests,
       backupLocalityInfo,
@@ -204,30 +211,34 @@ func restoreWithRetry(
       break
     }
 
-    if errors.HasType(err, &kvpb.InsufficientSpaceError{}) || errors.Is(err, restoreProcError) {
+    if errors.HasType(err, &kvpb.InsufficientSpaceError{}) {
       return roachpb.RowCount{}, jobs.MarkPauseRequestError(errors.UnwrapAll(err))
     }
-
-    if joberror.IsPermanentBulkJobError(err) && !errors.Is(err, retryableRestoreProcError) {
-      return roachpb.RowCount{}, err
-    }
-
     // If we are draining, it is unlikely we can start a
     // new DistSQL flow. Exit with a retryable error so
     // that another node can pick up the job.
     if execCtx.ExecCfg().JobRegistry.IsDraining() {
      return roachpb.RowCount{}, jobs.MarkAsRetryJobError(errors.Wrapf(err, "job encountered retryable error on draining node"))
     }
 
-    log.Warningf(restoreCtx, "encountered retryable error: %+v", err)
-    currentPersistedSpans = resumer.job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
-    if !currentPersistedSpans.Equal(previousPersistedSpans) {
+    log.Warningf(ctx, "encountered retryable error: %+v", err)
+
+    // Check if retry counter should be reset if progress was made.
+    currPersistedSpans = resumer.job.
+      Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
+    if !currPersistedSpans.Equal(prevPersistedSpans) {
       // If the previous persisted spans are different than the current, it
       // implies that further progress has been persisted.
       r.Reset()
-      log.Infof(restoreCtx, "restored frontier has advanced since last retry, resetting retry counter")
+      log.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
+    }
+    prevPersistedSpans = currPersistedSpans
+
+    // Fail fast if no progress has been made after a certain number of retries.
+    if r.CurrentAttempt() >= maxRestoreRetryFastFail &&
+      resumer.job.FractionCompleted() <= progThreshold {
+      return roachpb.RowCount{}, errors.Wrap(err, "restore job exhausted max retries without making progress")
     }
-    previousPersistedSpans = currentPersistedSpans
 
     testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
     if testingKnobs != nil && testingKnobs.RunAfterRetryIteration != nil {
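One detail worth noting in the loop above: the retry counter is reset whenever the persisted restore checkpoint has advanced between attempts, so only attempts that add no durable progress count toward the fast-fail limit. Below is a small, hedged illustration of that bookkeeping; the checkpoint type and the staleAttempts helper are invented stand-ins, not jobspb.RestoreFrontierEntries or code from this commit.

package main

import (
	"fmt"
	"slices"
)

// checkpoint is a stand-in for the persisted restore frontier
// (jobspb.RestoreFrontierEntries in the real code).
type checkpoint []string

// staleAttempts returns the updated count of consecutive attempts that made no
// durable progress: it resets to zero when the checkpoint advanced and
// increments otherwise, mirroring the r.Reset() call in the loop above.
func staleAttempts(prev, curr checkpoint, stale int) int {
	if !slices.Equal(prev, curr) {
		// The frontier moved since the last attempt: treat it as progress.
		return 0
	}
	return stale + 1
}

func main() {
	prev := checkpoint{"a-c"}
	curr := checkpoint{"a-c", "c-f"}           // the frontier advanced
	fmt.Println(staleAttempts(prev, curr, 3)) // 0: counter reset
	fmt.Println(staleAttempts(curr, curr, 0)) // 1: still stalled
}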
@@ -237,18 +248,41 @@ func restoreWithRetry(
     }
   }
 
-  // We have exhausted retries, but we have not seen a "PermanentBulkJobError" so
-  // it is possible that this is a transient error that is taking longer than
-  // our configured retry to go away.
-  //
-  // Let's pause the job instead of failing it so that the user can decide
-  // whether to resume it or cancel it.
+  // Since the restore was able to make some progress before exhausting the
+  // retry counter, we will pause the job and allow the user to determine
+  // whether or not to resume the job or discard all progress and cancel.
   if err != nil {
     return res, jobs.MarkPauseRequestError(errors.Wrap(err, "exhausted retries"))
   }
   return res, nil
 }
 
+// getRetryOptionsAndProgressThreshold returns the restore retry options and
+// progress threshold for fast failure, taking into consideration any testing
+// knobs and cluster settings.
+func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Options, float32) {
+  // In the event that the job is failing early without any progress, we will
+  // manually quit out of the retry loop prematurely. As such, we set a long max
+  // duration and backoff to allow for the job to retry for a long time in the
+  // event that some progress has been made.
+  maxDuration := restoreRetryMaxDuration.Get(&execCtx.ExecCfg().Settings.SV)
+  retryOpts := retry.Options{
+    MaxBackoff:  5 * time.Minute,
+    MaxDuration: maxDuration,
+  }
+  var progThreshold float32 = restoreRetryProgressThreshold
+  if knobs := execCtx.ExecCfg().BackupRestoreTestingKnobs; knobs != nil {
+    if knobs.RestoreDistSQLRetryPolicy != nil {
+      retryOpts = *knobs.RestoreDistSQLRetryPolicy
+    }
+    if knobs.RestoreRetryProgressThreshold > 0 {
+      progThreshold = knobs.RestoreRetryProgressThreshold
+    }
+  }
+
+  return retryOpts, progThreshold
+}
+
 type storeByLocalityKV map[string]cloudpb.ExternalStorage
 
 func makeBackupLocalityMap(
@@ -490,7 +524,7 @@ func restore(
     case <-timer.C:
       // Replan the restore job if it has been 10 minutes since the last
       // processor completed working.
-      return errors.Mark(laggingRestoreProcErr, retryableRestoreProcError)
+      return laggingRestoreProcErr
     }
   }
 }
@@ -1929,11 +1963,11 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
   res, err := restoreWithRetry(
     ctx,
     p,
+    r,
     backupManifests,
     details.BackupLocalityInfo,
     details.EndTime,
     preData,
-    r,
     details.Encryption,
     &kmsEnv,
   )
@@ -1969,11 +2003,11 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
   res, err := restoreWithRetry(
     ctx,
     p,
+    r,
     backupManifests,
     details.BackupLocalityInfo,
     details.EndTime,
     preValidateData,
-    r,
     details.Encryption,
     &kmsEnv,
   )
@@ -1990,11 +2024,11 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
   res, err := restoreWithRetry(
     ctx,
     p,
+    r,
     backupManifests,
     details.BackupLocalityInfo,
     details.EndTime,
     mainData,
-    r,
     details.Encryption,
     &kmsEnv,
   )
