Skip to content

Commit 5acb999

Browse files
craig[bot]dtRui HuerikgrinakerDrewKimball
committed
107145: backupccl: add RESTORE job to wait for background downloads r=dt a=dt After an online restore completes its initial data mounting phase, we actually go download the data in the background, both so that it will be faster to access once it is stored locally but also to make it permanently owned by the cluster, rather than remaining linked to the backup, which could be moved, expire, etc. Users need to be able to see, observe and control this process, in particular they need to know when it completes or will complete, so that they know when to expect normal performance or when they can lift modification/expiration blocks on their backups. This change introduces new RESTORE job, created by the initial restore job after it has linked in the external sstables, which polls the stores to determine how much data remains to be downloaded, and completes when this number reaches zero. A future change will likely extend this job to also explicitly invoke a compaction specifically configured to download these files, to make this happen more immediately and to make it controllable by the job, but for now this initial implementation only waits and polls to track the state of the compaction, rather than actively compacting itself. Release note: none. Epic: none. 107187: cloud: fix incorrect rebase of ResumingReader open at size patch r=rhu713 a=rhu713 Recently cockroachdb#103462 was merged after incorrectly rebasing and did not correctly incorporate the addition of the NoFileSize read option for external storages. This patch fixes the rebase in GCS storage and in cloud unit tests. Fixes: cockroachdb#107136 Fixes: cockroachdb#107138 Cloud Unit Test Manual Run: https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_Nightlies_CloudUnitTests/10977689 Release note: None 107203: kvserver: enable `raft.Config.StepDownOnRemoval` r=erikgrinaker a=erikgrinaker **kvserver: tweak `TestRaftLeaderRemovesItself`** **kvserver: campaign when leader is demoted to learner** This patch campaigns when the leader demotes itself to a learner during an atomic conf change, instead of waiting until it is completely removed from the range. Relying on the old leader to commit the final conf change while it's a learner appears unfortunate, and is not compatible with an upcoming etcd/raft change that makes the learner step down in this case. This is backwards compatible with 23.1, because the same follower replica is responsible for campaigning, and it does not matter if it campaigns during the demotion or removal -- in particular because it forces an immediate election via `forceCampaignLocked()` which immediately bumps the term. **kvserver: enable `raft.Config.StepDownOnRemoval`** This causes the Raft leader to step down when it removes itself from the range or demotes itself to a learner. This doesn't make any difference functionally, since we're careful to no longer tick the replica or otherwise use it, but in principle it could cause a leader that was no longer part of the range to continue to assert leadership, stalling the range since it wouldn't be able to propose anything. Stepping down appears safer and cleaner. Epic: none Release note: None 107240: upgrade/upgrades: skip TestJSONForwardingIndexes r=mgartner a=DrewKimball Refs: cockroachdb#107169 Reason: flaky test Generated by bin/skip-test. Release justification: non-production code changes Release note: None Epic: None Co-authored-by: David Taylor <[email protected]> Co-authored-by: Rui Hu <[email protected]> Co-authored-by: Erik Grinaker <[email protected]> Co-authored-by: Drew Kimball <[email protected]>
5 parents 98cd2f0 + 16dd972 + ef4563b + 4dcbdcd + adeb529 commit 5acb999

File tree

11 files changed

+179
-19
lines changed

11 files changed

+179
-19
lines changed

pkg/ccl/backupccl/restore_job.go

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import (
6666
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
6767
"github.com/cockroachdb/cockroach/pkg/util/envutil"
6868
"github.com/cockroachdb/cockroach/pkg/util/hlc"
69+
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
6970
"github.com/cockroachdb/cockroach/pkg/util/interval"
7071
"github.com/cockroachdb/cockroach/pkg/util/log"
7172
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -407,7 +408,7 @@ func restore(
407408
return sendAddRemoteSSTs(
408409
ctx,
409410
execCtx,
410-
job.ID(),
411+
job,
411412
dataToRestore,
412413
endTime,
413414
encryption,
@@ -1541,14 +1542,19 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
15411542
}
15421543

15431544
func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
1544-
details := r.job.Details().(jobspb.RestoreDetails)
15451545
p := execCtx.(sql.JobExecContext)
15461546
r.execCfg = p.ExecCfg()
15471547

1548+
details := r.job.Details().(jobspb.RestoreDetails)
1549+
15481550
if err := maybeRelocateJobExecution(ctx, r.job.ID(), p, details.ExecutionLocality, "RESTORE"); err != nil {
15491551
return err
15501552
}
15511553

1554+
if len(details.DownloadSpans) > 0 {
1555+
return r.doDownloadFiles(ctx, p)
1556+
}
1557+
15521558
mem := p.ExecCfg().RootMemoryMonitor.MakeBoundAccount()
15531559
defer mem.Close(ctx)
15541560

@@ -2401,6 +2407,13 @@ func (r *restoreResumer) OnFailOrCancel(
24012407
p := execCtx.(sql.JobExecContext)
24022408
r.execCfg = p.ExecCfg()
24032409

2410+
details := r.job.Details().(jobspb.RestoreDetails)
2411+
2412+
// If this is a download-only job, there's no cleanup to do on cancel.
2413+
if len(details.DownloadSpans) > 0 {
2414+
return nil
2415+
}
2416+
24042417
// Emit to the event log that the job has started reverting.
24052418
emitRestoreJobEvent(ctx, p, jobs.StatusReverting, r.job)
24062419

@@ -2417,7 +2430,6 @@ func (r *restoreResumer) OnFailOrCancel(
24172430
return err
24182431
}
24192432

2420-
details := r.job.Details().(jobspb.RestoreDetails)
24212433
logutil.LogJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr, r.restoreStats.Rows)
24222434

24232435
execCfg := execCtx.(sql.JobExecContext).ExecCfg()
@@ -3142,7 +3154,7 @@ var onlineRestoreGate = envutil.EnvOrDefaultBool("COCKROACH_UNSAFE_RESTORE", fal
31423154
func sendAddRemoteSSTs(
31433155
ctx context.Context,
31443156
execCtx sql.JobExecContext,
3145-
jobID jobspb.JobID,
3157+
job *jobs.Job,
31463158
dataToRestore restorationData,
31473159
restoreTime hlc.Timestamp,
31483160
encryption *jobspb.BackupEncryptionOptions,
@@ -3284,7 +3296,23 @@ func sendAddRemoteSSTs(
32843296
}
32853297
}
32863298
}
3287-
return nil
3299+
3300+
downloadSpans := dataToRestore.getSpans()
3301+
3302+
log.Infof(ctx, "creating job to track downloads in %d spans", len(downloadSpans))
3303+
downloadJobRecord := jobs.Record{
3304+
Description: fmt.Sprintf("Background Data Download for %s", job.Payload().Description),
3305+
Username: job.Payload().UsernameProto.Decode(),
3306+
Details: jobspb.RestoreDetails{DownloadSpans: downloadSpans},
3307+
Progress: jobspb.RestoreProgress{},
3308+
}
3309+
3310+
return execCtx.ExecCfg().InternalDB.DescsTxn(ctx, func(
3311+
ctx context.Context, txn descs.Txn,
3312+
) error {
3313+
_, err := execCtx.ExecCfg().JobRegistry.CreateJobWithTxn(ctx, downloadJobRecord, job.ID()+1, txn)
3314+
return err
3315+
})
32883316
}
32893317

32903318
var _ jobs.Resumer = &restoreResumer{}
@@ -3301,3 +3329,94 @@ func init() {
33013329
jobs.UsesTenantCostControl,
33023330
)
33033331
}
3332+
3333+
func (r *restoreResumer) doDownloadFiles(ctx context.Context, execCtx sql.JobExecContext) error {
3334+
details := r.job.Details().(jobspb.RestoreDetails)
3335+
total := r.job.Progress().Details.(*jobspb.Progress_Restore).Restore.TotalDownloadRequired
3336+
3337+
// If this is the first resumption of this job, we need to find out the total
3338+
// amount we expect to download and persist it so that we can indiciate our
3339+
// progress as that number goes down later.
3340+
if total == 0 {
3341+
log.Infof(ctx, "calculating total download size (across all stores) to complete restore")
3342+
if err := r.job.NoTxn().RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
3343+
return jobs.RunningStatus("Calculating total download size..."), nil
3344+
}); err != nil {
3345+
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(r.job.ID()))
3346+
}
3347+
3348+
for _, span := range details.DownloadSpans {
3349+
resp, err := execCtx.ExecCfg().TenantStatusServer.SpanStats(ctx, &roachpb.SpanStatsRequest{
3350+
Spans: []roachpb.Span{span},
3351+
})
3352+
if err != nil {
3353+
return err
3354+
}
3355+
for _, stats := range resp.SpanToStats {
3356+
total += stats.ExternalFileBytes
3357+
}
3358+
}
3359+
3360+
if total == 0 {
3361+
return nil
3362+
}
3363+
3364+
if err := r.job.NoTxn().FractionProgressed(ctx, func(ctx context.Context, details jobspb.ProgressDetails) float32 {
3365+
prog := details.(*jobspb.Progress_Restore).Restore
3366+
prog.TotalDownloadRequired = total
3367+
return 0.0
3368+
}); err != nil {
3369+
return err
3370+
}
3371+
3372+
if err := r.job.NoTxn().RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
3373+
return jobs.RunningStatus(fmt.Sprintf("Downloading %s of restored data...", sz(total))), nil
3374+
}); err != nil {
3375+
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(r.job.ID()))
3376+
}
3377+
}
3378+
3379+
var lastProgressUpdate time.Time
3380+
for rt := retry.StartWithCtx(
3381+
ctx, retry.Options{InitialBackoff: time.Second * 10, Multiplier: 1.2, MaxBackoff: time.Minute * 5},
3382+
); ; rt.Next() {
3383+
3384+
var remaining uint64
3385+
for _, span := range details.DownloadSpans {
3386+
resp, err := execCtx.ExecCfg().TenantStatusServer.SpanStats(ctx, &roachpb.SpanStatsRequest{
3387+
Spans: []roachpb.Span{span},
3388+
})
3389+
if err != nil {
3390+
return err
3391+
}
3392+
for _, stats := range resp.SpanToStats {
3393+
remaining += stats.ExternalFileBytes
3394+
}
3395+
}
3396+
3397+
fractionComplete := float32(total-remaining) / float32(total)
3398+
log.Infof(ctx, "restore download phase, %s downloaded, %s remaining of %s total (%.1f complete)",
3399+
sz(total-remaining), sz(remaining), sz(total), fractionComplete,
3400+
)
3401+
3402+
if remaining == 0 {
3403+
return nil
3404+
}
3405+
3406+
if timeutil.Since(lastProgressUpdate) > time.Minute {
3407+
if err := r.job.NoTxn().FractionProgressed(ctx, func(ctx context.Context, details jobspb.ProgressDetails) float32 {
3408+
return fractionComplete
3409+
}); err != nil {
3410+
return err
3411+
}
3412+
lastProgressUpdate = timeutil.Now()
3413+
}
3414+
}
3415+
}
3416+
3417+
type sz int64
3418+
3419+
func (b sz) String() string { return string(humanizeutil.IBytes(int64(b))) }
3420+
3421+
// TODO(dt): move this to humanizeutil and allow-list it there.
3422+
//func (b sz) SafeValue() {}

pkg/cloud/amazon/s3_storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ func TestReadFileAtReturnsSize(t *testing.T) {
631631
_, err = w.Write(data)
632632
require.NoError(t, err)
633633
require.NoError(t, w.Close())
634-
reader, _, err := s.ReadFile(ctx, file, cloud.ReadOptions{NoFileSize: true})
634+
reader, _, err := s.ReadFile(ctx, file, cloud.ReadOptions{})
635635
require.NoError(t, err)
636636

637637
rr, ok := reader.(*cloud.ResumingReader)

pkg/cloud/gcp/gcs_storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ func (g *gcsStorage) ReadFile(
294294
return nil, 0, io.EOF
295295
}
296296
}
297-
r, err := g.bucket.Object(object).NewRangeReader(ctx, pos, -1)
297+
r, err := g.bucket.Object(object).NewRangeReader(ctx, pos, length)
298298
if err != nil {
299299
return nil, 0, err
300300
}

pkg/cloud/gcp/gcs_storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ func TestReadFileAtReturnsSize(t *testing.T) {
480480
_, err = w.Write(data)
481481
require.NoError(t, err)
482482
require.NoError(t, w.Close())
483-
reader, _, err := s.ReadFile(ctx, file, cloud.ReadOptions{NoFileSize: true})
483+
reader, _, err := s.ReadFile(ctx, file, cloud.ReadOptions{})
484484
require.NoError(t, err)
485485

486486
rr, ok := reader.(*cloud.ResumingReader)

pkg/cloud/httpsink/http_storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ func TestReadFileAtReturnsSize(t *testing.T) {
486486
_, err = w.Write(data)
487487
require.NoError(t, err)
488488
require.NoError(t, w.Close())
489-
reader, _, err := s.ReadFile(ctx, file, cloud.ReadOptions{NoFileSize: true})
489+
reader, _, err := s.ReadFile(ctx, file, cloud.ReadOptions{})
490490
require.NoError(t, err)
491491

492492
rr, ok := reader.(*cloud.ResumingReader)

pkg/jobs/jobspb/jobs.proto

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,12 @@ message RestoreDetails {
479479
roachpb.Locality execution_locality = 30 [(gogoproto.nullable) = false];
480480

481481
bool experimental_online = 31;
482+
483+
// DownloadSpans indicates this job is the download-step of a multi-step
484+
// online restore.
485+
repeated roachpb.Span download_spans = 32 [(gogoproto.nullable) = false];
482486

483-
// NEXT ID: 31.
487+
// NEXT ID: 33.
484488
}
485489

486490

@@ -496,6 +500,11 @@ message RestoreProgress {
496500
}
497501

498502
repeated FrontierEntry checkpoint = 2 [(gogoproto.nullable) = false];
503+
504+
// TotalDownloadRequired is set in the download job for an online restore, and
505+
// reflects the total amount that was initially found to be needing to be
506+
// downloaded.
507+
uint64 total_download_required = 3;
499508
}
500509

501510
message ImportDetails {

pkg/kv/kvserver/client_raft_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6547,7 +6547,7 @@ func TestRaftLeaderRemovesItself(t *testing.T) {
65476547
// Set a large election timeout. We don't want replicas to call
65486548
// elections due to timeouts, we want them to campaign and obtain
65496549
// votes despite PreVote+CheckQuorum.
6550-
RaftElectionTimeoutTicks: 200,
6550+
RaftElectionTimeoutTicks: 300,
65516551
},
65526552
Knobs: base.TestingKnobs{
65536553
Store: &kvserver.StoreTestingKnobs{
@@ -6601,6 +6601,9 @@ func TestRaftLeaderRemovesItself(t *testing.T) {
66016601
tc.RemoveVotersOrFatal(t, key, tc.Target(0))
66026602
t.Logf("n1 removed from range")
66036603

6604+
// Make sure we didn't time out on the above.
6605+
require.NoError(t, ctx.Err())
6606+
66046607
require.Eventually(t, func() bool {
66056608
logStatus(repl2.RaftStatus())
66066609
logStatus(repl3.RaftStatus())
@@ -6610,6 +6613,8 @@ func TestRaftLeaderRemovesItself(t *testing.T) {
66106613
}
66116614
return false
66126615
}, 10*time.Second, 500*time.Millisecond)
6616+
6617+
require.NoError(t, ctx.Err())
66136618
}
66146619

66156620
// TestRaftUnquiesceLeaderNoProposal tests that unquiescing a Raft leader does

pkg/kv/kvserver/replica_init.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,14 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState
255255
// initRaftGroupRaftMuLockedReplicaMuLocked initializes a Raft group for the
256256
// replica, replacing the existing Raft group if any.
257257
func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
258+
ctx := r.AnnotateCtx(context.Background())
258259
rg, err := raft.NewRawNode(newRaftConfig(
260+
ctx,
259261
raft.Storage((*replicaRaftStorage)(r)),
260262
uint64(r.replicaID),
261263
r.mu.state.RaftAppliedIndex,
262264
r.store.cfg,
263-
&raftLogger{ctx: r.AnnotateCtx(context.Background())},
265+
&raftLogger{ctx: ctx},
264266
))
265267
if err != nil {
266268
return err

pkg/kv/kvserver/replica_raft.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2603,9 +2603,9 @@ func ComputeRaftLogSize(
26032603
}
26042604

26052605
// shouldCampaignAfterConfChange returns true if the current replica should
2606-
// campaign after a conf change. If the leader replica is removed, the
2607-
// leaseholder should campaign. We don't want to campaign on multiple replicas,
2608-
// since that would cause ties.
2606+
// campaign after a conf change. If the leader replica is demoted or removed,
2607+
// the leaseholder should campaign. We don't want to campaign on multiple
2608+
// replicas, since that would cause ties.
26092609
//
26102610
// If there is no current leaseholder we'll have to wait out the election
26112611
// timeout before someone campaigns, but that's ok -- either we'll have to wait
@@ -2622,7 +2622,7 @@ func shouldCampaignAfterConfChange(
26222622
raftStatus raft.BasicStatus,
26232623
leaseStatus kvserverpb.LeaseStatus,
26242624
) bool {
2625-
if raftStatus.Lead == 0 {
2625+
if raftStatus.Lead == raft.None {
26262626
// Leader unknown. We can't know if it was removed by the conf change, and
26272627
// because we force an election without prevote we don't want to risk
26282628
// throwing spurious elections.
@@ -2637,9 +2637,11 @@ func shouldCampaignAfterConfChange(
26372637
// don't expect to hit this, but let's be defensive.
26382638
return false
26392639
}
2640-
if _, ok := desc.GetReplicaDescriptorByID(roachpb.ReplicaID(raftStatus.Lead)); ok {
2641-
// The leader is still in the descriptor.
2642-
return false
2640+
if replDesc, ok := desc.GetReplicaDescriptorByID(roachpb.ReplicaID(raftStatus.Lead)); ok {
2641+
if replDesc.IsAnyVoter() {
2642+
// The leader is still a voter in the descriptor.
2643+
return false
2644+
}
26432645
}
26442646
// Prior to 23.2, the first voter in the descriptor campaigned, so we do
26452647
// the same in mixed-version clusters to avoid ties.

pkg/kv/kvserver/store.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import (
6868
"github.com/cockroachdb/cockroach/pkg/spanconfig"
6969
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
7070
"github.com/cockroachdb/cockroach/pkg/storage"
71+
"github.com/cockroachdb/cockroach/pkg/util"
7172
"github.com/cockroachdb/cockroach/pkg/util/admission"
7273
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
7374
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -257,6 +258,12 @@ var ExportRequestsLimit = settings.RegisterIntSetting(
257258
settings.PositiveInt,
258259
)
259260

261+
// raftStepDownOnRemoval is a metamorphic test parameter that makes Raft leaders
262+
// step down on demotion or removal. Following an upgrade, clusters may have
263+
// replicas with mixed settings, because it's only changed when initializing
264+
// replicas. Varying it makes sure we handle this state.
265+
var raftStepDownOnRemoval = util.ConstantWithMetamorphicTestBool("raft-step-down-on-removal", true)
266+
260267
// TestStoreConfig has some fields initialized with values relevant in tests.
261268
func TestStoreConfig(clock *hlc.Clock) StoreConfig {
262269
return testStoreConfig(clock, clusterversion.TestingBinaryVersion)
@@ -299,6 +306,7 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig {
299306
}
300307

301308
func newRaftConfig(
309+
ctx context.Context,
302310
strg raft.Storage,
303311
id uint64,
304312
appliedIndex kvpb.RaftIndex,
@@ -320,6 +328,20 @@ func newRaftConfig(
320328
Storage: strg,
321329
Logger: logger,
322330

331+
// StepDownOnRemoval requires 23.2. Otherwise, in a mixed-version cluster, a
332+
// 23.2 leader may step down when it demotes itself to learner, but a
333+
// designated follower (first in the range) running 23.1 will only campaign
334+
// once the leader is entirely removed from the range descriptor (see
335+
// shouldCampaignOnConfChange). This would leave the range without a leader,
336+
// having to wait out an election timeout.
337+
//
338+
// We only set this on replica initialization, so replicas without
339+
// StepDownOnRemoval may remain on 23.2 nodes until they restart. That's
340+
// totally fine, we just can't rely on this behavior until 24.1, but
341+
// we currently don't either.
342+
StepDownOnRemoval: storeCfg.Settings.Version.IsActive(ctx, clusterversion.V23_2) &&
343+
raftStepDownOnRemoval,
344+
323345
PreVote: true,
324346
CheckQuorum: storeCfg.RaftEnableCheckQuorum,
325347
}

0 commit comments

Comments
 (0)