
Commit 893a0a4

restore: log restore retries to job messages table
Currently, when a restore job is stuck in a retry loop, there is no indication to the user that errors were encountered and the job is retrying (aside from within the logs). This commit teaches the restore retrier to log errors to the job messages table, throttled to once every 5 minutes to avoid flooding the table during a hot retry loop.

Epic: CRDB-50823
Fixes: #149787, #148033

Release note (general change): Restore jobs now log errors on retry to the job messages table.
1 parent 43d1bd4 commit 893a0a4
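
The throttling described in the commit message is an "at most once per interval" gate (the change uses the repository's util.Every helper together with timeutil.Now, as seen in the diff below). The following is a rough, self-contained sketch of that pattern using only the standard library; the everyN type, its method name, and the intervals are illustrative stand-ins, not the CockroachDB API:

package main

import (
	"fmt"
	"time"
)

// everyN is a simplified stand-in for the rate limiter used by the change:
// shouldProcess reports true at most once per interval.
type everyN struct {
	interval time.Duration
	lastOK   time.Time
}

// shouldProcess returns true if at least `interval` has elapsed since the
// last time it returned true.
func (e *everyN) shouldProcess(now time.Time) bool {
	if now.Sub(e.lastOK) >= e.interval {
		e.lastOK = now
		return true
	}
	return false
}

func main() {
	// Hypothetical hot retry loop: an error fires every 10ms, but we only
	// "record" a message every 50ms, mirroring the 5-minute default of
	// restore.retry_log_rate at a much smaller scale.
	throttle := &everyN{interval: 50 * time.Millisecond}
	for attempt := 0; attempt < 20; attempt++ {
		if throttle.shouldProcess(time.Now()) {
			fmt.Printf("attempt %d: would record retry error to the job messages table\n", attempt)
		}
		time.Sleep(10 * time.Millisecond)
	}
}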

File tree

2 files changed (+129 / -25 lines changed)

pkg/backup/restore_job.go

Lines changed: 64 additions & 25 deletions
@@ -63,6 +63,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -91,20 +92,31 @@ var (
 		settings.WithVisibility(settings.Reserved),
 		settings.PositiveDuration,
 	)
+
+	restoreRetryLogRate = settings.RegisterDurationSetting(
+		settings.ApplicationLevel,
+		"restore.retry_log_rate",
+		"maximum rate at which retryable restore errors are logged to the job messages table",
+		5*time.Minute,
+		settings.WithVisibility(settings.Reserved),
+		settings.PositiveDuration,
+	)
 )
 
-// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
-// tables we process in a single txn when restoring their table statistics.
-const restoreStatsInsertBatchSize = 10
+const (
+	// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
+	// tables we process in a single txn when restoring their table statistics.
+	restoreStatsInsertBatchSize = 10
 
-// maxRestoreRetryFastFail is the maximum number of times we will retry without
-// seeing any progress before fast-failing the restore job.
-const maxRestoreRetryFastFail = 5
+	// maxRestoreRetryFastFail is the maximum number of times we will retry before
+	// exceeding the restoreRetryProgressThreshold.
+	maxRestoreRetryFastFail = 5
 
-// restoreRetryProgressThreshold is the fraction of the job that must
-// be _exceeded_ before we no longer fast fail the restore job after hitting the
-// maxRestoreRetryFastFail threshold.
-const restoreRetryProgressThreshold = 0
+	// restoreRetryProgressThreshold is the fraction of the job that must
+	// be _exceeded_ before we no longer fast fail the restore job after hitting the
+	// maxRestoreRetryFastFail threshold.
+	restoreRetryProgressThreshold = 0
+)
 
 var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
 	settings.ApplicationLevel,
@@ -198,10 +210,12 @@ func restoreWithRetry(
 	// We want to retry a restore if there are transient failures (i.e. worker nodes
 	// dying), so if we receive a retryable error, re-plan and retry the restore.
 	retryOpts, progThreshold := getRetryOptionsAndProgressThreshold(execCtx)
+	logRate := restoreRetryLogRate.Get(&execCtx.ExecCfg().Settings.SV)
+	logThrottler := util.Every(logRate)
 	var (
-		res                                    roachpb.RowCount
-		err                                    error
-		currPersistedSpans, prevPersistedSpans jobspb.RestoreFrontierEntries
+		res                roachpb.RowCount
+		err                error
+		prevPersistedSpans jobspb.RestoreFrontierEntries
 	)
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
 		res, err = restore(
@@ -237,16 +251,19 @@
 
 		log.Warningf(ctx, "encountered retryable error: %+v", err)
 
-		// Check if retry counter should be reset if progress was made.
-		currPersistedSpans = resumer.job.
-			Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
-		if !currPersistedSpans.Equal(prevPersistedSpans) {
-			// If the previous persisted spans are different than the current, it
-			// implies that further progress has been persisted.
-			r.Reset()
-			log.Dev.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
+		if logThrottler.ShouldProcess(timeutil.Now()) {
+			// We throttle the logging of errors to the jobs messages table to avoid
+			// flooding the table during the hot loop of a retry.
+			if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+				return resumer.job.Messages().Record(
+					ctx, txn, "error", fmt.Sprintf("restore encountered error: %v", err),
+				)
+			}); err != nil {
+				log.Warningf(ctx, "failed to record job error message: %v", err)
+			}
 		}
-		prevPersistedSpans = currPersistedSpans
+
+		prevPersistedSpans = maybeResetRetry(ctx, resumer, &r, prevPersistedSpans)
 
 		// Fail fast if no progress has been made after a certain number of retries.
 		if r.CurrentAttempt() >= maxRestoreRetryFastFail &&
@@ -281,8 +298,9 @@ func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Opti
 	// event that some progress has been made.
 	maxDuration := restoreRetryMaxDuration.Get(&execCtx.ExecCfg().Settings.SV)
 	retryOpts := retry.Options{
-		MaxBackoff:  5 * time.Minute,
-		MaxDuration: maxDuration,
+		InitialBackoff: 50 * time.Millisecond,
+		MaxBackoff:     5 * time.Minute,
+		MaxDuration:    maxDuration,
 	}
 	var progThreshold float32 = restoreRetryProgressThreshold
 	if knobs := execCtx.ExecCfg().BackupRestoreTestingKnobs; knobs != nil {
@@ -297,6 +315,26 @@ func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Opti
 	return retryOpts, progThreshold
 }
 
+// maybeResetRetry checks on the progress of the restore job and resets the
+// retry loop if progress has been made. It returns the latest progress.
+func maybeResetRetry(
+	ctx context.Context,
+	resumer *restoreResumer,
+	rt *retry.Retry,
+	prevProgress jobspb.RestoreFrontierEntries,
+) jobspb.RestoreFrontierEntries {
+	// Check if retry counter should be reset if progress was made.
+	var currProgress jobspb.RestoreFrontierEntries = resumer.job.
+		Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
+	if !currProgress.Equal(prevProgress) {
+		// If the previous persisted spans are different than the current, it
+		// implies that further progress has been persisted.
+		rt.Reset()
+		log.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
+	}
+	return currProgress
+}
+
 type storeByLocalityKV map[string]cloudpb.ExternalStorage
 
 func makeBackupLocalityMap(
@@ -387,7 +425,8 @@ func restore(
 		requiredSpans,
 		restoreCheckpoint,
 		restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV),
-		endTime)
+		endTime,
+	)
 	if err != nil {
 		return emptyRowCount, err
 	}
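
The hunks above register the reserved restore.retry_log_rate setting and record throttled messages with kind "error" in the job messages table. Below is a hedged sketch of observing the new behavior from a plain SQL client; the connection string, insecure-cluster assumption, lib/pq driver choice, and placeholder job ID are assumptions, while the setting name, the system.job_message table, and the kind value come from this commit:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-compatible driver works against CockroachDB
)

func main() {
	// Assumption: a local, insecure single-node cluster on the default port.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Tighten the throttle for demonstration; the default registered above is 5 minutes.
	if _, err := db.Exec("SET CLUSTER SETTING restore.retry_log_rate = '1m'"); err != nil {
		log.Fatal(err)
	}

	// Count the throttled retry errors recorded for a restore job
	// (12345 is a placeholder job ID), mirroring the query in the new test below.
	var n int
	if err := db.QueryRow(
		"SELECT count(*) FROM system.job_message WHERE job_id = $1 AND kind = $2",
		12345, "error",
	).Scan(&n); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("retry errors recorded so far: %d\n", n)
}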

pkg/backup/restore_test.go

Lines changed: 65 additions & 0 deletions
@@ -206,3 +206,68 @@ func TestRestoreRetryFastFails(t *testing.T) {
 		require.Equal(t, expFastFailAttempts, attempts)
 	})
 }
+
+func TestRestoreJobMessages(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	mu := struct {
+		syncutil.Mutex
+		retryCount int
+	}{}
+	testKnobs := &sql.BackupRestoreTestingKnobs{
+		RestoreDistSQLRetryPolicy: &retry.Options{
+			InitialBackoff: time.Microsecond,
+			Multiplier:     1,
+			MaxBackoff:     time.Microsecond,
+			// We want enough messages to be logged so that we can verify the count,
+			// so we set MaxDuration long enough so that it doesn't get inadvertently
+			// triggered.
+			MaxDuration: 5 * time.Minute,
+		},
+		RunAfterRestoreFlow: func() error {
+			mu.Lock()
+			defer mu.Unlock()
+			mu.retryCount++
+			return syscall.ECONNRESET
+		},
+	}
+	var params base.TestClusterArgs
+	params.ServerArgs.Knobs.BackupRestore = testKnobs
+
+	const numAccounts = 1
+	_, sqlDB, _, cleanupFn := backuptestutils.StartBackupRestoreTestCluster(
+		t, singleNode, backuptestutils.WithParams(params), backuptestutils.WithBank(numAccounts),
+	)
+	defer cleanupFn()
+
+	sqlDB.Exec(t, "SET CLUSTER SETTING restore.retry_log_rate = '1000ms'")
+	sqlDB.Exec(t, "BACKUP DATABASE data INTO 'nodelocal://1/backup'")
+
+	var restoreJobID jobspb.JobID
+	sqlDB.QueryRow(
+		t, `RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/backup'
+		WITH detached, new_db_name = 'restored_data'`,
+	).Scan(&restoreJobID)
+
+	// We need to cancel the restore job or else it will block the test from
+	// completing on Engflow.
+	defer sqlDB.QueryRow(t, "CANCEL JOB $1", restoreJobID)
+
+	testutils.SucceedsSoon(t, func() error {
+		var numErrMessages int
+		sqlDB.QueryRow(
+			t, `SELECT count(*) FROM system.job_message WHERE job_id = $1 AND kind = $2`,
+			restoreJobID, "error",
+		).Scan(&numErrMessages)
+		if numErrMessages < 2 {
+			return errors.Newf("waiting for at least 2 retries to be logged")
+		}
+		mu.Lock()
+		defer mu.Unlock()
+		// Since we throttle the frequency of error messages, we expect there to be
+		// more retries than the number of error messages logged.
+		require.Greater(t, mu.retryCount, numErrMessages)
+		return nil
+	})
+}
