@@ -63,6 +63,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -91,20 +92,31 @@ var (
 		settings.WithVisibility(settings.Reserved),
 		settings.PositiveDuration,
 	)
+
+	restoreRetryLogRate = settings.RegisterDurationSetting(
+		settings.ApplicationLevel,
+		"restore.retry_log_rate",
+		"maximum rate at which retryable restore errors are logged to the job messages table",
+		5 * time.Minute,
+		settings.WithVisibility(settings.Reserved),
+		settings.PositiveDuration,
+	)
 )
 
-// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
-// tables we process in a single txn when restoring their table statistics.
-const restoreStatsInsertBatchSize = 10
+const (
+	// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
+	// tables we process in a single txn when restoring their table statistics.
+	restoreStatsInsertBatchSize = 10
 
-// maxRestoreRetryFastFail is the maximum number of times we will retry without
-// seeing any progress before fast-failing the restore job.
-const maxRestoreRetryFastFail = 5
+	// maxRestoreRetryFastFail is the maximum number of times we will retry before
+	// exceeding the restoreRetryProgressThreshold.
+	maxRestoreRetryFastFail = 5
 
-// restoreRetryProgressThreshold is the fraction of the job that must
-// be _exceeded_ before we no longer fast fail the restore job after hitting the
-// maxRestoreRetryFastFail threshold.
-const restoreRetryProgressThreshold = 0
+	// restoreRetryProgressThreshold is the fraction of the job that must
+	// be _exceeded_ before we no longer fast fail the restore job after hitting the
+	// maxRestoreRetryFastFail threshold.
+	restoreRetryProgressThreshold = 0
+)
 
 var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
 	settings.ApplicationLevel,
@@ -198,10 +210,12 @@ func restoreWithRetry(
 	// We want to retry a restore if there are transient failures (i.e. worker nodes
 	// dying), so if we receive a retryable error, re-plan and retry the restore.
 	retryOpts, progThreshold := getRetryOptionsAndProgressThreshold(execCtx)
+	logRate := restoreRetryLogRate.Get(&execCtx.ExecCfg().Settings.SV)
+	logThrottler := util.Every(logRate)
 	var (
-		res                                    roachpb.RowCount
-		err                                    error
-		currPersistedSpans, prevPersistedSpans jobspb.RestoreFrontierEntries
+		res                roachpb.RowCount
+		err                error
+		prevPersistedSpans jobspb.RestoreFrontierEntries
 	)
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
 		res, err = restore(
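
Note (illustrative, not part of the diff): the `logThrottler` built above from `util.Every` is a simple rate limiter — `ShouldProcess` returns true at most once per configured interval. Below is a minimal, self-contained sketch of that behavior, assuming only the `util.Every`/`ShouldProcess` API already used in this hunk and that the cockroach module is available on the build path; the `main` wrapper, timings, and counts are made up for illustration.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util"
)

func main() {
	// Allow at most one event through per second.
	throttler := util.Every(time.Second)

	allowed := 0
	for i := 0; i < 5; i++ {
		// ShouldProcess returns true only if at least the configured interval
		// has elapsed since the last time it returned true.
		if throttler.ShouldProcess(time.Now()) {
			allowed++
		}
		time.Sleep(300 * time.Millisecond)
	}
	// With 5 events spaced ~300ms apart and a 1s interval, only the first event
	// and the one landing past the 1s mark get through.
	fmt.Printf("allowed %d of 5 events\n", allowed)
}
```

In the restore loop this means at most one error row is written to the job messages table per `restore.retry_log_rate` window, even if the retry loop spins quickly.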
@@ -237,16 +251,19 @@ func restoreWithRetry(
 
 		log.Warningf(ctx, "encountered retryable error: %+v", err)
 
-		// Check if retry counter should be reset if progress was made.
-		currPersistedSpans = resumer.job.
-			Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
-		if !currPersistedSpans.Equal(prevPersistedSpans) {
-			// If the previous persisted spans are different than the current, it
-			// implies that further progress has been persisted.
-			r.Reset()
-			log.Dev.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
+		if logThrottler.ShouldProcess(timeutil.Now()) {
+			// We throttle the logging of errors to the jobs messages table to avoid
+			// flooding the table during the hot loop of a retry.
+			if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+				return resumer.job.Messages().Record(
+					ctx, txn, "error", fmt.Sprintf("restore encountered error: %v", err),
+				)
+			}); err != nil {
+				log.Warningf(ctx, "failed to record job error message: %v", err)
+			}
 		}
-		prevPersistedSpans = currPersistedSpans
+
+		prevPersistedSpans = maybeResetRetry(ctx, resumer, &r, prevPersistedSpans)
 
 		// Fail fast if no progress has been made after a certain number of retries.
 		if r.CurrentAttempt() >= maxRestoreRetryFastFail &&
@@ -281,8 +298,9 @@ func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Opti
 	// event that some progress has been made.
 	maxDuration := restoreRetryMaxDuration.Get(&execCtx.ExecCfg().Settings.SV)
 	retryOpts := retry.Options{
-		MaxBackoff:  5 * time.Minute,
-		MaxDuration: maxDuration,
+		InitialBackoff: 50 * time.Millisecond,
+		MaxBackoff:     5 * time.Minute,
+		MaxDuration:    maxDuration,
 	}
 	var progThreshold float32 = restoreRetryProgressThreshold
 	if knobs := execCtx.ExecCfg().BackupRestoreTestingKnobs; knobs != nil {
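
Note (illustrative, not part of the diff): setting `InitialBackoff` to 50ms means the first re-plan happens almost immediately, with waits growing toward the 5-minute cap. The sketch below is plain arithmetic, not the retry package itself; it assumes the package's usual doubling between attempts and ignores jitter, so the real schedule will vary.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirror the options set above: start at 50ms, cap at 5 minutes.
	backoff := 50 * time.Millisecond
	const maxBackoff = 5 * time.Minute

	for attempt := 1; attempt <= 15; attempt++ {
		fmt.Printf("attempt %2d: wait %v\n", attempt, backoff)
		// Assumed doubling between attempts, clamped at the cap.
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```

Under these assumptions the wait reaches the 5-minute ceiling around the 14th attempt, while `MaxDuration` still bounds the total time spent retrying.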
@@ -297,6 +315,26 @@ func getRetryOptionsAndProgressThreshold(execCtx sql.JobExecContext) (retry.Opti
 	return retryOpts, progThreshold
 }
 
+// maybeResetRetry checks on the progress of the restore job and resets the
+// retry loop if progress has been made. It returns the latest progress.
+func maybeResetRetry(
+	ctx context.Context,
+	resumer *restoreResumer,
+	rt *retry.Retry,
+	prevProgress jobspb.RestoreFrontierEntries,
+) jobspb.RestoreFrontierEntries {
+	// Check if retry counter should be reset if progress was made.
+	var currProgress jobspb.RestoreFrontierEntries = resumer.job.
+		Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
+	if !currProgress.Equal(prevProgress) {
+		// If the previous persisted spans are different than the current, it
+		// implies that further progress has been persisted.
+		rt.Reset()
+		log.Infof(ctx, "restored frontier has advanced since last retry, resetting retry counter")
+	}
+	return currProgress
+}
+
 type storeByLocalityKV map[string]cloudpb.ExternalStorage
 
 func makeBackupLocalityMap(
@@ -387,7 +425,8 @@ func restore(
 		requiredSpans,
 		restoreCheckpoint,
 		restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV),
-		endTime)
+		endTime,
+	)
 	if err != nil {
 		return emptyRowCount, err
 	}