3 changes: 2 additions & 1 deletion blob_rewrite_test.go
@@ -128,7 +128,7 @@ func TestBlobRewrite(t *testing.T) {
} else {
ikv.V = base.MakeInPlaceValue([]byte(parts[1]))
}
require.NoError(t, vs.Add(tw, &ikv, false /* forceObsolete */, false /* isLikelyMVCCGarbage */))
require.NoError(t, vs.Add(tw, &ikv, false /* forceObsolete */, false /* isLikelyMVCCGarbage */, base.KVMeta{}))
}
return buf.String()
case "close-output":
@@ -320,6 +320,7 @@ func TestBlobRewriteRandomized(t *testing.T) {
},
base.ShortAttribute(0),
false, /* forceObsolete */
base.KVMeta{},
))
require.NoError(t, tw.Close())
originalValueIndices[i] = i
27 changes: 20 additions & 7 deletions cockroachkvs/cockroachkvs_bench_test.go
@@ -79,6 +79,18 @@ func BenchmarkRandSeekInSST(b *testing.B) {
valueLen: 128, // ~200 KVs per data block
version: sstable.TableFormatPebblev7,
},
{
name: "v8/single-level",
numKeys: 200 * 100, // ~100 data blocks.
valueLen: 128, // ~200 KVs per data block
version: sstable.TableFormatPebblev8,
},
{
name: "v8/two-level",
numKeys: 200 * 5000, // ~5000 data blocks
valueLen: 128, // ~200 KVs per data block
version: sstable.TableFormatPebblev8,
},
}
keyCfg := KeyGenConfig{
PrefixAlphabetLen: 26,
@@ -193,7 +205,7 @@ func benchmarkCockroachDataColBlockWriter(b *testing.B, keyConfig KeyGenConfig,
_, keys, values := generateDataBlock(rng, targetBlockSize, keyConfig, valueLen)

var w colblk.DataBlockEncoder
w.Init(&KeySchema)
w.Init(colblk.ColumnFormatv1, &KeySchema)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -202,7 +214,8 @@ func benchmarkCockroachDataColBlockWriter(b *testing.B, keyConfig KeyGenConfig,
for w.Size() < targetBlockSize {
ik := base.MakeInternalKey(keys[count], base.SeqNum(rng.Uint64N(uint64(base.SeqNumMax))), base.InternalKeyKindSet)
kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false /* isObsolete */)
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp,
false /* isObsolete */, base.KVMeta{})
count++
}
_, _ = w.Finish(w.Rows(), w.Size())
@@ -315,10 +328,10 @@ func benchmarkCockroachDataColBlockIter(

var decoder colblk.DataBlockDecoder
var it colblk.DataBlockIter
it.InitOnce(&KeySchema, &Comparer, getInternalValuer(func([]byte) base.InternalValue {
it.InitOnce(colblk.ColumnFormatv1, &KeySchema, &Comparer, getInternalValuer(func([]byte) base.InternalValue {
return base.MakeInPlaceValue([]byte("mock external value"))
}))
bd := decoder.Init(&KeySchema, serializedBlock)
bd := decoder.Init(colblk.ColumnFormatv1, &KeySchema, serializedBlock)
if err := it.Init(&decoder, bd, transforms); err != nil {
b.Fatal(err)
}
@@ -382,19 +395,19 @@ func BenchmarkInitDataBlockMetadata(b *testing.B) {
}, 8)

var w colblk.DataBlockEncoder
w.Init(&KeySchema)
w.Init(colblk.ColumnFormatv1, &KeySchema)
for j := 0; w.Size() < targetBlockSize; j++ {
ik := base.MakeInternalKey(keys[j], base.SeqNum(rng.Uint64N(uint64(base.SeqNumMax))), base.InternalKeyKindSet)
kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
vp := block.InPlaceValuePrefix(kcmp.PrefixEqual())
w.Add(ik, values[j], vp, kcmp, false /* isObsolete */)
w.Add(ik, values[j], vp, kcmp, false /* isObsolete */, base.KVMeta{})
}
finished, _ := w.Finish(w.Rows(), w.Size())

var md block.Metadata

b.ResetTimer()
for range b.N {
colblk.InitDataBlockMetadata(&KeySchema, &md, finished)
colblk.InitDataBlockMetadata(colblk.ColumnFormatv1, &KeySchema, &md, finished)
}
}
14 changes: 7 additions & 7 deletions cockroachkvs/cockroachkvs_test.go
@@ -207,7 +207,7 @@ func TestKeySchema_KeySeeker(t *testing.T) {
var bd colblk.BlockDecoder
var ks colblk.KeySeeker
var maxKeyLen int
enc.Init(&KeySchema)
enc.Init(colblk.ColumnFormatv1, &KeySchema)

initKeySeeker := func() {
ksPointer := &cockroachKeySeeker{}
@@ -231,11 +231,11 @@ func TestKeySchema_KeySeeker(t *testing.T) {
UserKey: k,
Trailer: pebble.MakeInternalKeyTrailer(0, base.InternalKeyKindSet),
}
enc.Add(ikey, k, block.InPlaceValuePrefix(false), kcmp, false /* isObsolete */)
enc.Add(ikey, k, block.InPlaceValuePrefix(false), kcmp, false /* isObsolete */, base.KVMeta{})
rows++
}
blk, _ := enc.Finish(rows, enc.Size())
bd = dec.Init(&KeySchema, blk)
bd = dec.Init(colblk.ColumnFormatv1, &KeySchema, blk)
return buf.String()
case "is-lower-bound":
initKeySeeker()
@@ -410,10 +410,10 @@ func testCockroachDataColBlock(t *testing.T, seed uint64, keyCfg KeyGenConfig) {

var decoder colblk.DataBlockDecoder
var it colblk.DataBlockIter
it.InitOnce(&KeySchema, &Comparer, getInternalValuer(func([]byte) base.InternalValue {
it.InitOnce(colblk.ColumnFormatv1, &KeySchema, &Comparer, getInternalValuer(func([]byte) base.InternalValue {
return base.MakeInPlaceValue([]byte("mock external value"))
}))
bd := decoder.Init(&KeySchema, serializedBlock)
bd := decoder.Init(colblk.ColumnFormatv1, &KeySchema, serializedBlock)
if err := it.Init(&decoder, bd, blockiter.Transforms{}); err != nil {
t.Fatal(err)
}
@@ -460,12 +460,12 @@ func generateDataBlock(
keys, values = RandomKVs(rng, targetBlockSize/valueLen, cfg, valueLen)

var w colblk.DataBlockEncoder
w.Init(&KeySchema)
w.Init(colblk.ColumnFormatv1, &KeySchema)
count := 0
for w.Size() < targetBlockSize {
ik := base.MakeInternalKey(keys[count], base.SeqNum(rng.Uint64N(uint64(base.SeqNumMax))), base.InternalKeyKindSet)
kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false /* isObsolete */)
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false /* isObsolete */, base.KVMeta{})
count++
}
data, _ = w.Finish(w.Rows(), w.Size())
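Taken together, the cockroachkvs changes show the shape of the new colblk API: `DataBlockEncoder.Init`, `DataBlockDecoder.Init`, and `DataBlockIter.InitOnce` now take an explicit column format, and `Add` takes a trailing `base.KVMeta`. Below is a minimal round-trip sketch that is not part of this diff, written as if it lived in the cockroachkvs package (so `KeySchema`, `Comparer`, and the imports already used by these test files are in scope); it passes the zero `base.KVMeta{}` exactly as the tests do, and the First/Next scan at the end assumes the iterator's usual `*base.InternalKV`-returning methods.

```go
// encodeAndScan is a sketch of the updated encode/decode round trip.
// keys must be sorted, valid CockroachDB-encoded keys (e.g. from RandomKVs).
func encodeAndScan(keys, values [][]byte) error {
	var w colblk.DataBlockEncoder
	w.Init(colblk.ColumnFormatv1, &KeySchema) // column format is now explicit
	for i := range keys {
		ik := base.MakeInternalKey(keys[i], base.SeqNum(i+1), base.InternalKeyKindSet)
		kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
		// KVMeta is the new trailing argument; the zero value preserves the
		// pre-change behavior.
		w.Add(ik, values[i], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp,
			false /* isObsolete */, base.KVMeta{})
	}
	blk, _ := w.Finish(w.Rows(), w.Size())
	blk = crbytes.CopyAligned(blk) // decode from aligned memory, as the tests do

	var dec colblk.DataBlockDecoder
	bd := dec.Init(colblk.ColumnFormatv1, &KeySchema, blk)
	var it colblk.DataBlockIter
	it.InitOnce(colblk.ColumnFormatv1, &KeySchema, &Comparer, nil)
	if err := it.Init(&dec, bd, blockiter.NoTransforms); err != nil {
		return err
	}
	defer it.Close()
	for kv := it.First(); kv != nil; kv = it.Next() {
		_ = kv // visit each KV in block order
	}
	return nil
}
```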
24 changes: 13 additions & 11 deletions cockroachkvs/key_schema_test.go
@@ -40,9 +40,10 @@ func TestKeySchema(t *testing.T) {
func runDataDrivenTest(t *testing.T, path string) {
var blockData []byte
var e colblk.DataBlockEncoder
e.Init(&KeySchema)
colFmt := colblk.ColumnFormatv1
e.Init(colFmt, &KeySchema)
var iter colblk.DataBlockIter
iter.InitOnce(&KeySchema, &Comparer, nil)
iter.InitOnce(colFmt, &KeySchema, &Comparer, nil)

datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
@@ -52,7 +53,7 @@ func runDataDrivenTest(t *testing.T, path string) {
for _, l := range crstrings.Lines(td.Input) {
key, value := parseInternalKV(l)
kcmp := e.KeyWriter.ComparePrev(key.UserKey)
e.Add(key, value, 0, kcmp, false /* isObsolete */)
e.Add(key, value, 0, kcmp, false /* isObsolete */, base.KVMeta{})
buf = e.MaterializeLastUserKey(buf[:0])
if !Comparer.Equal(key.UserKey, buf) {
td.Fatalf(t, "incorrect MaterializeLastKey: %s instead of %s", formatUserKey(buf), formatUserKey(key.UserKey))
@@ -66,22 +67,22 @@ func runDataDrivenTest(t *testing.T, path string) {

case "describe":
var d colblk.DataBlockDecoder
bd := d.Init(&KeySchema, blockData)
bd := d.Init(colFmt, &KeySchema, blockData)
f := binfmt.New(blockData)
tp := treeprinter.New()
d.Describe(f, tp, bd)
return tp.String()

case "suffix-types":
var d colblk.DataBlockDecoder
bd := d.Init(&KeySchema, blockData)
bd := d.Init(colFmt, &KeySchema, blockData)
var ks cockroachKeySeeker
ks.init(&d, bd)
return fmt.Sprintf("suffix-types: %s", ks.suffixTypes)

case "keys":
var d colblk.DataBlockDecoder
bd := d.Init(&KeySchema, blockData)
bd := d.Init(colFmt, &KeySchema, blockData)
require.NoError(t, iter.Init(&d, bd, blockiter.Transforms{}))
defer iter.Close()
var buf bytes.Buffer
@@ -98,7 +99,7 @@ func runDataDrivenTest(t *testing.T, path string) {

case "seek":
var d colblk.DataBlockDecoder
bd := d.Init(&KeySchema, blockData)
bd := d.Init(colFmt, &KeySchema, blockData)
require.NoError(t, iter.Init(&d, bd, blockiter.Transforms{}))
defer iter.Close()
var buf strings.Builder
@@ -133,21 +134,22 @@ func TestKeySchema_RandomKeys(t *testing.T) {
slices.SortFunc(keys, Compare)

var enc colblk.DataBlockEncoder
enc.Init(&KeySchema)
colFmt := colblk.ColumnFormatv1
enc.Init(colFmt, &KeySchema)
for i := range keys {
ikey := pebble.InternalKey{
UserKey: keys[i],
Trailer: pebble.MakeInternalKeyTrailer(0, pebble.InternalKeyKindSet),
}
enc.Add(ikey, keys[i], block.InPlaceValuePrefix(false), enc.KeyWriter.ComparePrev(keys[i]), false /* isObsolete */)
enc.Add(ikey, keys[i], block.InPlaceValuePrefix(false), enc.KeyWriter.ComparePrev(keys[i]), false /* isObsolete */, base.KVMeta{})
}
blk, _ := enc.Finish(len(keys), enc.Size())
blk = crbytes.CopyAligned(blk)

var dec colblk.DataBlockDecoder
bd := dec.Init(&KeySchema, blk)
bd := dec.Init(colFmt, &KeySchema, blk)
var it colblk.DataBlockIter
it.InitOnce(&KeySchema, &Comparer, nil)
it.InitOnce(colFmt, &KeySchema, &Comparer, nil)
require.NoError(t, it.Init(&dec, bd, blockiter.NoTransforms))
// Ensure that a scan across the block finds all the relevant keys.
var valBuf []byte
10 changes: 5 additions & 5 deletions data_test.go
@@ -673,13 +673,13 @@ func runBuildCmd(
if err != nil {
return err
}
if err := w.Raw().AddWithBlobHandle(tmp, handle, base.ShortAttribute(0), false); err != nil {
if err := w.Raw().AddWithBlobHandle(tmp, handle, base.ShortAttribute(0), false, base.KVMeta{}); err != nil {
return err
}
continue
}
// Otherwise add it as an ordinary value.
if err := w.Raw().Add(tmp, v, false); err != nil {
if err := w.Raw().Add(tmp, v, false, base.KVMeta{}); err != nil {
return err
}
}
@@ -1963,14 +1963,14 @@ func (vs *defineDBValueSeparator) EstimatedReferenceSize() uint64 {
// Add adds the provided key-value pair to the sstable, possibly separating the
// value into a blob file.
func (vs *defineDBValueSeparator) Add(
tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool, _ bool,
tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool, _ bool, kvMeta base.KVMeta,
) error {
// In datadriven tests, all defined values are in-place initially. See
// runDBDefineCmdReuseFS.
v := kv.V.InPlaceValue()
// If the value doesn't begin with "blob", don't separate it.
if !bytes.HasPrefix(v, []byte("blob")) {
return tw.Add(kv.K, v, forceObsolete)
return tw.Add(kv.K, v, forceObsolete, kvMeta)
}

// This looks like a blob reference. Parse it.
@@ -1995,7 +1995,7 @@ func (vs *defineDBValueSeparator) Add(
// Return a KV that uses the original key but our constructed blob reference.
vs.kv.K = kv.K
vs.kv.V = iv
return vs.pbr.Add(tw, &vs.kv, forceObsolete, false /* isLikelyMVCCGarbage */)
return vs.pbr.Add(tw, &vs.kv, forceObsolete, false /* isLikelyMVCCGarbage */, kvMeta)
}

// FinishOutput implements valsep.ValueSeparation.
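The data_test.go hook above also documents the contract for value separators: `Add` now receives a `base.KVMeta` and is expected to thread it through to whichever write path it chooses, `tw.Add` for in-place values or the blob-reference path. Here is a stripped-down, hypothetical pass-through separator, shown only to isolate that plumbing; the real `valsep.ValueSeparation` interface has additional methods (`EstimatedReferenceSize`, `FinishOutput`) that are omitted here.

```go
// passthroughSeparator is a sketch of a value separator that never separates:
// it writes every value in place and forwards the new kvMeta unchanged.
type passthroughSeparator struct{}

func (passthroughSeparator) Add(
	tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool, _ bool, kvMeta base.KVMeta,
) error {
	// Assumes an in-place value, as in the datadriven tests above.
	v := kv.V.InPlaceValue()
	return tw.Add(kv.K, v, forceObsolete, kvMeta)
}
```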
3 changes: 2 additions & 1 deletion excise_test.go
@@ -590,7 +590,8 @@ func TestConcurrentExcise(t *testing.T) {
VisitPointKey: func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false /* forceObsolete */))
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val,
false /* forceObsolete */, base.KVMeta{}))
return nil
},
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
2 changes: 1 addition & 1 deletion file_cache_test.go
@@ -219,7 +219,7 @@ func (t *fileCacheTest) newTestHandle() (*fileCacheHandle, *fileCacheTestFS) {
}
tw := sstable.NewWriter(w, sstable.WriterOptions{TableFormat: sstable.TableFormatPebblev2})
ik := base.ParseInternalKey(fmt.Sprintf("k.SET.%d", i))
if err := tw.Raw().Add(ik, xxx[:i], false); err != nil {
if err := tw.Raw().Add(ik, xxx[:i], false, base.KVMeta{}); err != nil {
t.Fatal(err)
}
if err := tw.RangeKeySet([]byte("k"), []byte("l"), nil, xxx[:i]); err != nil {
17 changes: 16 additions & 1 deletion format_major_version.go
@@ -253,6 +253,12 @@ const (
// Previously, marking for compaction required a manifest rotation.
FormatMarkForCompactionInVersionEdit

// FormatTieredStorage is a format major version that adds support for
// tiered storage based on the age of a key-value pair. It introduces a new
// columnar block format (among other things) that is required for tracking
// the attribute used to derive the age.
FormatTieredStorage

// -- Add new versions here --

// FormatNewest is the most recent format major version.
@@ -293,6 +299,8 @@ func (v FormatMajorVersion) resolveDefault() FormatMajorVersion {
func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
v = v.resolveDefault()
switch {
case v >= FormatTieredStorage:
return sstable.TableFormatPebblev8
case v >= formatFooterAttributes:
return sstable.TableFormatPebblev7
case v >= FormatTableFormatV6:
@@ -398,6 +406,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatMarkForCompactionInVersionEdit: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatMarkForCompactionInVersionEdit)
},
FormatTieredStorage: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatTieredStorage)
},
}

const formatVersionMarkerName = `format-version`
@@ -511,7 +522,11 @@ func (d *DB) ratchetFormatMajorVersionLocked(formatVers FormatMajorVersion) erro
defer func() { d.mu.formatVers.ratcheting = false }()

for nextVers := d.FormatMajorVersion() + 1; nextVers <= formatVers; nextVers++ {
if err := formatMajorVersionMigrations[nextVers](d); err != nil {
migration, ok := formatMajorVersionMigrations[nextVers]
if !ok || migration == nil {
return errors.Errorf("pebble: no migration function defined for format version %d", nextVers)
}
if err := migration(d); err != nil {
return errors.Wrapf(err, "migrating to version %d", nextVers)
}

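For orientation, here is a small, hypothetical usage sketch of the new version (not part of this diff): it opens a store at the previous newest format version, ratchets to `FormatTieredStorage`, and checks the resulting maximum table format. It assumes Pebble's existing public `Open`/`RatchetFormatMajorVersion` API and uses a made-up directory name.

```go
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Open at the previous newest format version.
	db, err := pebble.Open("demo-db", &pebble.Options{
		FormatMajorVersion: pebble.FormatMarkForCompactionInVersionEdit,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Ratcheting applies each intermediate migration in order; with the change
	// above, a missing migration entry now surfaces as an error rather than a
	// panic.
	if err := db.RatchetFormatMajorVersion(pebble.FormatTieredStorage); err != nil {
		log.Fatal(err)
	}

	// At FormatTieredStorage, newly written sstables may use TableFormatPebblev8.
	fmt.Println(db.FormatMajorVersion().MaxTableFormat())
}
```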
9 changes: 6 additions & 3 deletions format_major_version_test.go
@@ -38,11 +38,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
require.Equal(t, FormatV2BlobFiles, FormatMajorVersion(26))
require.Equal(t, FormatBackingValueSize, FormatMajorVersion(27))
require.Equal(t, FormatMarkForCompactionInVersionEdit, FormatMajorVersion(28))
require.Equal(t, FormatTieredStorage, FormatMajorVersion(29))

// When we add a new version, we should add a check for the new version above
// in addition to updating the expected values below.
require.Equal(t, FormatNewest, FormatMajorVersion(28))
require.Equal(t, internalFormatNewest, FormatMajorVersion(28))
require.Equal(t, FormatNewest, FormatMajorVersion(29))
require.Equal(t, internalFormatNewest, FormatMajorVersion(29))
}

func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
Expand Down Expand Up @@ -233,6 +234,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatV2BlobFiles: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev7},
FormatBackingValueSize: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev7},
FormatMarkForCompactionInVersionEdit: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev7},
FormatTieredStorage: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev8},
}

// Valid versions.
@@ -260,6 +262,7 @@ func TestFormatMajorVersions_BlobFileFormat(t *testing.T) {
FormatV2BlobFiles: blob.FileFormatV2,
FormatBackingValueSize: blob.FileFormatV2,
FormatMarkForCompactionInVersionEdit: blob.FileFormatV2,
FormatTieredStorage: blob.FileFormatV2,
}

// Valid versions.
@@ -297,7 +300,7 @@ func TestFormatMajorVersions_MaxTableFormat(t *testing.T) {
},
{
fmv: FormatNewest,
want: sstable.TableFormatPebblev7,
want: sstable.TableFormatPebblev8,
},
}
for _, tc := range testCases {