Skip to content

Commit 7d01920

Browse files
committed
db: replace preserveBlobReferences
1 parent fb71b26 commit 7d01920

File tree

4 files changed

+35
-200
lines changed

4 files changed

+35
-200
lines changed

blob_rewrite_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,20 @@ func TestBlobRewrite(t *testing.T) {
8181
bv = blobtest.Values{}
8282
switch x := d.CmdArgs[0].String(); x {
8383
case "preserve-blob-references":
84-
pbr := &preserveBlobReferences{}
8584
lines := crstrings.Lines(d.Input)
86-
pbr.inputBlobPhysicalFiles = make(map[base.BlobFileID]*manifest.PhysicalBlobFile, len(lines))
85+
inputBlobPhysicalFiles := make(map[base.BlobFileID]*manifest.PhysicalBlobFile, len(lines))
8786
for _, line := range lines {
8887
bfm, err := manifest.ParseBlobFileMetadataDebug(line)
8988
require.NoError(t, err)
9089
fn = max(fn, bfm.Physical.FileNum)
91-
pbr.inputBlobPhysicalFiles[bfm.FileID] = bfm.Physical
90+
inputBlobPhysicalFiles[bfm.FileID] = bfm.Physical
9291
}
92+
pbr := valsep.NewPreserveAllHotBlobReferences(
93+
inputBlobPhysicalFiles,
94+
0, /* outputBlobReferenceDepth */
95+
sstable.ValueSeparationDefault,
96+
0, /* original minimumSize */
97+
)
9398
vs = pbr
9499
case "write-new-blob-files":
95100
var minimumSize int

compaction_value_separation.go

Lines changed: 8 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@
55
package pebble
66

77
import (
8-
"slices"
9-
108
"github.com/cockroachdb/errors"
119
"github.com/cockroachdb/pebble/internal/base"
12-
"github.com/cockroachdb/pebble/internal/invariants"
1310
"github.com/cockroachdb/pebble/internal/manifest"
1411
"github.com/cockroachdb/pebble/objstorage"
1512
"github.com/cockroachdb/pebble/sstable"
16-
"github.com/cockroachdb/pebble/sstable/blob"
1713
"github.com/cockroachdb/pebble/valsep"
1814
"github.com/cockroachdb/redact"
1915
)
@@ -52,26 +48,26 @@ func (d *DB) determineCompactionValueSeparation(
5248
// For flushes, c.version is nil.
5349
blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
5450
}
55-
minSize := uint64(policy.MinimumSize)
51+
minSize := policy.MinimumSize
5652
switch valueStorage {
5753
case ValueStorageLowReadLatency:
5854
return valsep.NeverSeparateValues{}
5955
case ValueStorageLatencyTolerant:
6056
minSize = latencyTolerantMinimumSize
6157
default:
6258
}
63-
if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, minSize); !writeBlobs {
59+
if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, uint64(minSize)); !writeBlobs {
6460
// This compaction should preserve existing blob references.
6561
kind := sstable.ValueSeparationDefault
6662
if valueStorage != ValueStorageDefault {
6763
kind = sstable.ValueSeparationSpanPolicy
6864
}
69-
return &preserveBlobReferences{
70-
inputBlobPhysicalFiles: blobFileSet,
71-
outputBlobReferenceDepth: outputBlobReferenceDepth,
72-
minimumValueSize: int(minSize),
73-
originalValueSeparationKind: kind,
74-
}
65+
return valsep.NewPreserveAllHotBlobReferences(
66+
blobFileSet,
67+
outputBlobReferenceDepth,
68+
kind,
69+
minSize,
70+
)
7571
}
7672

7773
// This compaction should write values to new blob files.
@@ -218,172 +214,3 @@ func uniqueInputBlobMetadatas(
218214
}
219215
return m
220216
}
221-
222-
// preserveBlobReferences implements the compact.ValueSeparation interface. When
223-
// a compaction is configured with preserveBlobReferences, the compaction will
224-
// not create any new blob files. However, input references to existing blob
225-
// references will be preserved and metadata about the table's blob references
226-
// will be collected.
227-
type preserveBlobReferences struct {
228-
// inputBlobPhysicalFiles holds the *PhysicalBlobFile for every unique blob
229-
// file referenced by input sstables.
230-
inputBlobPhysicalFiles map[base.BlobFileID]*manifest.PhysicalBlobFile
231-
outputBlobReferenceDepth manifest.BlobReferenceDepth
232-
233-
// minimumValueSize is the minimum size of values used by the value separation
234-
// policy that was originally used to write the input sstables.
235-
minimumValueSize int
236-
// originalValueSeparationKind is the value separation policy that was originally used to
237-
// write the input sstables.
238-
originalValueSeparationKind sstable.ValueSeparationKind
239-
240-
// state
241-
buf []byte
242-
// currReferences holds the pending references that have been referenced by
243-
// the current output sstable. The index of a reference with a given blob
244-
// file ID is the value of the base.BlobReferenceID used by its value handles
245-
// within the output sstable.
246-
currReferences []pendingReference
247-
// totalValueSize is the sum of currReferenceValueSizes.
248-
//
249-
// INVARIANT: totalValueSize == sum(currReferenceValueSizes)
250-
totalValueSize uint64
251-
}
252-
253-
type pendingReference struct {
254-
blobFileID base.BlobFileID
255-
valueSize uint64
256-
}
257-
258-
// Assert that *preserveBlobReferences implements the compact.ValueSeparation
259-
// interface.
260-
var _ valsep.ValueSeparation = (*preserveBlobReferences)(nil)
261-
262-
// SetNextOutputConfig implements the ValueSeparation interface.
263-
func (vs *preserveBlobReferences) SetNextOutputConfig(config valsep.ValueSeparationOutputConfig) {}
264-
265-
// Kind implements the ValueSeparation interface.
266-
func (vs *preserveBlobReferences) Kind() sstable.ValueSeparationKind {
267-
return vs.originalValueSeparationKind
268-
}
269-
270-
// MinimumSize implements the ValueSeparation interface.
271-
func (vs *preserveBlobReferences) MinimumSize() int { return vs.minimumValueSize }
272-
273-
// EstimatedFileSize returns an estimate of the disk space consumed by the current
274-
// blob file if it were closed now.
275-
func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
276-
return 0
277-
}
278-
279-
// EstimatedReferenceSize returns an estimate of the disk space consumed by the
280-
// current output sstable's blob references so far.
281-
func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
282-
// TODO(jackson): The totalValueSize is the uncompressed value sizes. With
283-
// compressible data, it overestimates the disk space consumed by the blob
284-
// references. It also does not include the blob file's index block or
285-
// footer, so it can underestimate if values are completely incompressible.
286-
//
287-
// Should we compute a compression ratio per blob file and scale the
288-
// references appropriately?
289-
return vs.totalValueSize
290-
}
291-
292-
// Add implements compact.ValueSeparation. This implementation will write
293-
// existing blob references to the output table.
294-
func (vs *preserveBlobReferences) Add(
295-
tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool, _ bool,
296-
) error {
297-
if !kv.V.IsBlobValueHandle() {
298-
// If the value is not already a blob handle (either it's in-place or in
299-
// a value block), we retrieve the value and write it through Add. The
300-
// sstable writer may still decide to put the value in a value block,
301-
// but regardless the value will be written to the sstable itself and
302-
// not a blob file.
303-
v, callerOwned, err := kv.Value(vs.buf)
304-
if err != nil {
305-
return err
306-
}
307-
if callerOwned {
308-
vs.buf = v[:0]
309-
}
310-
return tw.Add(kv.K, v, forceObsolete)
311-
}
312-
313-
// The value is an existing blob handle. We can copy it into the output
314-
// sstable, taking note of the reference for the table metadata.
315-
lv := kv.V.LazyValue()
316-
fileID := lv.Fetcher.BlobFileID
317-
318-
var refID base.BlobReferenceID
319-
if refIdx := slices.IndexFunc(vs.currReferences, func(ref pendingReference) bool {
320-
return ref.blobFileID == fileID
321-
}); refIdx != -1 {
322-
refID = base.BlobReferenceID(refIdx)
323-
} else {
324-
refID = base.BlobReferenceID(len(vs.currReferences))
325-
vs.currReferences = append(vs.currReferences, pendingReference{
326-
blobFileID: fileID,
327-
valueSize: 0,
328-
})
329-
}
330-
331-
if invariants.Enabled && vs.currReferences[refID].blobFileID != fileID {
332-
panic("wrong reference index")
333-
}
334-
335-
handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
336-
inlineHandle := blob.InlineHandle{
337-
InlineHandlePreface: blob.InlineHandlePreface{
338-
ReferenceID: refID,
339-
ValueLen: lv.Fetcher.Attribute.ValueLen,
340-
},
341-
HandleSuffix: handleSuffix,
342-
}
343-
err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
344-
if err != nil {
345-
return err
346-
}
347-
vs.currReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen)
348-
vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
349-
return nil
350-
}
351-
352-
// FinishOutput implements compact.ValueSeparation.
353-
func (vs *preserveBlobReferences) FinishOutput() (valsep.ValueSeparationMetadata, error) {
354-
if invariants.Enabled {
355-
// Assert that the incrementally-maintained totalValueSize matches the
356-
// sum of all the reference value sizes.
357-
totalValueSize := uint64(0)
358-
for _, ref := range vs.currReferences {
359-
totalValueSize += ref.valueSize
360-
}
361-
if totalValueSize != vs.totalValueSize {
362-
return valsep.ValueSeparationMetadata{},
363-
errors.AssertionFailedf("totalValueSize mismatch: %d != %d", totalValueSize, vs.totalValueSize)
364-
}
365-
}
366-
367-
references := make(manifest.BlobReferences, len(vs.currReferences))
368-
for i := range vs.currReferences {
369-
ref := vs.currReferences[i]
370-
phys, ok := vs.inputBlobPhysicalFiles[ref.blobFileID]
371-
if !ok {
372-
return valsep.ValueSeparationMetadata{},
373-
errors.AssertionFailedf("pebble: blob file %s not found among input sstables", ref.blobFileID)
374-
}
375-
references[i] = manifest.MakeBlobReference(ref.blobFileID, ref.valueSize, ref.valueSize, phys)
376-
}
377-
referenceSize := vs.totalValueSize
378-
vs.currReferences = vs.currReferences[:0]
379-
vs.totalValueSize = 0
380-
return valsep.ValueSeparationMetadata{
381-
BlobReferences: references,
382-
BlobReferenceSize: referenceSize,
383-
// The outputBlobReferenceDepth is computed from the input sstables,
384-
// reflecting the worst-case overlap of referenced blob files. If this
385-
// sstable references fewer unique blob files, reduce its depth to the
386-
// count of unique files.
387-
BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
388-
}, nil
389-
}

compaction_value_separation_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,20 @@ func TestValueSeparationPolicy(t *testing.T) {
7878
case "never-separate-values":
7979
vs = valsep.NeverSeparateValues{}
8080
case "preserve-blob-references":
81-
pbr := &preserveBlobReferences{}
8281
lines := crstrings.Lines(d.Input)
83-
pbr.inputBlobPhysicalFiles = make(map[base.BlobFileID]*manifest.PhysicalBlobFile, len(lines))
82+
inputBlobPhysicalFiles := make(map[base.BlobFileID]*manifest.PhysicalBlobFile, len(lines))
8483
for _, line := range lines {
8584
bfm, err := manifest.ParseBlobFileMetadataDebug(line)
8685
require.NoError(t, err)
8786
fn = max(fn, bfm.Physical.FileNum)
88-
pbr.inputBlobPhysicalFiles[bfm.FileID] = bfm.Physical
87+
inputBlobPhysicalFiles[bfm.FileID] = bfm.Physical
8988
}
89+
pbr := valsep.NewPreserveAllHotBlobReferences(
90+
inputBlobPhysicalFiles, /* blob file set */
91+
manifest.BlobReferenceDepth(0),
92+
sstable.ValueSeparationDefault,
93+
0, /* minimum size */
94+
)
9095
vs = pbr
9196
case "write-new-blob-files":
9297
var minimumSize int
@@ -212,7 +217,7 @@ func (w *loggingRawWriter) AddWithBlobHandle(
212217
type defineDBValueSeparator struct {
213218
bv blobtest.Values
214219
metas map[base.BlobFileID]*manifest.PhysicalBlobFile
215-
pbr *preserveBlobReferences
220+
pbr valsep.ValueSeparation
216221
kv base.InternalKV
217222
}
218223

@@ -274,10 +279,6 @@ func (vs *defineDBValueSeparator) Add(
274279
meta.Size += uint64(lv.Fetcher.Attribute.ValueLen)
275280
meta.ValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
276281

277-
if vs.pbr.inputBlobPhysicalFiles == nil {
278-
vs.pbr.inputBlobPhysicalFiles = make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
279-
}
280-
vs.pbr.inputBlobPhysicalFiles[fileID] = meta
281282
// Return a KV that uses the original key but our constructed blob reference.
282283
vs.kv.K = kv.K
283284
vs.kv.V = iv

data_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -912,12 +912,14 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
912912
// all the tables, we can construct all the referenced blob files and add
913913
// them to the final version edit.
914914
valueSeparator := &defineDBValueSeparator{
915-
pbr: &preserveBlobReferences{
916-
originalValueSeparationKind: sstable.ValueSeparationDefault,
917-
minimumValueSize: d.opts.Experimental.ValueSeparationPolicy().MinimumSize,
918-
},
919915
metas: make(map[base.BlobFileID]*manifest.PhysicalBlobFile),
920916
}
917+
valueSeparator.pbr = valsep.NewPreserveAllHotBlobReferences(
918+
valueSeparator.metas,
919+
0, /* outputreference depth */
920+
sstable.ValueSeparationDefault,
921+
d.opts.Experimental.ValueSeparationPolicy().MinimumSize,
922+
)
921923

922924
var mem *memTable
923925
var start, end *base.InternalKey
@@ -934,14 +936,14 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
934936
flushable: mem,
935937
flushed: make(chan struct{}),
936938
}}
939+
getValueSeparator := func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation {
940+
return valueSeparator
941+
}
937942
c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.latest.l0Organizer,
938-
d.mu.versions.picker.getBaseLevel(), toFlush, time.Now(), d.shouldCreateShared(0), d.determineCompactionValueSeparation)
943+
d.mu.versions.picker.getBaseLevel(), toFlush, time.Now(), d.shouldCreateShared(0), getValueSeparator)
939944
if err != nil {
940945
return err
941946
}
942-
c.getValueSeparation = func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation {
943-
return valueSeparator
944-
}
945947
// NB: define allows the test to exactly specify which keys go
946948
// into which sstables. If the test has a small target file
947949
// size to test grandparent limits, etc, the maxOutputFileSize

0 commit comments

Comments
 (0)