
Commit 3795809

db: don't store table format in compaction
We currently store a table format in each compaction. In practice it is always `d.TableFormat()`, and we don't even use the stored value when writing the output (we call `d.TableFormat()` again). This change removes the table format from the compaction to simplify things a bit, and allows adding a simpler `DB.makeWriterOptions()` helper (which makes future extensions easier).
1 parent 1cb683a commit 3795809

9 files changed: 54 additions, 50 deletions
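For orientation, here is a minimal sketch of the shape of the refactor, using simplified stand-in types rather than Pebble's real APIs: the compaction no longer carries an sstable.TableFormat; callers pass a precomputed preferSharedStorage flag, and writer options are derived from the DB's current format via a small makeWriterOptions helper (mirroring the new unexported helper added in options.go below).

// Sketch only: simplified stand-in types, not Pebble's actual APIs.
package main

import "fmt"

type TableFormat int

type WriterOptions struct {
	Format TableFormat
	Level  int
}

// DB stands in for *pebble.DB. TableFormat() is the single source of truth
// for the output format, so compactions no longer need to carry it around.
type DB struct{ format TableFormat }

func (d *DB) TableFormat() TableFormat { return d.format }

// makeWriterOptions mirrors the new helper in options.go: writer options are
// always built from the DB's current format.
func (d *DB) makeWriterOptions(level int) WriterOptions {
	return WriterOptions{Format: d.TableFormat(), Level: level}
}

// compaction drops its tableFormat field; it only records whether its outputs
// should prefer shared storage, which the caller now decides up front.
type compaction struct {
	outputLevel         int
	preferSharedStorage bool
}

func newCompaction(outputLevel int, preferSharedStorage bool) *compaction {
	return &compaction{outputLevel: outputLevel, preferSharedStorage: preferSharedStorage}
}

func main() {
	d := &DB{format: 7}
	c := newCompaction(6, false /* preferSharedStorage */)
	// At write time the format comes from the DB, not from the compaction.
	opts := d.makeWriterOptions(c.outputLevel)
	fmt.Printf("level=%d format=%d shared=%t\n", opts.Level, opts.Format, c.preferSharedStorage)
}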

blob_rewrite.go

Lines changed: 2 additions & 2 deletions
@@ -320,7 +320,7 @@ func (d *DB) runBlobFileRewriteLocked(
     if err != nil {
         return objstorage.ObjectMetadata{}, nil, err
     }
-    // Initialize a blob file rewriter. We pass L6 to MakeBlobWriterOptions.
+    // Initialize a blob file rewriter. We pass L6 to makeBlobWriterOptions.
     // There's no single associated level with a blob file. A long-lived blob
     // file that gets rewritten is likely to mostly be referenced from L6.
     // TODO(jackson): Consider refactoring to remove the level association.
@@ -329,7 +329,7 @@ func (d *DB) runBlobFileRewriteLocked(
         env,
         objMeta.DiskFileNum,
         writable,
-        d.opts.MakeBlobWriterOptions(6, d.BlobFileFormat()),
+        d.makeBlobWriterOptions(6),
         c.referencingTables,
         c.input,
     )

compaction.go

Lines changed: 19 additions & 25 deletions
@@ -30,7 +30,6 @@ import (
     "github.com/cockroachdb/pebble/internal/sstableinternal"
     "github.com/cockroachdb/pebble/objstorage"
     "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
-    "github.com/cockroachdb/pebble/objstorage/remote"
     "github.com/cockroachdb/pebble/sstable"
     "github.com/cockroachdb/pebble/sstable/blob"
     "github.com/cockroachdb/pebble/sstable/block"
@@ -251,7 +250,7 @@ type tableCompaction struct {
     // b) rewrite blob files: The compaction will write eligible values to new
     //    blob files. This consumes more write bandwidth because all values are
     //    rewritten. However it restores locality.
-    getValueSeparation func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation
+    getValueSeparation func(JobID, *tableCompaction) compact.ValueSeparation

     // startLevel is the level that is being compacted. Inputs from startLevel
     // and outputLevel will be merged to produce a set of outputLevel files.
@@ -336,7 +335,6 @@ type tableCompaction struct {

     grantHandle CompactionGrantHandle

-    tableFormat sstable.TableFormat
     objCreateOpts objstorage.CreateOptions

     annotations []string
@@ -544,7 +542,7 @@ func (c *tableCompaction) makeInfo(jobID JobID) CompactionInfo {
     return info
 }

-type getValueSeparation func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation
+type getValueSeparation func(JobID, *tableCompaction) compact.ValueSeparation

 // newCompaction constructs a compaction from the provided picked compaction.
 //
@@ -556,7 +554,7 @@ func newCompaction(
     beganAt time.Time,
     provider objstorage.Provider,
     grantHandle CompactionGrantHandle,
-    tableFormat sstable.TableFormat,
+    preferSharedStorage bool,
     getValueSeparation getValueSeparation,
 ) *tableCompaction {
     c := &tableCompaction{
@@ -574,7 +572,6 @@ func newCompaction(
             picker: pc.pickerMetrics,
         },
         grantHandle: grantHandle,
-        tableFormat: tableFormat,
     }
     // Acquire a reference to the version to ensure that files and in-memory
     // version state necessary for reading files remain available. Ignoring
@@ -615,8 +612,6 @@
     )
     c.kind = pc.kind

-    preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
-        remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     c.maybeSwitchToMoveOrCopy(preferSharedStorage, provider)
     c.objCreateOpts = objstorage.CreateOptions{
         PreferSharedStorage: preferSharedStorage,
@@ -833,7 +828,7 @@ func newFlush(
     baseLevel int,
     flushing flushableList,
     beganAt time.Time,
-    tableFormat sstable.TableFormat,
+    preferSharedStorage bool,
     getValueSeparation getValueSeparation,
 ) (*tableCompaction, error) {
     c := &tableCompaction{
@@ -845,7 +840,6 @@
         maxOutputFileSize: math.MaxUint64,
         maxOverlapBytes: math.MaxUint64,
         grantHandle: noopGrantHandle{},
-        tableFormat: tableFormat,
         metrics: compactionMetrics{
             beganAt: beganAt,
         },
@@ -872,8 +866,6 @@
         }
     }

-    preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
-        remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     c.objCreateOpts = objstorage.CreateOptions{
         PreferSharedStorage: preferSharedStorage,
         WriteCategory: getDiskWriteCategoryForCompaction(opts, c.kind),
@@ -1722,8 +1714,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
         }
     }

-    c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.latest.l0Organizer,
-        d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow(), d.TableFormat(), d.determineCompactionValueSeparation)
+    c, err := newFlush(
+        d.opts,
+        d.mu.versions.currentVersion(),
+        d.mu.versions.latest.l0Organizer,
+        d.mu.versions.picker.getBaseLevel(),
+        d.mu.mem.queue[:n],
+        d.timeNow(),
+        d.shouldCreateShared(0),
+        d.determineCompactionValueSeparation,
+    )
     if err != nil {
         return 0, err
     }
@@ -2921,12 +2921,7 @@ func (d *DB) runCopyCompaction(
         }
     }()

-    w, _, err := d.objProvider.Create(
-        ctx, base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
-        objstorage.CreateOptions{
-            PreferSharedStorage: d.shouldCreateShared(c.outputLevel.level),
-        },
-    )
+    w, _, err := d.objProvider.Create(ctx, base.FileTypeTable, newMeta.TableBacking.DiskFileNum, c.objCreateOpts)
     if err != nil {
         return nil, compact.Stats{}, []compact.OutputBlob{}, err
     }
@@ -2954,7 +2949,7 @@
         // or update category stats).
         wrote, err = sstable.CopySpan(ctx,
             src, r, c.startLevel.level,
-            w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),
+            w, d.makeWriterOptions(c.outputLevel.level),
             start, end,
         )
         return err
@@ -3306,9 +3301,9 @@ func (d *DB) runDefaultTableCompaction(
     defer d.mu.Lock()

     // Determine whether we should separate values into blob files.
-    valueSeparation := c.getValueSeparation(jobID, c, c.tableFormat)
+    valueSeparation := c.getValueSeparation(jobID, c)

-    result := d.compactAndWrite(jobID, c, snapshots, c.tableFormat, valueSeparation)
+    result := d.compactAndWrite(jobID, c, snapshots, valueSeparation)
     if result.Err == nil {
         ve, result.Err = c.makeVersionEdit(result)
     }
@@ -3352,7 +3347,6 @@ func (d *DB) compactAndWrite(
     jobID JobID,
     c *tableCompaction,
     snapshots compact.Snapshots,
-    tableFormat sstable.TableFormat,
     valueSeparation compact.ValueSeparation,
 ) (result compact.Result) {
     suggestedCacheReaders := blob.SuggestedCachedReaders(len(c.inputs))
@@ -3469,7 +3463,7 @@
         spanPolicyValid = true
     }

-    writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
+    writerOpts := d.makeWriterOptions(c.outputLevel.level)
     if spanPolicy.DisableValueSeparationBySuffix {
         writerOpts.DisableValueBlocks = true
     }

compaction_picker.go

Lines changed: 1 addition & 1 deletion
@@ -248,7 +248,7 @@ func (pc *pickedTableCompaction) ConstructCompaction(
         d.timeNow(),
         d.ObjProvider(),
         grantHandle,
-        d.TableFormat(),
+        d.shouldCreateShared(pc.outputLevel.level),
         d.determineCompactionValueSeparation)
 }

compaction_picker_test.go

Lines changed: 4 additions & 2 deletions
@@ -671,7 +671,7 @@ func TestCompactionPickerL0(t *testing.T) {
     var result strings.Builder
     if ptc != nil {
         checkClone(t, ptc)
-        c := newCompaction(ptc, opts, time.Now(), nil /* provider */, noopGrantHandle{}, sstable.TableFormatMinSupported, neverSeparateValues)
+        c := newCompaction(ptc, opts, time.Now(), nil /* provider */, noopGrantHandle{}, noSharedStorage, neverSeparateValues)
         fmt.Fprintf(&result, "L%d -> L%d\n", ptc.startLevel.level, ptc.outputLevel.level)
         fmt.Fprintf(&result, "L%d: %s\n", ptc.startLevel.level, tableNums(ptc.startLevel.files))
         if !ptc.outputLevel.files.Empty() {
@@ -713,6 +713,8 @@
     })
 }

+const noSharedStorage = false
+
 func TestCompactionPickerConcurrency(t *testing.T) {
     opts := DefaultOptions()
     opts.Experimental.L0CompactionConcurrency = 1
@@ -803,7 +805,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
     var result strings.Builder
     fmt.Fprintf(&result, "picker.getCompactionConcurrency: %d\n", allowedCompactions)
     if pc != nil {
-        c := newCompaction(ptc, opts, time.Now(), nil /* provider */, noopGrantHandle{}, sstable.TableFormatMinSupported, neverSeparateValues)
+        c := newCompaction(ptc, opts, time.Now(), nil /* provider */, noopGrantHandle{}, noSharedStorage, neverSeparateValues)
         fmt.Fprintf(&result, "L%d -> L%d\n", ptc.startLevel.level, ptc.outputLevel.level)
         fmt.Fprintf(&result, "L%d: %s\n", ptc.startLevel.level, tableNums(ptc.startLevel.files))
         if !ptc.outputLevel.files.Empty() {

compaction_test.go

Lines changed: 4 additions & 4 deletions
@@ -545,7 +545,7 @@ func TestPickCompaction(t *testing.T) {
     pc, got := vs.picker.pickAutoScore(compactionEnv{diskAvailBytes: math.MaxUint64}), ""
     if pc != nil {
         tableCompaction := pc.(*pickedTableCompaction)
-        c := newCompaction(tableCompaction, opts, time.Now(), nil /* provider */, noopGrantHandle{}, sstable.TableFormatMinSupported, neverSeparateValues)
+        c := newCompaction(tableCompaction, opts, time.Now(), nil /* provider */, noopGrantHandle{}, noSharedStorage, neverSeparateValues)

         gotStart := fileNums(c.startLevel.files)
         gotML := ""
@@ -1609,7 +1609,7 @@ func TestCompactionOutputLevel(t *testing.T) {
     d.ScanArgs(t, "start", &start)
     d.ScanArgs(t, "base", &base)
     pc := newPickedTableCompaction(opts, version, l0Organizer, start, defaultOutputLevel(start, base), base)
-    c := newCompaction(pc, opts, time.Now(), nil /* provider */, noopGrantHandle{}, sstable.TableFormatMinSupported, neverSeparateValues)
+    c := newCompaction(pc, opts, time.Now(), nil /* provider */, noopGrantHandle{}, noSharedStorage, neverSeparateValues)
     return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
         c.outputLevel.level, c.maxOutputFileSize)

@@ -3551,7 +3551,7 @@ func TestTombstoneDensityCompactionMoveOptimization(t *testing.T) {

     // Create the compaction.
     tableCompaction := pc.(*pickedTableCompaction)
-    c := newCompaction(tableCompaction, opts, time.Now(), nil, noopGrantHandle{}, sstable.TableFormatMinSupported, neverSeparateValues)
+    c := newCompaction(tableCompaction, opts, time.Now(), nil, noopGrantHandle{}, noSharedStorage, neverSeparateValues)

     // The compaction should be converted to a move.
     require.Equal(t, compactionKindMove, c.kind, "expected compaction to be optimized to a move")
@@ -3667,7 +3667,7 @@ func TestTombstoneDensityCompactionMoveOptimization_NoMoveWithOverlap(t *testing

     // Create the compaction.
     tableCompaction := pc.(*pickedTableCompaction)
-    c := newCompaction(tableCompaction, opts, time.Now(), nil, noopGrantHandle{}, sstable.TableFormatMinSupported, neverSeparateValues)
+    c := newCompaction(tableCompaction, opts, time.Now(), nil, noopGrantHandle{}, noSharedStorage, neverSeparateValues)

     // The compaction should NOT be converted to a move.
     require.NotEqual(t, compactionKindMove, c.kind, "move optimization should NOT apply when there is overlap in output level")

data_test.go

Lines changed: 5 additions & 5 deletions
@@ -583,7 +583,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
     }
     path := td.CmdArgs[0].String()

-    // Use TableFormatMax here and downgrade after, if necessary. This ensures
+    // Use d.TableFormat() here and downgrade after, if necessary. This ensures
     // that all fields are set.
     writeOpts := d.opts.MakeWriterOptions(0 /* level */, d.TableFormat())
     if rand.IntN(4) == 0 {
@@ -648,7 +648,7 @@ func runBuildCmd(
     }
     path := td.CmdArgs[0].String()

-    writeOpts := d.opts.MakeWriterOptions(0 /* level */, d.TableFormat())
+    writeOpts := d.makeWriterOptions(0 /* level */)
     if err := sstable.ParseWriterOptions(&writeOpts, td.CmdArgs[1:]...); err != nil {
         return err
     }
@@ -932,11 +932,11 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
             flushed: make(chan struct{}),
         }}
         c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.latest.l0Organizer,
-            d.mu.versions.picker.getBaseLevel(), toFlush, time.Now(), d.TableFormat(), d.determineCompactionValueSeparation)
+            d.mu.versions.picker.getBaseLevel(), toFlush, time.Now(), d.shouldCreateShared(0), d.determineCompactionValueSeparation)
         if err != nil {
             return err
         }
-        c.getValueSeparation = func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation {
+        c.getValueSeparation = func(JobID, *tableCompaction) compact.ValueSeparation {
            return valueSeparator
         }
         // NB: define allows the test to exactly specify which keys go
@@ -1163,7 +1163,7 @@
         writable, _, err := d.objProvider.Create(context.Background(), base.FileTypeBlob, fileNum, objstorage.CreateOptions{})
         return writable, err
     }
-    blobWriterOpts := d.opts.MakeBlobWriterOptions(0, d.BlobFileFormat())
+    blobWriterOpts := d.makeBlobWriterOptions(0)
     fileStats, err := valueSeparator.bv.WriteFiles(newBlobObject, blobWriterOpts)
     if err != nil {
         return nil, err

download.go

Lines changed: 1 addition & 1 deletion
@@ -446,7 +446,7 @@ func (d *DB) tryLaunchDownloadForFile(

     download.numLaunchedDownloads++
     doneCh = make(chan error, 1)
-    c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, noopGrantHandle{}, d.TableFormat(), d.determineCompactionValueSeparation)
+    c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, noopGrantHandle{}, d.shouldCreateShared(pc.outputLevel.level), d.determineCompactionValueSeparation)
     c.isDownload = true
     d.mu.compact.downloadingCount++
     c.AddInProgressLocked(d)

options.go

Lines changed: 12 additions & 6 deletions
@@ -2623,19 +2623,25 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
     return writerOpts
 }

-// MakeBlobWriterOptions constructs blob.FileWriterOptions from the corresponding
-// options in the receiver.
-func (o *Options) MakeBlobWriterOptions(level int, format blob.FileFormat) blob.FileWriterOptions {
-    lo := o.Levels[level]
+// makeWriterOptions constructs sstable.WriterOptions for the specified level
+// using the current DB options and format.
+func (d *DB) makeWriterOptions(level int) sstable.WriterOptions {
+    return d.opts.MakeWriterOptions(level, d.TableFormat())
+}
+
+// makeBlobWriterOptions constructs blob.FileWriterOptions using the current DB
+// options and format.
+func (d *DB) makeBlobWriterOptions(level int) blob.FileWriterOptions {
+    lo := &d.opts.Levels[level]
     return blob.FileWriterOptions{
-        Format: format,
+        Format: d.BlobFileFormat(),
         Compression: lo.Compression(),
         ChecksumType: block.ChecksumTypeCRC32c,
         FlushGovernor: block.MakeFlushGovernor(
             lo.BlockSize,
             lo.BlockSizeThreshold,
             base.SizeClassAwareBlockSizeThreshold,
-            o.AllocatorSizeClasses,
+            d.opts.AllocatorSizeClasses,
         ),
     }
 }

value_separation.go

Lines changed: 6 additions & 4 deletions
@@ -24,17 +24,19 @@ import (
 // ValueStorageLatencyTolerant.
 const latencyTolerantMinimumSize = 10

-var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation {
+var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction) compact.ValueSeparation {
     return compact.NeverSeparateValues{}
 }

 // determineCompactionValueSeparation determines whether a compaction should
 // separate values into blob files. It returns a compact.ValueSeparation
 // implementation that should be used for the compaction.
+//
+// It assumes that the compaction will write tables at d.TableFormat() or above.
 func (d *DB) determineCompactionValueSeparation(
-    jobID JobID, c *tableCompaction, tableFormat sstable.TableFormat,
+    jobID JobID, c *tableCompaction,
 ) compact.ValueSeparation {
-    if tableFormat < sstable.TableFormatPebblev7 || d.FormatMajorVersion() < FormatValueSeparation ||
+    if d.FormatMajorVersion() < FormatValueSeparation ||
         d.opts.Experimental.ValueSeparationPolicy == nil {
         return compact.NeverSeparateValues{}
     }
@@ -61,7 +63,7 @@
             jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
         },
         shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
-        writerOpts: d.opts.MakeBlobWriterOptions(c.outputLevel.level, d.BlobFileFormat()),
+        writerOpts: d.makeBlobWriterOptions(c.outputLevel.level),
         minimumSize: policy.MinimumSize,
         globalMinimumSize: policy.MinimumSize,
         invalidValueCallback: func(userKey []byte, value []byte, err error) {
