Skip to content

Commit fb71b26

Browse files
committed
db, valsep: replace writeNewBlobFiles with generalizedValueSeparation
1 parent 77a04d5 commit fb71b26

File tree

3 files changed

+54
-231
lines changed

3 files changed

+54
-231
lines changed

blob_rewrite_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,18 @@ func TestBlobRewrite(t *testing.T) {
9292
}
9393
vs = pbr
9494
case "write-new-blob-files":
95-
newSep := &writeNewBlobFiles{
96-
comparer: testkeys.Comparer,
97-
newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
95+
var minimumSize int
96+
d.MaybeScanArgs(t, "minimum-size", &minimumSize)
97+
newSep := valsep.NewWriteNewBlobFiles(
98+
testkeys.Comparer,
99+
func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
98100
fn++
99101
return objStore.Create(ctx, base.FileTypeBlob, fn, objstorage.CreateOptions{})
100102
},
101-
}
102-
d.MaybeScanArgs(t, "minimum-size", &newSep.minimumSize)
103+
blob.FileWriterOptions{},
104+
minimumSize,
105+
valsep.WriteNewBlobFilesOptions{},
106+
)
103107
vs = newSep
104108
default:
105109
t.Fatalf("unknown value separation policy: %s", x)

compaction_value_separation.go

Lines changed: 26 additions & 213 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package pebble
66

77
import (
88
"slices"
9-
"time"
109

1110
"github.com/cockroachdb/errors"
1211
"github.com/cockroachdb/pebble/internal/base"
@@ -47,6 +46,12 @@ func (d *DB) determineCompactionValueSeparation(
4746

4847
// We're allowed to write blob references. Determine whether we should carry
4948
// forward existing blob references, or write new ones.
49+
50+
var blobFileSet map[base.BlobFileID]*manifest.PhysicalBlobFile
51+
if c.version != nil {
52+
// For flushes, c.version is nil.
53+
blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
54+
}
5055
minSize := uint64(policy.MinimumSize)
5156
switch valueStorage {
5257
case ValueStorageLowReadLatency:
@@ -62,35 +67,37 @@ func (d *DB) determineCompactionValueSeparation(
6267
kind = sstable.ValueSeparationSpanPolicy
6368
}
6469
return &preserveBlobReferences{
65-
inputBlobPhysicalFiles: uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs),
70+
inputBlobPhysicalFiles: blobFileSet,
6671
outputBlobReferenceDepth: outputBlobReferenceDepth,
6772
minimumValueSize: int(minSize),
6873
originalValueSeparationKind: kind,
6974
}
7075
}
7176

7277
// This compaction should write values to new blob files.
73-
return &writeNewBlobFiles{
74-
comparer: d.opts.Comparer,
75-
newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
78+
return valsep.NewWriteNewBlobFiles(
79+
d.opts.Comparer,
80+
func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
7681
return d.newCompactionOutputBlob(
7782
jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
7883
},
79-
shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
80-
writerOpts: d.makeBlobWriterOptions(c.outputLevel.level),
81-
minimumSize: policy.MinimumSize,
82-
globalMinimumSize: policy.MinimumSize,
83-
invalidValueCallback: func(userKey []byte, value []byte, err error) {
84-
// The value may not be safe, so it will be redacted when redaction
85-
// is enabled.
86-
d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
87-
Kind: InvalidValue,
88-
UserKey: userKey,
89-
ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
90-
value, err),
91-
})
84+
d.makeBlobWriterOptions(c.outputLevel.level),
85+
policy.MinimumSize,
86+
valsep.WriteNewBlobFilesOptions{
87+
InputBlobPhysicalFiles: blobFileSet,
88+
ShortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
89+
InvalidValueCallback: func(userKey []byte, value []byte, err error) {
90+
// The value may not be safe, so it will be redacted when redaction
91+
// is enabled.
92+
d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
93+
Kind: InvalidValue,
94+
UserKey: userKey,
95+
ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
96+
value, err),
97+
})
98+
},
9299
},
93-
}
100+
)
94101
}
95102

96103
// shouldWriteBlobFiles returns true if the compaction should write new blob
@@ -212,200 +219,6 @@ func uniqueInputBlobMetadatas(
212219
return m
213220
}
214221

215-
// writeNewBlobFiles implements the strategy and mechanics for separating values
216-
// into external blob files. We will always separate potential MVCC garbage
217-
// values into this external blob file. MVCC garbage values are determined on a
218-
// best-effort basis; see comments in sstable.IsLikelyMVCCGarbage for the
219-
// exact criteria we use.
220-
type writeNewBlobFiles struct {
221-
comparer *base.Comparer
222-
// newBlobObject constructs a new blob object for use in the compaction.
223-
newBlobObject func() (objstorage.Writable, objstorage.ObjectMetadata, error)
224-
shortAttrExtractor ShortAttributeExtractor
225-
// writerOpts is used to configure all constructed blob writers.
226-
writerOpts blob.FileWriterOptions
227-
// minimumSize imposes a lower bound on the size of values that can be
228-
// separated into a blob file. Values smaller than this are always written
229-
// to the sstable (but may still be written to a value block within the
230-
// sstable).
231-
//
232-
// minimumSize is set to globalMinimumSize by default and on every call to
233-
// FinishOutput. It may be overriden by SetNextOutputConfig (i.e, if a
234-
// SpanPolicy dictates a different minimum size for a span of the keyspace).
235-
minimumSize int
236-
// globalMinimumSize is the size threshold for separating values into blob
237-
// files globally across the keyspace. It may be overridden per-output by
238-
// SetNextOutputConfig.
239-
globalMinimumSize int
240-
// invalidValueCallback is called when a value is encountered for which the
241-
// short attribute extractor returns an error.
242-
invalidValueCallback func(userKey []byte, value []byte, err error)
243-
244-
// Current blob writer state.
245-
writer *blob.FileWriter
246-
objMeta objstorage.ObjectMetadata
247-
248-
buf []byte
249-
}
250-
251-
// Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
252-
var _ valsep.ValueSeparation = (*writeNewBlobFiles)(nil)
253-
254-
// SetNextOutputConfig implements the ValueSeparation interface.
255-
func (vs *writeNewBlobFiles) SetNextOutputConfig(config valsep.ValueSeparationOutputConfig) {
256-
vs.minimumSize = config.MinimumSize
257-
}
258-
259-
// Kind implements the ValueSeparation interface.
260-
func (vs *writeNewBlobFiles) Kind() sstable.ValueSeparationKind {
261-
if vs.minimumSize != vs.globalMinimumSize {
262-
return sstable.ValueSeparationSpanPolicy
263-
}
264-
return sstable.ValueSeparationDefault
265-
}
266-
267-
// MinimumSize implements the ValueSeparation interface.
268-
func (vs *writeNewBlobFiles) MinimumSize() int { return vs.minimumSize }
269-
270-
// EstimatedFileSize returns an estimate of the disk space consumed by the current
271-
// blob file if it were closed now.
272-
func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
273-
if vs.writer == nil {
274-
return 0
275-
}
276-
return vs.writer.EstimatedSize()
277-
}
278-
279-
// EstimatedReferenceSize returns an estimate of the disk space consumed by the
280-
// current output sstable's blob references so far.
281-
func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
282-
// When we're writing to new blob files, the size of the blob file itself is
283-
// a better estimate of the disk space consumed than the uncompressed value
284-
// sizes.
285-
return vs.EstimatedFileSize()
286-
}
287-
288-
// Add adds the provided key-value pair to the sstable, possibly separating the
289-
// value into a blob file.
290-
func (vs *writeNewBlobFiles) Add(
291-
tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool, isLikelyMVCCGarbage bool,
292-
) error {
293-
// We always fetch the value if we're rewriting blob files. We want to
294-
// replace any references to existing blob files with references to new blob
295-
// files that we write during the compaction.
296-
v, callerOwned, err := kv.Value(vs.buf)
297-
if err != nil {
298-
return err
299-
}
300-
if callerOwned {
301-
vs.buf = v[:0]
302-
}
303-
304-
// Values that are too small are never separated; however, MVCC keys are
305-
// separated if they are a SET key kind, as long as the value is not empty.
306-
if len(v) < vs.minimumSize && !isLikelyMVCCGarbage {
307-
return tw.Add(kv.K, v, forceObsolete)
308-
}
309-
310-
// Merge and deletesized keys are never separated.
311-
switch kv.K.Kind() {
312-
case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
313-
return tw.Add(kv.K, v, forceObsolete)
314-
}
315-
316-
// This KV met all the criteria and its value will be separated.
317-
// If there's a configured short attribute extractor, extract the value's
318-
// short attribute.
319-
var shortAttr base.ShortAttribute
320-
if vs.shortAttrExtractor != nil {
321-
keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
322-
shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
323-
if err != nil {
324-
// Report that there was a value for which the short attribute
325-
// extractor was unable to extract a short attribute.
326-
vs.invalidValueCallback(kv.K.UserKey, v, err)
327-
328-
// Rather than erroring out and aborting the flush or compaction, we
329-
// fallback to writing the value verbatim to the sstable. Otherwise
330-
// a flush could busy loop, repeatedly attempting to write the same
331-
// memtable and repeatedly unable to extract a key's short attribute.
332-
return tw.Add(kv.K, v, forceObsolete)
333-
}
334-
}
335-
336-
// If we don't have an open blob writer, create one. We create blob objects
337-
// lazily so that we don't create them unless a compaction will actually
338-
// write to a blob file. This avoids creating and deleting empty blob files
339-
// on every compaction in parts of the keyspace that a) are required to be
340-
// in-place or b) have small values.
341-
if vs.writer == nil {
342-
writable, objMeta, err := vs.newBlobObject()
343-
if err != nil {
344-
return err
345-
}
346-
vs.objMeta = objMeta
347-
vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
348-
}
349-
350-
// Append the value to the blob file.
351-
handle := vs.writer.AddValue(v, isLikelyMVCCGarbage)
352-
353-
// Write the key and the handle to the sstable. We need to map the
354-
// blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
355-
// except the FileNum is translated into a reference index.
356-
inlineHandle := blob.InlineHandle{
357-
InlineHandlePreface: blob.InlineHandlePreface{
358-
// Since we're writing blob files and maintaining a 1:1 mapping
359-
// between sstables and blob files, the reference index is always 0
360-
// here. Only compactions that don't rewrite blob files will produce
361-
// handles with nonzero reference indices.
362-
ReferenceID: 0,
363-
ValueLen: handle.ValueLen,
364-
},
365-
HandleSuffix: blob.HandleSuffix{
366-
BlockID: handle.BlockID,
367-
ValueID: handle.ValueID,
368-
},
369-
}
370-
return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
371-
}
372-
373-
// FinishOutput closes the current blob file (if any). It returns the stats
374-
// and metadata of the now completed blob file.
375-
func (vs *writeNewBlobFiles) FinishOutput() (valsep.ValueSeparationMetadata, error) {
376-
if vs.writer == nil {
377-
return valsep.ValueSeparationMetadata{}, nil
378-
}
379-
stats, err := vs.writer.Close()
380-
if err != nil {
381-
return valsep.ValueSeparationMetadata{}, err
382-
}
383-
vs.writer = nil
384-
meta := &manifest.PhysicalBlobFile{
385-
FileNum: vs.objMeta.DiskFileNum,
386-
Size: stats.FileLen,
387-
ValueSize: stats.UncompressedValueBytes,
388-
CreationTime: uint64(time.Now().Unix()),
389-
}
390-
meta.PopulateProperties(&stats.Properties)
391-
// Reset the minimum size for the next output.
392-
vs.minimumSize = vs.globalMinimumSize
393-
394-
return valsep.ValueSeparationMetadata{
395-
BlobReferences: manifest.BlobReferences{
396-
manifest.MakeBlobReference(base.BlobFileID(vs.objMeta.DiskFileNum),
397-
stats.UncompressedValueBytes, stats.UncompressedValueBytes, meta),
398-
},
399-
BlobReferenceSize: stats.UncompressedValueBytes,
400-
BlobReferenceDepth: 1,
401-
NewBlobFiles: []valsep.NewBlobFileInfo{{
402-
FileStats: stats,
403-
FileObject: vs.objMeta,
404-
FileMetadata: meta,
405-
}},
406-
}, nil
407-
}
408-
409222
// preserveBlobReferences implements the compact.ValueSeparation interface. When
410223
// a compaction is configured with preserveBlobReferences, the compaction will
411224
// not create any new blob files. However, input references to existing blob

compaction_value_separation_test.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,26 +89,32 @@ func TestValueSeparationPolicy(t *testing.T) {
8989
}
9090
vs = pbr
9191
case "write-new-blob-files":
92-
newSep := &writeNewBlobFiles{
93-
comparer: testkeys.Comparer,
94-
newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
95-
fn++
96-
return objStore.Create(ctx, base.FileTypeBlob, fn, objstorage.CreateOptions{})
97-
},
98-
invalidValueCallback: func(userKey []byte, value []byte, err error) {
99-
fmt.Fprintf(&buf, "# invalid value for key %q, value: %q: %s\n", userKey, value, err)
100-
},
101-
}
102-
d.MaybeScanArgs(t, "minimum-size", &newSep.minimumSize)
103-
newSep.globalMinimumSize = newSep.minimumSize
92+
var minimumSize int
93+
var shortAttrExtractor base.ShortAttributeExtractor
94+
d.MaybeScanArgs(t, "minimum-size", &minimumSize)
10495
if arg, ok := d.Arg("short-attr-extractor"); ok {
10596
switch arg.SingleVal(t) {
10697
case "error":
107-
newSep.shortAttrExtractor = errShortAttrExtractor
98+
shortAttrExtractor = errShortAttrExtractor
10899
default:
109100
t.Fatalf("unknown short attribute extractor: %s", arg.String())
110101
}
111102
}
103+
newSep := valsep.NewWriteNewBlobFiles(
104+
testkeys.Comparer,
105+
func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
106+
fn++
107+
return objStore.Create(ctx, base.FileTypeBlob, fn, objstorage.CreateOptions{})
108+
},
109+
blob.FileWriterOptions{},
110+
minimumSize,
111+
valsep.WriteNewBlobFilesOptions{
112+
ShortAttrExtractor: shortAttrExtractor,
113+
InvalidValueCallback: func(userKey []byte, value []byte, err error) {
114+
fmt.Fprintf(&buf, "# invalid value for key %q, value: %q: %s\n", userKey, value, err)
115+
},
116+
},
117+
)
112118
vs = newSep
113119
default:
114120
t.Fatalf("unknown value separation policy: %s", x)

0 commit comments

Comments
 (0)