Skip to content

Commit 265d36e

Browse files
craig[bot]pav-kv
andcommitted
Merge #149499
149499: kvserver: clean up prepareSnapApply r=arulajmani a=pav-kv This PR introduces a builder type which prepares a set of writes when applying a snapshot. This removes extensive lists of parameters dragged across function calls, and wraps repetitive SST creation code into a helper method. It also fixes a TODO in `TestPrepareSnapApply`: all the writes are now printed in the original order, thanks to the `storage.Writer` abstraction instead of direct SST generation code. The new `storage.Writer`-based interface is more flexible, and e.g. allows eliminating the `ConvertFilesToBatchAndCommit` [quirk](https://github.com/cockroachdb/cockroach/blob/255f60e14a03060b9b2ca81e38f7188c33173a7a/pkg/kv/kvserver/replica_raftstorage.go#L640) as well as the "cleared spans" tracking. We can know when the snapshot is ["simple/small"](https://github.com/cockroachdb/cockroach/blob/255f60e14a03060b9b2ca81e38f7188c33173a7a/pkg/kv/kvserver/replica_raftstorage.go#L624-L627) before `MultiSSTWriter` starts writing files (e.g. from the handshake), so we can generate a batch directly and skip the files writing phase; this batch can then be extended with `prepareSnapApply` data and committed. This would make small snapshot ingestions faster. Epic: CRDB-49111 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 73cc435 + 0240c1d commit 265d36e

File tree

8 files changed

+142
-206
lines changed

8 files changed

+142
-206
lines changed

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ go_test(
450450
"//pkg/kv/kvserver/load",
451451
"//pkg/kv/kvserver/lockspanset",
452452
"//pkg/kv/kvserver/logstore",
453+
"//pkg/kv/kvserver/print",
453454
"//pkg/kv/kvserver/protectedts",
454455
"//pkg/kv/kvserver/protectedts/ptpb",
455456
"//pkg/kv/kvserver/protectedts/ptstorage",

pkg/kv/kvserver/kvstorage/snaprecv/sst_snapshot_storage.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1616
"github.com/cockroachdb/cockroach/pkg/roachpb"
17+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1718
"github.com/cockroachdb/cockroach/pkg/storage"
1819
"github.com/cockroachdb/cockroach/pkg/storage/fs"
1920
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -53,14 +54,15 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
5354
// NewScratchSpace creates a new storage scratch space for SSTs for a specific
5455
// snapshot.
5556
func (s *SSTSnapshotStorage) NewScratchSpace(
56-
rangeID roachpb.RangeID, snapUUID uuid.UUID,
57+
rangeID roachpb.RangeID, snapUUID uuid.UUID, st *cluster.Settings,
5758
) *SSTSnapshotStorageScratch {
5859
s.mu.Lock()
5960
s.mu.rangeRefCount[rangeID]++
6061
s.mu.Unlock()
6162
snapDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
6263
return &SSTSnapshotStorageScratch{
6364
storage: s,
65+
st: st,
6466
rangeID: rangeID,
6567
snapDir: snapDir,
6668
}
@@ -98,6 +100,7 @@ func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
98100
// snapshot.
99101
type SSTSnapshotStorageScratch struct {
100102
storage *SSTSnapshotStorage
103+
st *cluster.Settings
101104
rangeID roachpb.RangeID
102105
ssts []string
103106
snapDir string
@@ -138,21 +141,34 @@ func (s *SSTSnapshotStorageScratch) NewFile(
138141
return f, nil
139142
}
140143

141-
// WriteSST writes SST data to a file. The method closes
142-
// the provided SST when it is finished using it. If the provided SST is empty,
143-
// then no file will be created and nothing will be written.
144-
func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
144+
// WriteSST creates an SST populated with the given write function, and writes
145+
// it to a file. Does nothing if no data is written.
146+
func (s *SSTSnapshotStorageScratch) WriteSST(
147+
ctx context.Context, write func(context.Context, storage.Writer) error,
148+
) error {
145149
if s.closed {
146150
return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
147151
}
148-
if len(data) == 0 {
152+
153+
// TODO(itsbilal): Write to SST directly rather than buffer in a MemObject.
154+
sstFile := &storage.MemObject{}
155+
w := storage.MakeIngestionSSTWriter(ctx, s.st, sstFile)
156+
defer w.Close()
157+
if err := write(ctx, &w); err != nil {
158+
return err
159+
}
160+
if err := w.Finish(); err != nil {
161+
return err
162+
}
163+
if w.DataSize == 0 {
149164
return nil
150165
}
166+
151167
f, err := s.NewFile(ctx, 512<<10 /* 512 KB */)
152168
if err != nil {
153169
return err
154170
}
155-
if err := f.Write(data); err != nil {
171+
if err := f.Write(sstFile.Data()); err != nil {
156172
f.Abort()
157173
return err
158174
}

pkg/kv/kvserver/kvstorage/snaprecv/sst_snapshot_storage_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
5757
defer eng.Close()
5858

5959
sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
60-
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
60+
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID, nil)
6161

6262
// Check that the storage lazily creates the directories on first write.
6363
_, err := eng.Env().Stat(scratch.snapDir)
@@ -139,7 +139,7 @@ func TestSSTSnapshotStorageConcurrentRange(t *testing.T) {
139139
sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
140140

141141
runForSnap := func(snapUUID uuid.UUID) error {
142-
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, snapUUID)
142+
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, snapUUID, nil)
143143

144144
// Check that the storage lazily creates the directories on first write.
145145
_, err := eng.Env().Stat(scratch.snapDir)
@@ -242,7 +242,7 @@ func TestSSTSnapshotStorageContextCancellation(t *testing.T) {
242242
defer eng.Close()
243243

244244
sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
245-
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
245+
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID, nil)
246246

247247
var cancel func()
248248
ctx, cancel = context.WithCancel(ctx)
@@ -283,7 +283,7 @@ func testMultiSSTWriterInitSSTInner(t *testing.T, interesting bool) {
283283
defer eng.Close()
284284

285285
sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
286-
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
286+
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID, nil)
287287
desc := roachpb.RangeDescriptor{
288288
RangeID: 100,
289289
StartKey: roachpb.RKey("d"),
@@ -415,8 +415,8 @@ func TestMultiSSTWriterSize(t *testing.T) {
415415
defer eng.Close()
416416

417417
sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
418-
ref := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
419-
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
418+
ref := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID, nil)
419+
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID, nil)
420420
settings := cluster.MakeTestingClusterSettings()
421421

422422
desc := roachpb.RangeDescriptor{
@@ -533,7 +533,7 @@ func TestMultiSSTWriterAddLastSpan(t *testing.T) {
533533
defer eng.Close()
534534

535535
sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
536-
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
536+
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID, nil)
537537
desc := roachpb.RangeDescriptor{
538538
StartKey: roachpb.RKey("d"),
539539
EndKey: roachpb.RKeyMax,

pkg/kv/kvserver/replica_raftstorage.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,6 @@ func (r *Replica) applySnapshotRaftMuLocked(
561561
Index: kvpb.RaftIndex(nonemptySnap.Metadata.Index),
562562
Term: kvpb.RaftTerm(nonemptySnap.Metadata.Term),
563563
}
564-
clearedSpans := inSnap.clearedSpans
565564

566565
subsumedDescs := make([]*roachpb.RangeDescriptor, 0, len(subsumedRepls))
567566
for _, sr := range subsumedRepls {
@@ -583,11 +582,9 @@ func (r *Replica) applySnapshotRaftMuLocked(
583582
subsumedDescs = append(subsumedDescs, sr.Desc())
584583
}
585584

586-
st := r.ClusterSettings()
587-
prepInput := prepareSnapApplyInput{
585+
sb := snapWriteBuilder{
588586
id: r.ID(),
589587

590-
st: st,
591588
todoEng: r.store.TODOEngine(),
592589
sl: r.raftMu.stateLoader,
593590
writeSST: inSnap.SSTStorageScratch.WriteSST,
@@ -596,15 +593,13 @@ func (r *Replica) applySnapshotRaftMuLocked(
596593
hardState: hs,
597594
desc: desc,
598595
subsumedDescs: subsumedDescs,
599-
}
600596

601-
_ = applySnapshotTODO
602-
clearedUnreplicatedSpan, clearedSubsumedSpans, err := prepareSnapApply(ctx, prepInput)
603-
if err != nil {
597+
cleared: inSnap.clearedSpans,
598+
}
599+
_ = applySnapshotTODO // 2.4 is written, the rest is handled below
600+
if err := sb.prepareSnapApply(ctx); err != nil {
604601
return err
605602
}
606-
clearedSpans = append(clearedSpans, clearedUnreplicatedSpan)
607-
clearedSpans = append(clearedSpans, clearedSubsumedSpans...)
608603

609604
ls := r.asLogStorage()
610605

@@ -622,7 +617,7 @@ func (r *Replica) applySnapshotRaftMuLocked(
622617
}
623618

624619
if len(inSnap.externalSSTs)+len(inSnap.sharedSSTs) == 0 && /* simple */
625-
inSnap.SSTSize <= snapshotIngestAsWriteThreshold.Get(&st.SV) /* small */ {
620+
inSnap.SSTSize <= snapshotIngestAsWriteThreshold.Get(&r.ClusterSettings().SV) /* small */ {
626621
applyAsIngest = false
627622
}
628623

@@ -638,7 +633,7 @@ func (r *Replica) applySnapshotRaftMuLocked(
638633
} else {
639634
_ = applySnapshotTODO // all atomic
640635
err := r.store.TODOEngine().ConvertFilesToBatchAndCommit(
641-
ctx, inSnap.SSTStorageScratch.SSTs(), clearedSpans)
636+
ctx, inSnap.SSTStorageScratch.SSTs(), sb.cleared)
642637
if err != nil {
643638
return errors.Wrapf(err, "while applying as batch %s", inSnap.SSTStorageScratch.SSTs())
644639
}

0 commit comments

Comments
 (0)