@@ -66,6 +66,7 @@ import (
6666 "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
6767 "github.com/cockroachdb/cockroach/pkg/util/envutil"
6868 "github.com/cockroachdb/cockroach/pkg/util/hlc"
69+ "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
6970 "github.com/cockroachdb/cockroach/pkg/util/interval"
7071 "github.com/cockroachdb/cockroach/pkg/util/log"
7172 "github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -406,7 +407,7 @@ func restore(
406407 return sendAddRemoteSSTs (
407408 ctx ,
408409 execCtx ,
409- job . ID () ,
410+ job ,
410411 dataToRestore ,
411412 endTime ,
412413 encryption ,
@@ -1540,14 +1541,19 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
15401541}
15411542
15421543func (r * restoreResumer ) doResume (ctx context.Context , execCtx interface {}) error {
1543- details := r .job .Details ().(jobspb.RestoreDetails )
15441544 p := execCtx .(sql.JobExecContext )
15451545 r .execCfg = p .ExecCfg ()
15461546
1547+ details := r .job .Details ().(jobspb.RestoreDetails )
1548+
15471549 if err := maybeRelocateJobExecution (ctx , r .job .ID (), p , details .ExecutionLocality , "RESTORE" ); err != nil {
15481550 return err
15491551 }
15501552
1553+ if len (details .DownloadSpans ) > 0 {
1554+ return r .doDownloadFiles (ctx , p )
1555+ }
1556+
15511557 mem := p .ExecCfg ().RootMemoryMonitor .MakeBoundAccount ()
15521558 defer mem .Close (ctx )
15531559
@@ -2400,6 +2406,13 @@ func (r *restoreResumer) OnFailOrCancel(
24002406 p := execCtx .(sql.JobExecContext )
24012407 r .execCfg = p .ExecCfg ()
24022408
2409+ details := r .job .Details ().(jobspb.RestoreDetails )
2410+
2411+ // If this is a download-only job, there's no cleanup to do on cancel.
2412+ if len (details .DownloadSpans ) > 0 {
2413+ return nil
2414+ }
2415+
24032416 // Emit to the event log that the job has started reverting.
24042417 emitRestoreJobEvent (ctx , p , jobs .StatusReverting , r .job )
24052418
@@ -2416,7 +2429,6 @@ func (r *restoreResumer) OnFailOrCancel(
24162429 return err
24172430 }
24182431
2419- details := r .job .Details ().(jobspb.RestoreDetails )
24202432 logutil .LogJobCompletion (ctx , restoreJobEventType , r .job .ID (), false , jobErr , r .restoreStats .Rows )
24212433
24222434 execCfg := execCtx .(sql.JobExecContext ).ExecCfg ()
@@ -3141,7 +3153,7 @@ var onlineRestoreGate = envutil.EnvOrDefaultBool("COCKROACH_UNSAFE_RESTORE", fal
31413153func sendAddRemoteSSTs (
31423154 ctx context.Context ,
31433155 execCtx sql.JobExecContext ,
3144- jobID jobspb. JobID ,
3156+ job * jobs. Job ,
31453157 dataToRestore restorationData ,
31463158 restoreTime hlc.Timestamp ,
31473159 encryption * jobspb.BackupEncryptionOptions ,
@@ -3225,7 +3237,23 @@ func sendAddRemoteSSTs(
32253237 }
32263238 }
32273239 }
3228- return nil
3240+
3241+ downloadSpans := dataToRestore .getSpans ()
3242+
3243+ log .Infof (ctx , "creating job to track downloads in %d spans" , len (downloadSpans ))
3244+ downloadJobRecord := jobs.Record {
3245+ Description : fmt .Sprintf ("Background Data Download for %s" , job .Payload ().Description ),
3246+ Username : job .Payload ().UsernameProto .Decode (),
3247+ Details : jobspb.RestoreDetails {DownloadSpans : downloadSpans },
3248+ Progress : jobspb.RestoreProgress {},
3249+ }
3250+
3251+ return execCtx .ExecCfg ().InternalDB .DescsTxn (ctx , func (
3252+ ctx context.Context , txn descs.Txn ,
3253+ ) error {
3254+ _ , err := execCtx .ExecCfg ().JobRegistry .CreateJobWithTxn (ctx , downloadJobRecord , job .ID ()+ 1 , txn )
3255+ return err
3256+ })
32293257}
32303258
32313259var _ jobs.Resumer = & restoreResumer {}
@@ -3242,3 +3270,94 @@ func init() {
32423270 jobs .UsesTenantCostControl ,
32433271 )
32443272}
3273+
3274+ func (r * restoreResumer ) doDownloadFiles (ctx context.Context , execCtx sql.JobExecContext ) error {
3275+ details := r .job .Details ().(jobspb.RestoreDetails )
3276+ total := r .job .Progress ().Details .(* jobspb.Progress_Restore ).Restore .TotalDownloadRequired
3277+
3278+ // If this is the first resumption of this job, we need to find out the total
3279+ // amount we expect to download and persist it so that we can indiciate our
3280+ // progress as that number goes down later.
3281+ if total == 0 {
3282+ log .Infof (ctx , "calculating total download size (across all stores) to complete restore" )
3283+ if err := r .job .NoTxn ().RunningStatus (ctx , func (_ context.Context , _ jobspb.Details ) (jobs.RunningStatus , error ) {
3284+ return jobs .RunningStatus ("Calculating total download size..." ), nil
3285+ }); err != nil {
3286+ return errors .Wrapf (err , "failed to update running status of job %d" , errors .Safe (r .job .ID ()))
3287+ }
3288+
3289+ for _ , span := range details .DownloadSpans {
3290+ resp , err := execCtx .ExecCfg ().TenantStatusServer .SpanStats (ctx , & roachpb.SpanStatsRequest {
3291+ Spans : []roachpb.Span {span },
3292+ })
3293+ if err != nil {
3294+ return err
3295+ }
3296+ for _ , stats := range resp .SpanToStats {
3297+ total += stats .ExternalFileBytes
3298+ }
3299+ }
3300+
3301+ if total == 0 {
3302+ return nil
3303+ }
3304+
3305+ if err := r .job .NoTxn ().FractionProgressed (ctx , func (ctx context.Context , details jobspb.ProgressDetails ) float32 {
3306+ prog := details .(* jobspb.Progress_Restore ).Restore
3307+ prog .TotalDownloadRequired = total
3308+ return 0.0
3309+ }); err != nil {
3310+ return err
3311+ }
3312+
3313+ if err := r .job .NoTxn ().RunningStatus (ctx , func (_ context.Context , _ jobspb.Details ) (jobs.RunningStatus , error ) {
3314+ return jobs .RunningStatus (fmt .Sprintf ("Downloading %s of restored data..." , sz (total ))), nil
3315+ }); err != nil {
3316+ return errors .Wrapf (err , "failed to update running status of job %d" , errors .Safe (r .job .ID ()))
3317+ }
3318+ }
3319+
3320+ var lastProgressUpdate time.Time
3321+ for rt := retry .StartWithCtx (
3322+ ctx , retry.Options {InitialBackoff : time .Second * 10 , Multiplier : 1.2 , MaxBackoff : time .Minute * 5 },
3323+ ); ; rt .Next () {
3324+
3325+ var remaining uint64
3326+ for _ , span := range details .DownloadSpans {
3327+ resp , err := execCtx .ExecCfg ().TenantStatusServer .SpanStats (ctx , & roachpb.SpanStatsRequest {
3328+ Spans : []roachpb.Span {span },
3329+ })
3330+ if err != nil {
3331+ return err
3332+ }
3333+ for _ , stats := range resp .SpanToStats {
3334+ remaining += stats .ExternalFileBytes
3335+ }
3336+ }
3337+
3338+ fractionComplete := float32 (total - remaining ) / float32 (total )
3339+ log .Infof (ctx , "restore download phase, %s downloaded, %s remaining of %s total (%.1f complete)" ,
3340+ sz (total - remaining ), sz (remaining ), sz (total ), fractionComplete ,
3341+ )
3342+
3343+ if remaining == 0 {
3344+ return nil
3345+ }
3346+
3347+ if timeutil .Since (lastProgressUpdate ) > time .Minute {
3348+ if err := r .job .NoTxn ().FractionProgressed (ctx , func (ctx context.Context , details jobspb.ProgressDetails ) float32 {
3349+ return fractionComplete
3350+ }); err != nil {
3351+ return err
3352+ }
3353+ lastProgressUpdate = timeutil .Now ()
3354+ }
3355+ }
3356+ }
3357+
3358+ type sz int64
3359+
3360+ func (b sz ) String () string { return string (humanizeutil .IBytes (int64 (b ))) }
3361+
3362+ // TODO(dt): move this to humanizeutil and allow-list it there.
3363+ //func (b sz) SafeValue() {}
0 commit comments