Skip to content

Commit 3d6e6a1

Browse files
committed
db: allow ingesting local SSTs with values in blob files
Allow ingesting local ssts and their associated blob files. Note that we do not validate blob value handles within a table. Each blob file is assumed to be valid for and fully referenced by the SST.
1 parent 606bcd5 commit 3d6e6a1

File tree

9 files changed

+723
-85
lines changed

9 files changed

+723
-85
lines changed

ingest.go

Lines changed: 253 additions & 65 deletions
Large diffs are not rendered by default.

ingest_test.go

Lines changed: 259 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync/atomic"
2323
"testing"
2424
"time"
25+
"unicode"
2526

2627
"github.com/cockroachdb/crlib/crstrings"
2728
"github.com/cockroachdb/datadriven"
@@ -42,6 +43,7 @@ import (
4243
"github.com/cockroachdb/pebble/sstable"
4344
"github.com/cockroachdb/pebble/sstable/block"
4445
"github.com/cockroachdb/pebble/sstable/colblk"
46+
"github.com/cockroachdb/pebble/valsep"
4547
"github.com/cockroachdb/pebble/vfs"
4648
"github.com/cockroachdb/pebble/vfs/errorfs"
4749
"github.com/kr/pretty"
@@ -206,7 +208,7 @@ func TestIngestLoadRand(t *testing.T) {
206208
TableMetadata: &manifest.TableMetadata{
207209
TableNum: base.TableNum(pending[i]),
208210
},
209-
path: sstPaths[i],
211+
local: LocalSST{Path: sstPaths[i]},
210212
}
211213

212214
func() {
@@ -275,6 +277,144 @@ func TestIngestLoadRand(t *testing.T) {
275277
require.Equal(t, expected, lr.local)
276278
}
277279

280+
// TestIngestLocalWithBlobs tests the ingestion of local sstables with blobs.
281+
// Commands:
282+
// - define: defines the database
283+
// - write-table: writes an external table using valsep.SSTBlobWriter
284+
// - ingest: ingests the tables into the database
285+
func TestIngestLocalWithBlobs(t *testing.T) {
286+
keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
287+
ctx := context.Background()
288+
var db *DB
289+
defer func() {
290+
if db != nil {
291+
require.NoError(t, db.Close())
292+
}
293+
}()
294+
fileCount := 0
295+
fs := vfs.NewMem()
296+
reset := func() {
297+
if db != nil {
298+
require.NoError(t, db.Close())
299+
}
300+
fileCount = 0
301+
fs = vfs.NewMem()
302+
}
303+
datadriven.RunTest(t, "testdata/ingest_with_blobs", func(t *testing.T, td *datadriven.TestData) string {
304+
switch td.Cmd {
305+
case "define":
306+
reset()
307+
var err error
308+
db, err = runDBDefineCmd(td, &Options{
309+
Comparer: testkeys.Comparer,
310+
FS: fs,
311+
FormatMajorVersion: internalFormatNewest,
312+
})
313+
require.NoError(t, err)
314+
return ""
315+
case "write-table":
316+
sstWriterOpts := sstable.WriterOptions{
317+
Comparer: testkeys.Comparer,
318+
KeySchema: &keySchema,
319+
TableFormat: sstable.TableFormatMax,
320+
}
321+
sstFileName := td.CmdArgs[0].Key
322+
if sstFileName == "" {
323+
return "missing file name argument"
324+
}
325+
var valueSeparationMinSize, mvccGarbageValueSeparationMinSize int
326+
td.MaybeScanArgs(t, "value-separation-min-size", &valueSeparationMinSize)
327+
td.MaybeScanArgs(t, "mvcc-value-separation-min-size", &mvccGarbageValueSeparationMinSize)
328+
var blobPaths []string
329+
writerOpts := valsep.SSTBlobWriterOptions{
330+
SSTWriterOpts: sstWriterOpts,
331+
ValueSeparationMinSize: valueSeparationMinSize,
332+
MVCCGarbageValueSeparationMinSize: mvccGarbageValueSeparationMinSize,
333+
}
334+
writerOpts.NewBlobFileFn = func() (objstorage.Writable, error) {
335+
fnum := fileCount
336+
path := fmt.Sprintf("blob%d", fnum)
337+
blobPaths = append(blobPaths, path)
338+
f, err := fs.Create(path, vfs.WriteCategoryUnspecified)
339+
require.NoError(t, err)
340+
w := objstorageprovider.NewFileWritable(f)
341+
fileCount++
342+
return w, err
343+
}
344+
sstFile, err := fs.Create(sstFileName, vfs.WriteCategoryUnspecified)
345+
require.NoError(t, err)
346+
sstHandle := objstorageprovider.NewFileWritable(sstFile)
347+
require.NoError(t, err)
348+
writer := valsep.NewSSTBlobWriter(sstHandle, writerOpts)
349+
writerClosed := false
350+
defer func() {
351+
if !writerClosed {
352+
_ = writer.Close()
353+
}
354+
}()
355+
kvs, err := sstable.ParseTestKVsAndSpans(td.Input, nil)
356+
require.NoError(t, err)
357+
require.NoError(t, valsep.HandleTestKVs(writer, kvs))
358+
writerClosed = true
359+
if err := writer.Close(); err != nil {
360+
return err.Error()
361+
}
362+
return fmt.Sprintf("sst: %s\nblobs: %s", sstFileName, strings.Join(blobPaths, ","))
363+
case "ingest":
364+
if len(td.CmdArgs) == 0 {
365+
return "no sst files provided for ingestion"
366+
367+
}
368+
// Each argument key is an SST file path, and its values are the associated
369+
// blob file paths, if any.
370+
// Ex: ingest sst1=blob1,blob2 sst2=blob3 sst3
371+
var localTables LocalSSTables
372+
for _, arg := range td.CmdArgs {
373+
if arg.Key == "excise-span" {
374+
continue
375+
}
376+
sstPath := arg.Key
377+
var blobPaths []string
378+
// The values are the blob file paths for this SST.
379+
// Each value may contain comma-separated blob file paths.
380+
for _, val := range arg.Vals {
381+
// Split by comma to handle comma-separated blob paths
382+
paths := strings.FieldsFunc(val, func(r rune) bool {
383+
return r == ',' || unicode.IsSpace(r)
384+
})
385+
blobPaths = append(blobPaths, paths...)
386+
}
387+
localTables = append(localTables, LocalSST{
388+
Path: sstPath,
389+
BlobPaths: blobPaths,
390+
})
391+
}
392+
if len(localTables) == 0 {
393+
return "no sst files provided for ingestion"
394+
}
395+
396+
var exciseSpan KeyRange
397+
var exciseStr string
398+
td.MaybeScanArgs(t, "excise-span", &exciseStr)
399+
if exciseStr != "" {
400+
fields := strings.Split(exciseStr, "-")
401+
if len(fields) != 2 {
402+
return fmt.Sprintf("malformed excise span: %s", exciseStr)
403+
}
404+
exciseSpan.Start = []byte(fields[0])
405+
exciseSpan.End = []byte(fields[1])
406+
}
407+
_, err := db.IngestLocal(ctx, localTables, exciseSpan)
408+
if err != nil {
409+
return err.Error()
410+
}
411+
return describeLSM(db, true /* verbose */)
412+
default:
413+
return "unknown command: " + td.Cmd
414+
}
415+
})
416+
}
417+
278418
func TestIngestLoadInvalid(t *testing.T) {
279419
mem := vfs.NewMem()
280420
f, err := mem.Create("invalid", vfs.WriteCategoryUnspecified)
@@ -293,6 +433,43 @@ func TestIngestLoadInvalid(t *testing.T) {
293433
}
294434
}
295435

436+
func TestIngestLocalErrors(t *testing.T) {
437+
ctx := context.Background()
438+
439+
t.Run("ReadOnlyDB", func(t *testing.T) {
440+
fs := vfs.NewMem()
441+
442+
// First create a database
443+
opts := &Options{FS: fs}
444+
db, err := Open("test_db", opts)
445+
require.NoError(t, err)
446+
require.NoError(t, db.Close())
447+
448+
// Then open it in read-only mode
449+
opts.ReadOnly = true
450+
db, err = Open("test_db", opts)
451+
require.NoError(t, err)
452+
defer func() { require.NoError(t, db.Close()) }()
453+
454+
_, err = db.IngestLocal(ctx, LocalSSTables{LocalSST{Path: "test.sst"}}, KeyRange{})
455+
require.ErrorIs(t, err, ErrReadOnly)
456+
})
457+
458+
t.Run("InvalidExciseSpan", func(t *testing.T) {
459+
fs := vfs.NewMem()
460+
opts := &Options{FS: fs, Comparer: testkeys.Comparer}
461+
db, err := Open("", opts)
462+
require.NoError(t, err)
463+
defer func() { require.NoError(t, db.Close()) }()
464+
465+
localTables := LocalSSTables{LocalSST{Path: "test.sst"}}
466+
exciseSpan := KeyRange{Start: []byte("a@1"), End: []byte("z")}
467+
_, err = db.IngestLocal(ctx, localTables, exciseSpan)
468+
require.Error(t, err)
469+
require.Contains(t, err.Error(), "suffixed start key")
470+
})
471+
}
472+
296473
func TestIngestSortAndVerify(t *testing.T) {
297474
comparers := map[string]Compare{
298475
"default": DefaultComparer.Compare,
@@ -327,7 +504,7 @@ func TestIngestSortAndVerify(t *testing.T) {
327504
m.InitPhysicalBacking()
328505
meta = append(meta, ingestLocalMeta{
329506
TableMetadata: m,
330-
path: strconv.Itoa(i),
507+
local: LocalSST{Path: strconv.Itoa(i)},
331508
})
332509
}
333510
lr := ingestLoadResult{local: meta}
@@ -336,7 +513,7 @@ func TestIngestSortAndVerify(t *testing.T) {
336513
return fmt.Sprintf("%v\n", err)
337514
}
338515
for i := range meta {
339-
fmt.Fprintf(&buf, "%s: %v-%v\n", meta[i].path, meta[i].Smallest(), meta[i].Largest())
516+
fmt.Fprintf(&buf, "%s: %v-%v\n", meta[i].local.Path, meta[i].Smallest(), meta[i].Largest())
340517
}
341518
return buf.String()
342519

@@ -366,11 +543,11 @@ func TestIngestLink(t *testing.T) {
366543
meta := make([]ingestLocalMeta, 10)
367544
contents := make([][]byte, len(meta))
368545
for j := range meta {
369-
meta[j].path = fmt.Sprintf("external%d", j)
546+
meta[j].local.Path = fmt.Sprintf("external%d", j)
370547
meta[j].TableMetadata = &manifest.TableMetadata{}
371548
meta[j].TableNum = base.TableNum(j)
372549
meta[j].InitPhysicalBacking()
373-
f, err := opts.FS.Create(meta[j].path, vfs.WriteCategoryUnspecified)
550+
f, err := opts.FS.Create(meta[j].local.Path, vfs.WriteCategoryUnspecified)
374551
require.NoError(t, err)
375552

376553
contents[j] = []byte(fmt.Sprintf("data%d", j))
@@ -382,7 +559,7 @@ func TestIngestLink(t *testing.T) {
382559
}
383560

384561
if i < count {
385-
opts.FS.Remove(meta[i].path)
562+
opts.FS.Remove(meta[i].local.Path)
386563
}
387564

388565
err = ingestLinkLocal(context.Background(), 0 /* jobID */, opts, objProvider, meta)
@@ -453,7 +630,7 @@ func TestIngestLinkFallback(t *testing.T) {
453630

454631
meta := &manifest.TableMetadata{TableNum: 1}
455632
meta.InitPhysicalBacking()
456-
err = ingestLinkLocal(context.Background(), 0, opts, objProvider, []ingestLocalMeta{{TableMetadata: meta, path: "source"}})
633+
err = ingestLinkLocal(context.Background(), 0, opts, objProvider, []ingestLocalMeta{{TableMetadata: meta, local: LocalSST{Path: "source"}}})
457634
require.NoError(t, err)
458635

459636
dest, err := mem.Open("000001.sst")
@@ -2750,9 +2927,12 @@ func TestIngestCleanup(t *testing.T) {
27502927
fns := []base.TableNum{0, 1, 2}
27512928

27522929
testCases := []struct {
2753-
closeFiles []base.TableNum
2754-
cleanupFiles []base.TableNum
2755-
wantErr string
2930+
closeFiles []base.TableNum
2931+
cleanupFiles []base.TableNum
2932+
closeBlobFiles []base.DiskFileNum
2933+
cleanupBlobFiles []base.DiskFileNum // blob files linked for last table, but table not linked yet
2934+
cleanupMetaBlobFiles map[base.TableNum][]base.DiskFileNum // blob files per table in meta
2935+
wantErr string
27562936
}{
27572937
// Close and remove all files.
27582938
{
@@ -2777,6 +2957,22 @@ func TestIngestCleanup(t *testing.T) {
27772957
cleanupFiles: []base.TableNum{0, 1, 2, 3},
27782958
wantErr: oserror.ErrInvalid.Error(), // The first error encountered is due to the open file.
27792959
},
2960+
// Remove with stray blob files.
2961+
{
2962+
closeFiles: []base.TableNum{},
2963+
cleanupFiles: []base.TableNum{},
2964+
closeBlobFiles: []base.DiskFileNum{10},
2965+
cleanupBlobFiles: []base.DiskFileNum{10},
2966+
},
2967+
// Remove blob files in meta.
2968+
{
2969+
closeFiles: fns,
2970+
cleanupFiles: fns,
2971+
closeBlobFiles: []base.DiskFileNum{10},
2972+
cleanupMetaBlobFiles: map[base.TableNum][]base.DiskFileNum{
2973+
0: {10},
2974+
},
2975+
},
27802976
}
27812977

27822978
for _, tc := range testCases {
@@ -2787,16 +2983,32 @@ func TestIngestCleanup(t *testing.T) {
27872983
require.NoError(t, err)
27882984
defer objProvider.Close()
27892985

2790-
// Create the files in the VFS.
2986+
// Create the table files in the VFS.
27912987
metaMap := make(map[base.TableNum]objstorage.Writable)
27922988
for _, fn := range fns {
27932989
w, _, err := objProvider.Create(t.Context(), base.FileTypeTable, base.PhysicalTableDiskFileNum(fn), objstorage.CreateOptions{})
27942990
require.NoError(t, err)
2795-
27962991
metaMap[fn] = w
27972992
}
27982993

2799-
// Close a select number of files.
2994+
// Create the blob files in the VFS.
2995+
blobMetaMap := make(map[base.DiskFileNum]objstorage.Writable)
2996+
allBlobFiles := make(map[base.DiskFileNum]struct{})
2997+
for _, bfn := range tc.cleanupBlobFiles {
2998+
allBlobFiles[bfn] = struct{}{}
2999+
}
3000+
for _, blobFiles := range tc.cleanupMetaBlobFiles {
3001+
for _, bfn := range blobFiles {
3002+
allBlobFiles[bfn] = struct{}{}
3003+
}
3004+
}
3005+
for bfn := range allBlobFiles {
3006+
w, _, err := objProvider.Create(t.Context(), base.FileTypeBlob, bfn, objstorage.CreateOptions{})
3007+
require.NoError(t, err)
3008+
blobMetaMap[bfn] = w
3009+
}
3010+
3011+
// Close a select number of table files.
28003012
for _, m := range tc.closeFiles {
28013013
w, ok := metaMap[m]
28023014
if !ok {
@@ -2805,15 +3017,47 @@ func TestIngestCleanup(t *testing.T) {
28053017
require.NoError(t, w.Finish())
28063018
}
28073019

3020+
// Close a select number of blob files.
3021+
for _, bfn := range tc.closeBlobFiles {
3022+
w, ok := blobMetaMap[bfn]
3023+
if !ok {
3024+
continue
3025+
}
3026+
require.NoError(t, w.Finish())
3027+
}
3028+
28083029
// Cleanup the set of files in the FS.
28093030
var toRemove []ingestLocalMeta
28103031
for _, fn := range tc.cleanupFiles {
28113032
m := &manifest.TableMetadata{TableNum: fn}
28123033
m.InitPhysicalBacking()
2813-
toRemove = append(toRemove, ingestLocalMeta{TableMetadata: m})
3034+
meta := ingestLocalMeta{TableMetadata: m}
3035+
if blobFiles, ok := tc.cleanupMetaBlobFiles[fn]; ok {
3036+
meta.blobFiles = make([]manifest.BlobFileMetadata, len(blobFiles))
3037+
for i, bfn := range blobFiles {
3038+
meta.blobFiles[i] = manifest.BlobFileMetadata{
3039+
FileID: base.BlobFileID(bfn),
3040+
Physical: &manifest.PhysicalBlobFile{
3041+
FileNum: bfn,
3042+
},
3043+
}
3044+
}
3045+
}
3046+
toRemove = append(toRemove, meta)
3047+
}
3048+
3049+
// Create stray blob files that are provided as the last arg for cleanup.
3050+
var blobFiles []manifest.BlobFileMetadata
3051+
for _, bfn := range tc.cleanupBlobFiles {
3052+
blobFiles = append(blobFiles, manifest.BlobFileMetadata{
3053+
FileID: base.BlobFileID(bfn),
3054+
Physical: &manifest.PhysicalBlobFile{
3055+
FileNum: bfn,
3056+
},
3057+
})
28143058
}
28153059

2816-
err = ingestCleanup(objProvider, toRemove)
3060+
err = ingestCleanup(objProvider, toRemove, blobFiles)
28173061
if tc.wantErr != "" {
28183062
require.Error(t, err, "got no error, expected %s", tc.wantErr)
28193063
require.Contains(t, err.Error(), tc.wantErr)

0 commit comments

Comments
 (0)