Skip to content

Commit f0a0e2b

Browse files
committed
sstable: split Writer into Writer and RawWriter
The sstable.Writer had some egronomic methods for writing to the Writer from non-internal client code (implementing the pebble.Writer interface). These methods are internally unused, but may be used by external clients writing to sstables for ingestion, storage outside the LSM, tests, etc. This commit splits the Writer into two types, with the sugaring methods on the Writer type and the lower-level internal methods on the RawWriter type. In future work, sstable.RawWriter may become an interface to support two implementations: one for rowblk-based sstables and one for colblk-based sstables.
1 parent ed42fb4 commit f0a0e2b

29 files changed

+2171
-2115
lines changed

compaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2656,7 +2656,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
26562656
// compaction or flush.
26572657
func (d *DB) newCompactionOutput(
26582658
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
2659-
) (objstorage.ObjectMetadata, *sstable.Writer, CPUWorkHandle, error) {
2659+
) (objstorage.ObjectMetadata, *sstable.RawWriter, CPUWorkHandle, error) {
26602660
d.mu.Lock()
26612661
diskFileNum := d.mu.versions.getNextDiskFileNum()
26622662
d.mu.Unlock()
@@ -2730,7 +2730,7 @@ func (d *DB) newCompactionOutput(
27302730
d.opts.Experimental.MaxWriterConcurrency > 0 &&
27312731
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
27322732

2733-
tw := sstable.NewWriter(writable, writerOpts)
2733+
tw := sstable.NewRawWriter(writable, writerOpts)
27342734
return objMeta, tw, cpuWorkHandle, nil
27352735
}
27362736

data_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
586586
for kv := iter.First(); kv != nil; kv = iter.Next() {
587587
tmp := kv.K
588588
tmp.SetSeqNum(0)
589-
if err := w.Add(tmp, kv.InPlaceValue()); err != nil {
589+
if err := w.Raw().Add(tmp, kv.InPlaceValue()); err != nil {
590590
return err
591591
}
592592
}
@@ -599,7 +599,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
599599
for ; s != nil && err == nil; s, err = rdi.Next() {
600600
err = rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
601601
k.SetSeqNum(0)
602-
return w.Add(k, v)
602+
return w.Raw().Add(k, v)
603603
})
604604
if err != nil {
605605
return err
@@ -680,7 +680,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
680680
for kv := iter.First(); kv != nil; kv = iter.Next() {
681681
tmp := kv.K
682682
tmp.SetSeqNum(0)
683-
if err := w.Add(tmp, kv.InPlaceValue()); err != nil {
683+
if err := w.Raw().Add(tmp, kv.InPlaceValue()); err != nil {
684684
return err
685685
}
686686
}
@@ -693,7 +693,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
693693
for ; s != nil && err == nil; s, err = rdi.Next() {
694694
err = rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
695695
k.SetSeqNum(0)
696-
return w.Add(k, v)
696+
return w.Raw().Add(k, v)
697697
})
698698
if err != nil {
699699
return err

event_listener_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func TestEventListener(t *testing.T) {
138138
if err != nil {
139139
return err.Error()
140140
}
141-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
141+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
142142
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
143143
})
144144
if err := w.Add(base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), nil); err != nil {
@@ -172,7 +172,7 @@ func TestEventListener(t *testing.T) {
172172
if err != nil {
173173
return err
174174
}
175-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
175+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
176176
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
177177
})
178178
if err := w.Add(base.MakeInternalKey([]byte{key}, 0, InternalKeyKindSet), nil); err != nil {

ingest_test.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestIngestLoad(t *testing.T) {
9999
if err != nil {
100100
return err.Error()
101101
}
102-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writerOpts)
102+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writerOpts)
103103
for _, data := range strings.Split(td.Input, "\n") {
104104
if strings.HasPrefix(data, "rangekey: ") {
105105
data = strings.TrimPrefix(data, "rangekey: ")
@@ -193,7 +193,7 @@ func TestIngestLoadRand(t *testing.T) {
193193

194194
expected[i].ExtendPointKeyBounds(cmp, keys[0], keys[len(keys)-1])
195195

196-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
196+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
197197
TableFormat: version.MaxTableFormat(),
198198
})
199199
var count uint64
@@ -1092,7 +1092,7 @@ func testIngestSharedImpl(
10921092
f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
10931093
require.NoError(t, err)
10941094
replicateCounter++
1095-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
1095+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)
10961096

10971097
var sharedSSTs []SharedSSTMeta
10981098
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
@@ -1103,7 +1103,11 @@ func testIngestSharedImpl(
11031103
return nil
11041104
},
11051105
func(start, end []byte, seqNum base.SeqNum) error {
1106-
require.NoError(t, w.DeleteRange(start, end))
1106+
require.NoError(t, w.EncodeSpan(&keyspan.Span{
1107+
Start: start,
1108+
End: end,
1109+
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
1110+
}))
11071111
return nil
11081112
},
11091113
func(start, end []byte, keys []keyspan.Key) error {
@@ -1588,7 +1592,7 @@ func TestConcurrentExcise(t *testing.T) {
15881592
f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
15891593
require.NoError(t, err)
15901594
replicateCounter++
1591-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
1595+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)
15921596

15931597
var sharedSSTs []SharedSSTMeta
15941598
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
@@ -1599,7 +1603,11 @@ func TestConcurrentExcise(t *testing.T) {
15991603
return nil
16001604
},
16011605
func(start, end []byte, seqNum base.SeqNum) error {
1602-
require.NoError(t, w.DeleteRange(start, end))
1606+
require.NoError(t, w.EncodeSpan(&keyspan.Span{
1607+
Start: start,
1608+
End: end,
1609+
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
1610+
}))
16031611
return nil
16041612
},
16051613
func(start, end []byte, keys []keyspan.Key) error {
@@ -2021,7 +2029,7 @@ func TestIngestExternal(t *testing.T) {
20212029
f, err := to.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
20222030
require.NoError(t, err)
20232031
replicateCounter++
2024-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
2032+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)
20252033

20262034
var externalFiles []ExternalFile
20272035
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
@@ -2032,7 +2040,11 @@ func TestIngestExternal(t *testing.T) {
20322040
return nil
20332041
},
20342042
func(start, end []byte, seqNum base.SeqNum) error {
2035-
require.NoError(t, w.DeleteRange(start, end))
2043+
require.NoError(t, w.EncodeSpan(&keyspan.Span{
2044+
Start: start,
2045+
End: end,
2046+
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
2047+
}))
20362048
return nil
20372049
},
20382050
func(start, end []byte, keys []keyspan.Key) error {
@@ -2574,7 +2586,7 @@ func TestIngestCompact(t *testing.T) {
25742586
f, err := mem.Create(src(0), vfs.WriteCategoryUnspecified)
25752587
require.NoError(t, err)
25762588

2577-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
2589+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
25782590
key := []byte("a")
25792591
require.NoError(t, w.Add(base.MakeInternalKey(key, 0, InternalKeyKindSet), nil))
25802592
require.NoError(t, w.Close())
@@ -3212,12 +3224,12 @@ func TestIngestFileNumReuseCrash(t *testing.T) {
32123224
func TestIngest_UpdateSequenceNumber(t *testing.T) {
32133225
mem := vfs.NewMem()
32143226
cmp := base.DefaultComparer.Compare
3215-
parse := func(input string) (*sstable.Writer, error) {
3227+
parse := func(input string) (*sstable.RawWriter, error) {
32163228
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
32173229
if err != nil {
32183230
return nil, err
32193231
}
3220-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
3232+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
32213233
TableFormat: sstable.TableFormatMax,
32223234
})
32233235
for _, data := range strings.Split(input, "\n") {

internal/compact/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (r *Runner) MoreDataToWrite() bool {
132132
// Result.Tables. Should only be called if MoreDataToWrite() returned true.
133133
//
134134
// WriteTable always closes the Writer.
135-
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Writer) {
135+
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawWriter) {
136136
if r.err != nil {
137137
panic("error already encountered")
138138
}
@@ -159,7 +159,7 @@ func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Write
159159
r.tables[len(r.tables)-1].WriterMeta = *writerMeta
160160
}
161161

162-
func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) {
162+
func (r *Runner) writeKeysToTable(tw *sstable.RawWriter) (splitKey []byte, _ error) {
163163
firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
164164
if r.key != nil && firstKey == nil {
165165
firstKey = r.key.UserKey

internal/compact/spans.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (c *RangeKeySpanCompactor) elideInLastStripe(
182182
//
183183
// The span can contain either only RANGEDEL keys or only range keys.
184184
func SplitAndEncodeSpan(
185-
cmp base.Compare, span *keyspan.Span, upToKey []byte, tw *sstable.Writer,
185+
cmp base.Compare, span *keyspan.Span, upToKey []byte, tw *sstable.RawWriter,
186186
) error {
187187
if span.Empty() {
188188
return nil

internal/compact/spans_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func TestSplitAndEncodeSpan(t *testing.T) {
121121
}
122122

123123
obj := &objstorage.MemObj{}
124-
tw := sstable.NewWriter(obj, sstable.WriterOptions{TableFormat: sstable.TableFormatMax})
124+
tw := sstable.NewRawWriter(obj, sstable.WriterOptions{TableFormat: sstable.TableFormatMax})
125125
require.NoError(t, SplitAndEncodeSpan(base.DefaultComparer.Compare, &span, upToKey, tw))
126126
require.NoError(t, tw.Close())
127127
_, rangeDels, rangeKeys := sstable.ReadAll(obj)

level_checker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func TestCheckLevelsCornerCases(t *testing.T) {
183183
writerOpts.SetInternal(sstableinternal.WriterOptions{
184184
DisableKeyOrderChecks: disableKeyOrderChecks,
185185
})
186-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writerOpts)
186+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writerOpts)
187187
var tombstones []keyspan.Span
188188
frag := keyspan.Fragmenter{
189189
Cmp: testkeys.Comparer.Compare,

level_iter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string {
221221
}
222222
}
223223
fp := bloom.FilterPolicy(10)
224-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f0), sstable.WriterOptions{
224+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f0), sstable.WriterOptions{
225225
Comparer: &lt.cmp,
226226
FilterPolicy: fp,
227227
TableFormat: tableFormat,
@@ -508,9 +508,9 @@ func buildLevelIterTables(
508508
files[i] = f
509509
}
510510

511-
writers := make([]*sstable.Writer, len(files))
511+
writers := make([]*sstable.RawWriter, len(files))
512512
for i := range files {
513-
writers[i] = sstable.NewWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
513+
writers[i] = sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
514514
BlockRestartInterval: restartInterval,
515515
BlockSize: blockSize,
516516
Compression: NoCompression,

merging_iter_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func TestMergingIterCornerCases(t *testing.T) {
221221
if err != nil {
222222
return err.Error()
223223
}
224-
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
224+
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
225225
var tombstones []keyspan.Span
226226
frag := keyspan.Fragmenter{
227227
Cmp: cmp,
@@ -339,9 +339,9 @@ func buildMergingIterTables(
339339
files[i] = f
340340
}
341341

342-
writers := make([]*sstable.Writer, len(files))
342+
writers := make([]*sstable.RawWriter, len(files))
343343
for i := range files {
344-
writers[i] = sstable.NewWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
344+
writers[i] = sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
345345
BlockRestartInterval: restartInterval,
346346
BlockSize: blockSize,
347347
Compression: NoCompression,
@@ -542,7 +542,7 @@ func buildLevelsForMergingIterSeqSeek(
542542
}
543543

544544
const targetL6FirstFileSize = 2 << 20
545-
writers := make([][]*sstable.Writer, levelCount)
545+
writers := make([][]*sstable.RawWriter, levelCount)
546546
// A policy unlikely to have false positives.
547547
filterPolicy := bloom.FilterPolicy(100)
548548
for i := range files {
@@ -572,7 +572,7 @@ func buildLevelsForMergingIterSeqSeek(
572572
writerOptions.IndexBlockSize = 1
573573
}
574574
}
575-
writers[i] = append(writers[i], sstable.NewWriter(objstorageprovider.NewFileWritable(files[i][j]), writerOptions))
575+
writers[i] = append(writers[i], sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i][j]), writerOptions))
576576
}
577577
}
578578

0 commit comments

Comments
 (0)