@@ -40,6 +40,7 @@ import (
4040 "github.com/cockroachdb/cockroach/pkg/util/hlc"
4141 "github.com/cockroachdb/cockroach/pkg/util/log"
4242 "github.com/cockroachdb/cockroach/pkg/util/retry"
43+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
4344 "github.com/cockroachdb/cockroach/pkg/util/uuid"
4445 "github.com/cockroachdb/errors"
4546 "github.com/gogo/protobuf/types"
@@ -231,8 +232,8 @@ func (b *backupResumer) ResumeCompaction(
231232
232233 var backupManifest * backuppb.BackupManifest
233234 updatedDetails := initialDetails
235+ testingKnobs := execCtx .ExecCfg ().BackupRestoreTestingKnobs
234236 if initialDetails .URI == "" {
235- testingKnobs := execCtx .ExecCfg ().BackupRestoreTestingKnobs
236237 if testingKnobs != nil && testingKnobs .RunBeforeResolvingCompactionDest != nil {
237238 if err := testingKnobs .RunBeforeResolvingCompactionDest (); err != nil {
238239 return err
@@ -263,21 +264,13 @@ func (b *backupResumer) ResumeCompaction(
263264 return err
264265 }
265266
266- if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup.before.write_first_checkpoint" ); err != nil {
267- return err
268- }
269-
270267 if err := backupinfo .WriteBackupManifestCheckpoint (
271268 ctx , updatedDetails .URI , updatedDetails .EncryptionOptions , kmsEnv ,
272269 backupManifest , execCtx .ExecCfg (), execCtx .User (),
273270 ); err != nil {
274271 return err
275272 }
276273
277- if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup.after.write_first_checkpoint" ); err != nil {
278- return err
279- }
280-
281274 description := maybeUpdateJobDescription (
282275 initialDetails , updatedDetails , b .job .Payload ().Description ,
283276 )
@@ -297,7 +290,7 @@ func (b *backupResumer) ResumeCompaction(
297290 return err
298291 }
299292
300- if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup .after.details_has_checkpoint" ); err != nil {
293+ if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup_compaction .after.details_has_checkpoint" ); err != nil {
301294 return err
302295 }
303296 // TODO (kev-cao): Add telemetry for backup compactions.
@@ -347,6 +340,10 @@ func (b *backupResumer) ResumeCompaction(
347340 }
348341 }
349342
343+ if testingKnobs != nil && testingKnobs .AfterLoadingCompactionManifestOnResume != nil {
344+ testingKnobs .AfterLoadingCompactionManifestOnResume (backupManifest )
345+ }
346+
350347 // We retry on pretty generic failures -- any rpc error. If a worker node were
351348 // to restart, it would produce this kind of error, but there may be other
352349 // errors that are also rpc errors. Don't retry too aggressively.
@@ -355,13 +352,8 @@ func (b *backupResumer) ResumeCompaction(
355352 MaxRetries : 5 ,
356353 }
357354
358- if execCtx .ExecCfg ().BackupRestoreTestingKnobs != nil &&
359- execCtx .ExecCfg ().BackupRestoreTestingKnobs .BackupDistSQLRetryPolicy != nil {
360- retryOpts = * execCtx .ExecCfg ().BackupRestoreTestingKnobs .BackupDistSQLRetryPolicy
361- }
362-
363- if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup.before.flow" ); err != nil {
364- return err
355+ if testingKnobs != nil && testingKnobs .BackupDistSQLRetryPolicy != nil {
356+ retryOpts = * testingKnobs .BackupDistSQLRetryPolicy
365357 }
366358
367359 // We want to retry a backup if there are transient failures (i.e. worker nodes
@@ -389,8 +381,6 @@ func (b *backupResumer) ResumeCompaction(
389381
390382 // Reload the backup manifest to pick up any spans we may have completed on
391383 // previous attempts.
392- // TODO (kev-cao): Compactions currently do not create checkpoints, but this
393- // can be used to reload the manifest once we add checkpointing.
394384 var reloadBackupErr error
395385 mem .Shrink (ctx , memSize )
396386 backupManifest , memSize , reloadBackupErr = b .readManifestOnResume (ctx , & mem , execCtx .ExecCfg (),
@@ -765,9 +755,13 @@ func concludeBackupCompaction(
765755// the associated manifest.
766756func processProgress (
767757 ctx context.Context ,
758+ execCtx sql.JobExecContext ,
759+ details jobspb.BackupDetails ,
768760 manifest * backuppb.BackupManifest ,
769761 progCh <- chan * execinfrapb.RemoteProducerMetadata_BulkProcessorProgress ,
762+ kmsEnv cloud.KMSEnv ,
770763) error {
764+ var lastCheckpointTime time.Time
771765 // When a processor is done exporting a span, it will send a progress update
772766 // to progCh.
773767 for progress := range progCh {
@@ -780,11 +774,24 @@ func processProgress(
780774 manifest .Files = append (manifest .Files , file )
781775 manifest .EntryCounts .Add (file .EntryCounts )
782776 }
777+
783778 // TODO (kev-cao): Add per node progress updates.
779+
780+ if wroteCheckpoint , err := maybeWriteBackupCheckpoint (
781+ ctx , execCtx , details , manifest , lastCheckpointTime , kmsEnv ,
782+ ); err != nil {
783+ log .Errorf (ctx , "unable to checkpoint compaction: %+v" , err )
784+ } else if wroteCheckpoint {
785+ lastCheckpointTime = timeutil .Now ()
786+ if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup_compaction.after.write_checkpoint" ); err != nil {
787+ return err
788+ }
789+ }
784790 }
785791 return nil
786792}
787793
794+ // compactionJobDescription generates a redacted description of the job.
788795func compactionJobDescription (details jobspb.BackupDetails ) (string , error ) {
789796 fmtCtx := tree .NewFmtCtx (tree .FmtSimple )
790797 redactedURIs , err := sanitizeURIList (details .Destination .To )
@@ -835,8 +842,7 @@ func doCompaction(
835842 )
836843 }
837844 checkpointLoop := func (ctx context.Context ) error {
838- // TODO (kev-cao): Add logic for checkpointing during loop.
839- return processProgress (ctx , manifest , progCh )
845+ return processProgress (ctx , execCtx , details , manifest , progCh , kmsEnv )
840846 }
841847 // TODO (kev-cao): Add trace aggregator loop.
842848
@@ -851,6 +857,34 @@ func doCompaction(
851857 )
852858}
853859
860+ // maybeWriteBackupCheckpoint writes a checkpoint for the backup if
861+ // the time since the last checkpoint exceeds the configured interval. If a
862+ // checkpoint is written, the function returns true.
863+ func maybeWriteBackupCheckpoint (
864+ ctx context.Context ,
865+ execCtx sql.JobExecContext ,
866+ details jobspb.BackupDetails ,
867+ manifest * backuppb.BackupManifest ,
868+ lastCheckpointTime time.Time ,
869+ kmsEnv cloud.KMSEnv ,
870+ ) (bool , error ) {
871+ if details .URI == "" {
872+ return false , errors .New ("backup details does not contain a default URI" )
873+ }
874+ execCfg := execCtx .ExecCfg ()
875+ interval := BackupCheckpointInterval .Get (& execCfg .Settings .SV )
876+ if timeutil .Since (lastCheckpointTime ) < interval {
877+ return false , nil
878+ }
879+ if err := backupinfo .WriteBackupManifestCheckpoint (
880+ ctx , details .URI , details .EncryptionOptions , kmsEnv ,
881+ manifest , execCfg , execCtx .User (),
882+ ); err != nil {
883+ return false , err
884+ }
885+ return true , nil
886+ }
887+
854888func init () {
855889 builtins .StartCompactionJob = StartCompactionJob
856890}
0 commit comments