Skip to content

Commit b7c5d15

Browse files
craig[bot]tbg
andcommitted
Merge #145417
145417: kvserver,snaprecv: move MultiSSTWriter r=tbg a=tbg This series of commits moves the multiSSTWriter to newly created package `kvserver/kvstorage/snaprecv`. The plan is that it will be joined by the code extracted in #145328, so that we can build a datadriven test that exercises the snapshot receive path's storage logic end to end. Epic: CRDB-46488 Release note: none Co-authored-by: Tobias Grieger <[email protected]>
2 parents c0ae46f + cc9cfb5 commit b7c5d15

12 files changed

+248
-167
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ ALL_TESTS = [
266266
"//pkg/kv/kvserver/kvflowcontrol/node_rac2:node_rac2_test",
267267
"//pkg/kv/kvserver/kvflowcontrol/rac2:rac2_test",
268268
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2:replica_rac2_test",
269+
"//pkg/kv/kvserver/kvstorage/snaprecv:snaprecv_test",
269270
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
270271
"//pkg/kv/kvserver/leases:leases_test",
271272
"//pkg/kv/kvserver/liveness:liveness_test",
@@ -1542,6 +1543,8 @@ GO_TARGETS = [
15421543
"//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
15431544
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
15441545
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
1546+
"//pkg/kv/kvserver/kvstorage/snaprecv:snaprecv",
1547+
"//pkg/kv/kvserver/kvstorage/snaprecv:snaprecv_test",
15451548
"//pkg/kv/kvserver/kvstorage:kvstorage",
15461549
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
15471550
"//pkg/kv/kvserver/leases:leases",

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ go_library(
2222
"merge_queue.go",
2323
"metric_rules.go",
2424
"metrics.go",
25-
"multi_sst_writer.go",
2625
"mvcc_gc_queue.go",
2726
"queue.go",
2827
"queue_helpers_testutil.go",
@@ -77,7 +76,6 @@ go_library(
7776
"replica_read.go",
7877
"replica_send.go",
7978
"replica_split_load.go",
80-
"replica_sst_snapshot_storage.go",
8179
"replica_store_liveness.go",
8280
"replica_store_liveness_sleep.go",
8381
"replica_tscache.go",
@@ -163,6 +161,7 @@ go_library(
163161
"//pkg/kv/kvserver/kvserverbase",
164162
"//pkg/kv/kvserver/kvserverpb",
165163
"//pkg/kv/kvserver/kvstorage",
164+
"//pkg/kv/kvserver/kvstorage/snaprecv",
166165
"//pkg/kv/kvserver/leases",
167166
"//pkg/kv/kvserver/liveness",
168167
"//pkg/kv/kvserver/liveness/livenesspb",
@@ -255,7 +254,6 @@ go_library(
255254
"@com_github_cockroachdb_pebble//:pebble",
256255
"@com_github_cockroachdb_pebble//objstorage",
257256
"@com_github_cockroachdb_pebble//objstorage/remote",
258-
"@com_github_cockroachdb_pebble//rangedel",
259257
"@com_github_cockroachdb_pebble//rangekey",
260258
"@com_github_cockroachdb_pebble//sstable/block",
261259
"@com_github_cockroachdb_pebble//vfs",
@@ -363,7 +361,6 @@ go_test(
363361
"replica_rankings_test.go",
364362
"replica_sideload_test.go",
365363
"replica_split_load_test.go",
366-
"replica_sst_snapshot_storage_test.go",
367364
"replica_store_liveness_test.go",
368365
"replica_test.go",
369366
"replica_tscache_test.go",
@@ -567,8 +564,6 @@ go_test(
567564
"@com_github_cockroachdb_errors//oserror",
568565
"@com_github_cockroachdb_logtags//:logtags",
569566
"@com_github_cockroachdb_pebble//:pebble",
570-
"@com_github_cockroachdb_pebble//rangekey",
571-
"@com_github_cockroachdb_pebble//sstable",
572567
"@com_github_cockroachdb_pebble//vfs",
573568
"@com_github_cockroachdb_redact//:redact",
574569
"@com_github_dustin_go_humanize//:go-humanize",
@@ -585,7 +580,6 @@ go_test(
585580
"@org_golang_google_grpc//metadata",
586581
"@org_golang_x_sync//errgroup",
587582
"@org_golang_x_sync//syncmap",
588-
"@org_golang_x_time//rate",
589583
],
590584
)
591585

pkg/kv/kvserver/client_merge_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3910,7 +3910,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
39103910
writer := storage.MakeIngestionSSTWriter(ctx, st, file)
39113911
if i < len(keySpans)-1 {
39123912
// The last span is the MVCC span, and is always cleared via Excise.
3913-
// See multiSSTWriter.
3913+
// See MultiSSTWriter.
39143914
if err := writer.ClearRawRange(span.Key, span.EndKey, true /* pointKeys */, true /* rangeKeys */); err != nil {
39153915
return err
39163916
}

pkg/kv/kvserver/kv_snapshot_strategy.go

Lines changed: 13 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/snaprecv"
1213
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
1314
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -43,7 +44,7 @@ type kvBatchSnapshotStrategy struct {
4344
// before flushing to disk. Only used on the receiver side.
4445
sstChunkSize int64
4546
// Only used on the receiver side.
46-
scratch *SSTSnapshotStorageScratch
47+
scratch *snaprecv.SSTSnapshotStorageScratch
4748
st *cluster.Settings
4849
clusterID uuid.UUID
4950
}
@@ -104,21 +105,24 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
104105
return noSnap, sendSnapshotError(ctx, s, stream, errors.New("cannot accept shared sstables"))
105106
}
106107

107-
// We rely on the last keyRange passed into multiSSTWriter being the user key
108+
// We rely on the last keyRange passed into MultiSSTWriter being the user key
108109
// span. If the sender signals that it can no longer do shared replication
109110
// (with a TransitionFromSharedToRegularReplicate = true), we will have to
110-
// switch to adding a rangedel for that span. Since multiSSTWriter acts on an
111+
// switch to adding a rangedel for that span. Since MultiSSTWriter acts on an
111112
// opaque slice of keyRanges, we just tell it to add a rangedel for the last
112113
// span. To avoid bugs, assert on the last span in keyRanges actually being
113114
// equal to the user key span.
114115
if !keyRanges[len(keyRanges)-1].Equal(header.State.Desc.KeySpan().AsRawSpanWithNoLocals()) {
115-
return noSnap, errors.AssertionFailedf("last span in multiSSTWriter did not equal the user key span: %s", keyRanges[len(keyRanges)-1].String())
116+
return noSnap, errors.AssertionFailedf("last span in MultiSSTWriter did not equal the user key span: %s", keyRanges[len(keyRanges)-1].String())
116117
}
117118

118119
// The last key range is the user key span.
119120
localRanges := keyRanges[:len(keyRanges)-1]
120121
mvccRange := keyRanges[len(keyRanges)-1]
121-
msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, localRanges, mvccRange, kvSS.sstChunkSize)
122+
msstw, err := snaprecv.NewMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, localRanges, mvccRange, snaprecv.MultiSSTWriterOptions{
123+
SSTChunkSize: kvSS.sstChunkSize,
124+
MaxSSTSize: MaxSnapshotSSTableSize.Get(&kvSS.st.SV),
125+
})
122126
if err != nil {
123127
return noSnap, err
124128
}
@@ -168,9 +172,9 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
168172
// All batch operations are guaranteed to be point key or range key puts.
169173
for batchReader.Next() {
170174
// TODO(lyang24): maybe avoid decoding engine key twice.
171-
// msstw calls (i.e. PutInternalPointKey) can use the decoded engine key here as input.
175+
// msstw calls (i.e. putInternalPointKey) can use the decoded engine key here as input.
172176

173-
bytesEstimate := msstw.estimatedDataSize()
177+
bytesEstimate := msstw.EstimatedDataSize()
174178
delta := bytesEstimate - prevBytesEstimate
175179
// Calling nil pacer is a noop.
176180
if err := pacer.Pace(ctx, delta, false /* final */); err != nil {
@@ -189,7 +193,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
189193
}
190194
}
191195

192-
if err := kvSS.readOneToBatch(ctx, ek, header.SharedReplicate, batchReader, msstw); err != nil {
196+
if err := msstw.ReadOne(ctx, ek, header.SharedReplicate, batchReader); err != nil {
193197
return noSnap, err
194198
}
195199
}
@@ -238,8 +242,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
238242
// we must still construct SSTs with range deletion tombstones to remove
239243
// the data.
240244
timingTag.start("sst")
241-
dataSize, err := msstw.Finish(ctx)
242-
sstSize := msstw.sstSize
245+
dataSize, sstSize, err := msstw.Finish(ctx)
243246
if err != nil {
244247
return noSnap, errors.Wrapf(err, "finishing sst for raft snapshot")
245248
}
@@ -286,79 +289,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
286289
}
287290
}
288291

289-
func (kvSS *kvBatchSnapshotStrategy) readOneToBatch(
290-
ctx context.Context,
291-
ek storage.EngineKey,
292-
shared bool, // may receive shared SSTs
293-
batchReader *storage.BatchReader,
294-
msstw *multiSSTWriter,
295-
) error {
296-
switch batchReader.KeyKind() {
297-
case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete:
298-
if err := msstw.Put(ctx, ek, batchReader.Value()); err != nil {
299-
return errors.Wrapf(err, "writing sst for raft snapshot")
300-
}
301-
case pebble.InternalKeyKindDelete, pebble.InternalKeyKindDeleteSized:
302-
if !shared {
303-
return errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind())
304-
}
305-
if err := msstw.PutInternalPointKey(ctx, batchReader.Key(), batchReader.KeyKind(), nil); err != nil {
306-
return errors.Wrapf(err, "writing sst for raft snapshot")
307-
}
308-
case pebble.InternalKeyKindRangeDelete:
309-
if !shared {
310-
return errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind())
311-
}
312-
start := batchReader.Key()
313-
end, err := batchReader.EndKey()
314-
if err != nil {
315-
return err
316-
}
317-
if err := msstw.PutInternalRangeDelete(ctx, start, end); err != nil {
318-
return errors.Wrapf(err, "writing sst for raft snapshot")
319-
}
320-
321-
case pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete:
322-
if !shared {
323-
return errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind())
324-
}
325-
start := batchReader.Key()
326-
end, err := batchReader.EndKey()
327-
if err != nil {
328-
return err
329-
}
330-
rangeKeys, err := batchReader.RawRangeKeys()
331-
if err != nil {
332-
return err
333-
}
334-
for _, rkv := range rangeKeys {
335-
err := msstw.PutInternalRangeKey(ctx, start, end, rkv)
336-
if err != nil {
337-
return errors.Wrapf(err, "writing sst for raft snapshot")
338-
}
339-
}
340-
case pebble.InternalKeyKindRangeKeySet:
341-
start := ek
342-
end, err := batchReader.EngineEndKey()
343-
if err != nil {
344-
return err
345-
}
346-
rangeKeys, err := batchReader.EngineRangeKeys()
347-
if err != nil {
348-
return err
349-
}
350-
for _, rkv := range rangeKeys {
351-
err := msstw.PutRangeKey(ctx, start.Key, end.Key, rkv.Version, rkv.Value)
352-
if err != nil {
353-
return errors.Wrapf(err, "writing sst for raft snapshot")
354-
}
355-
}
356-
default:
357-
return errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind())
358-
}
359-
return nil
360-
}
361-
362292
// Send implements the snapshotStrategy interface.
363293
func (kvSS *kvBatchSnapshotStrategy) Send(
364294
ctx context.Context,
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "snaprecv",
5+
srcs = [
6+
"multi_sst_writer.go",
7+
"sst_snapshot_storage.go",
8+
],
9+
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/snaprecv",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//pkg/kv/kvserver/kvserverbase",
13+
"//pkg/roachpb",
14+
"//pkg/settings/cluster",
15+
"//pkg/storage",
16+
"//pkg/storage/fs",
17+
"//pkg/util/syncutil",
18+
"//pkg/util/uuid",
19+
"@com_github_cockroachdb_errors//:errors",
20+
"@com_github_cockroachdb_pebble//:pebble",
21+
"@com_github_cockroachdb_pebble//objstorage",
22+
"@com_github_cockroachdb_pebble//rangedel",
23+
"@com_github_cockroachdb_pebble//rangekey",
24+
"@com_github_cockroachdb_pebble//vfs",
25+
"@org_golang_x_time//rate",
26+
],
27+
)
28+
29+
go_test(
30+
name = "snaprecv_test",
31+
srcs = ["sst_snapshot_storage_test.go"],
32+
data = glob(["testdata/**"]),
33+
embed = [":snaprecv"],
34+
deps = [
35+
"//pkg/keys",
36+
"//pkg/kv/kvserver/rditer",
37+
"//pkg/roachpb",
38+
"//pkg/settings/cluster",
39+
"//pkg/storage",
40+
"//pkg/storage/fs",
41+
"//pkg/storage/mvccencoding",
42+
"//pkg/testutils",
43+
"//pkg/testutils/datapathutils",
44+
"//pkg/testutils/echotest",
45+
"//pkg/testutils/storageutils",
46+
"//pkg/util/hlc",
47+
"//pkg/util/leaktest",
48+
"//pkg/util/log",
49+
"//pkg/util/timeutil",
50+
"//pkg/util/uuid",
51+
"@com_github_cockroachdb_errors//:errors",
52+
"@com_github_cockroachdb_errors//oserror",
53+
"@com_github_cockroachdb_pebble//:pebble",
54+
"@com_github_cockroachdb_pebble//rangekey",
55+
"@com_github_cockroachdb_pebble//sstable",
56+
"@com_github_cockroachdb_pebble//vfs",
57+
"@com_github_cockroachdb_redact//:redact",
58+
"@com_github_stretchr_testify//require",
59+
"@org_golang_x_time//rate",
60+
],
61+
)

0 commit comments

Comments
 (0)