Skip to content

Commit 5ee10bd

Browse files
committed
db: tolerate empty external ingestions
We now tolerate empty external ingestions. When we encounter an empty table in a download compaction, we just delete the table. The iterator correctness issues that occurred before seem to no longer apply, very likely thanks to the recent refactoring and cleanup of the iterator stack.
1 parent f75ed44 commit 5ee10bd

File tree

7 files changed

+152
-159
lines changed

7 files changed

+152
-159
lines changed

compaction.go

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/cockroachdb/errors"
1919
"github.com/cockroachdb/pebble/internal/base"
2020
"github.com/cockroachdb/pebble/internal/compact"
21-
"github.com/cockroachdb/pebble/internal/invariants"
2221
"github.com/cockroachdb/pebble/internal/keyspan"
2322
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
2423
"github.com/cockroachdb/pebble/internal/manifest"
@@ -2163,14 +2162,26 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
21632162
// d.mu must be held when calling this method. The mutex will be released when
21642163
// doing IO.
21652164
func (d *DB) runCopyCompaction(
2166-
jobID JobID,
2167-
c *compaction,
2168-
inputMeta *fileMetadata,
2169-
objMeta objstorage.ObjectMetadata,
2170-
ve *versionEdit,
2171-
) error {
2172-
ctx := context.TODO()
2165+
jobID JobID, c *compaction,
2166+
) (ve *versionEdit, stats compact.Stats, _ error) {
2167+
iter := c.startLevel.files.Iter()
2168+
inputMeta := iter.First()
2169+
if iter.Next() != nil {
2170+
return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a move compaction")
2171+
}
2172+
if c.cancel.Load() {
2173+
return nil, compact.Stats{}, ErrCancelledCompaction
2174+
}
2175+
ve = &versionEdit{
2176+
DeletedFiles: map[deletedFileEntry]*fileMetadata{
2177+
{Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
2178+
},
2179+
}
21732180

2181+
objMeta, err := d.objProvider.Lookup(fileTypeTable, inputMeta.FileBacking.DiskFileNum)
2182+
if err != nil {
2183+
return nil, compact.Stats{}, err
2184+
}
21742185
if !objMeta.IsExternal() {
21752186
if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
21762187
panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
@@ -2215,14 +2226,6 @@ func (d *DB) runCopyCompaction(
22152226
newMeta.InitPhysicalBacking()
22162227
}
22172228

2218-
c.metrics = map[int]*LevelMetrics{
2219-
c.outputLevel.level: {
2220-
BytesIn: inputMeta.Size,
2221-
BytesCompacted: inputMeta.Size,
2222-
TablesCompacted: 1,
2223-
},
2224-
}
2225-
22262229
// Before dropping the db mutex, grab a ref to the current version. This
22272230
// prevents any concurrent excises from deleting files that this compaction
22282231
// needs to read/maintain a reference to.
@@ -2244,11 +2247,12 @@ func (d *DB) runCopyCompaction(
22442247

22452248
// If the src obj is external, we're doing an external to local/shared copy.
22462249
if objMeta.IsExternal() {
2250+
ctx := context.TODO()
22472251
src, err := d.objProvider.OpenForReading(
22482252
ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
22492253
)
22502254
if err != nil {
2251-
return err
2255+
return nil, compact.Stats{}, err
22522256
}
22532257
defer func() {
22542258
if src != nil {
@@ -2263,7 +2267,7 @@ func (d *DB) runCopyCompaction(
22632267
},
22642268
)
22652269
if err != nil {
2266-
return err
2270+
return nil, compact.Stats{}, err
22672271
}
22682272
deleteOnExit = true
22692273

@@ -2287,30 +2291,49 @@ func (d *DB) runCopyCompaction(
22872291
)
22882292
src = nil // We passed src to CopySpan; it's responsible for closing it.
22892293
if err != nil {
2290-
return err
2294+
if errors.Is(err, sstable.ErrEmptySpan) {
2295+
// The virtual table was empty. Just remove the backing file.
2296+
// Note that deleteOnExit is true so we will delete the created object.
2297+
c.metrics = map[int]*LevelMetrics{
2298+
c.outputLevel.level: {
2299+
BytesIn: inputMeta.Size,
2300+
},
2301+
}
2302+
return ve, compact.Stats{}, nil
2303+
}
2304+
return nil, compact.Stats{}, err
22912305
}
22922306
newMeta.FileBacking.Size = wrote
22932307
newMeta.Size = wrote
22942308
} else {
22952309
_, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
22962310
d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
22972311
objstorage.CreateOptions{PreferSharedStorage: true})
2298-
22992312
if err != nil {
2300-
return err
2313+
return nil, compact.Stats{}, err
23012314
}
23022315
deleteOnExit = true
23032316
}
2304-
ve.NewFiles[0].Meta = newMeta
2317+
ve.NewFiles = []newFileEntry{{
2318+
Level: c.outputLevel.level,
2319+
Meta: newMeta,
2320+
}}
23052321
if newMeta.Virtual {
23062322
ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
23072323
}
2324+
c.metrics = map[int]*LevelMetrics{
2325+
c.outputLevel.level: {
2326+
BytesIn: inputMeta.Size,
2327+
BytesCompacted: newMeta.Size,
2328+
TablesCompacted: 1,
2329+
},
2330+
}
23082331

23092332
if err := d.objProvider.Sync(); err != nil {
2310-
return err
2333+
return nil, compact.Stats{}, err
23112334
}
23122335
deleteOnExit = false
2313-
return nil
2336+
return ve, compact.Stats{}, nil
23142337
}
23152338

23162339
func (d *DB) runDeleteOnlyCompaction(
@@ -2334,23 +2357,17 @@ func (d *DB) runDeleteOnlyCompaction(
23342357
return ve, stats, nil
23352358
}
23362359

2337-
func (d *DB) runMoveOrCopyCompaction(
2360+
func (d *DB) runMoveCompaction(
23382361
jobID JobID, c *compaction,
23392362
) (ve *versionEdit, stats compact.Stats, _ error) {
23402363
iter := c.startLevel.files.Iter()
23412364
meta := iter.First()
2342-
if invariants.Enabled {
2343-
if iter.Next() != nil {
2344-
panic("got more than one file for a move or copy compaction")
2345-
}
2365+
if iter.Next() != nil {
2366+
return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
23462367
}
23472368
if c.cancel.Load() {
23482369
return ve, stats, ErrCancelledCompaction
23492370
}
2350-
objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
2351-
if err != nil {
2352-
return ve, stats, err
2353-
}
23542371
c.metrics = map[int]*LevelMetrics{
23552372
c.outputLevel.level: {
23562373
BytesMoved: meta.Size,
@@ -2365,12 +2382,8 @@ func (d *DB) runMoveOrCopyCompaction(
23652382
{Level: c.outputLevel.level, Meta: meta},
23662383
},
23672384
}
2368-
if c.kind == compactionKindMove {
2369-
return ve, stats, nil
2370-
}
23712385

2372-
err = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
2373-
return ve, stats, err
2386+
return ve, stats, nil
23742387
}
23752388

23762389
// runCompaction runs a compaction that produces new on-disk tables from
@@ -2386,8 +2399,10 @@ func (d *DB) runCompaction(
23862399
switch c.kind {
23872400
case compactionKindDeleteOnly:
23882401
return d.runDeleteOnlyCompaction(jobID, c)
2389-
case compactionKindMove, compactionKindCopy:
2390-
return d.runMoveOrCopyCompaction(jobID, c)
2402+
case compactionKindMove:
2403+
return d.runMoveCompaction(jobID, c)
2404+
case compactionKindCopy:
2405+
return d.runCopyCompaction(jobID, c)
23912406
case compactionKindIngestedFlushable:
23922407
panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
23932408
}

ingest.go

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -698,68 +698,6 @@ func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
698698
}
699699
}
700700

701-
if invariants.Enabled || d.opts.Experimental.CheckExternalIngestions {
702-
// Verify that the ingestion is not empty. Empty tables cause correctness
703-
// issues in some iterators.
704-
for i := range lr.external {
705-
e := &lr.external[i]
706-
isEmpty, err := func() (bool, error) {
707-
readable, err := d.objProvider.OpenForReading(
708-
context.Background(), fileTypeTable, e.FileBacking.DiskFileNum, objstorage.OpenOptions{},
709-
)
710-
if err != nil {
711-
return false, err
712-
}
713-
reader, err := sstable.NewReader(readable, d.opts.MakeReaderOptions())
714-
if err != nil {
715-
readable.Close()
716-
return false, err
717-
}
718-
defer reader.Close()
719-
iter, err := reader.NewIter(e.IterTransforms(), nil, nil)
720-
if err != nil {
721-
return false, err
722-
}
723-
if iter != nil {
724-
defer iter.Close()
725-
if kv := iter.SeekGE(e.external.StartKey, base.SeekGEFlagsNone); kv != nil {
726-
cmp := d.opts.Comparer.Compare(kv.K.UserKey, e.external.EndKey)
727-
if cmp < 0 || (cmp == 0 && e.external.EndKeyIsInclusive) {
728-
return false, nil
729-
}
730-
}
731-
if err := iter.Error(); err != nil {
732-
return false, err
733-
}
734-
}
735-
rangeIter, err := reader.NewRawRangeDelIter(e.IterTransforms())
736-
if err != nil {
737-
return false, err
738-
}
739-
if rangeIter != nil {
740-
defer rangeIter.Close()
741-
span, err := rangeIter.SeekGE(e.external.StartKey)
742-
if err != nil {
743-
return false, err
744-
}
745-
if span != nil {
746-
cmp := d.opts.Comparer.Compare(span.Start, e.external.EndKey)
747-
if cmp < 0 || (cmp == 0 && e.external.EndKeyIsInclusive) {
748-
return false, nil
749-
}
750-
}
751-
}
752-
return true, nil
753-
}()
754-
if err != nil {
755-
return err
756-
}
757-
if isEmpty {
758-
panic(fmt.Sprintf("external ingestion is empty: %s (%q %q)", e.external.ObjName, e.external.StartKey, e.external.EndKey))
759-
}
760-
}
761-
}
762-
763701
if d.opts.EventListener.TableCreated != nil {
764702
for i := range remoteObjMetas {
765703
d.opts.EventListener.TableCreated(TableCreateInfo{

metamorphic/build.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -274,32 +274,6 @@ func openExternalObj(
274274
return reader, pointIter, rangeDelIter, rangeKeyIter
275275
}
276276

277-
// externalObjIsEmpty returns true if the given external object has no point or
278-
// range keys withing the given bounds.
279-
func externalObjIsEmpty(
280-
t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
281-
) bool {
282-
reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
283-
defer reader.Close()
284-
defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
285-
286-
kv := pointIter.First()
287-
panicIfErr(pointIter.Error())
288-
if kv != nil {
289-
return false
290-
}
291-
for _, it := range []keyspan.FragmentIterator{rangeDelIter, rangeKeyIter} {
292-
if it != nil {
293-
span, err := it.First()
294-
panicIfErr(err)
295-
if span != nil {
296-
return false
297-
}
298-
}
299-
}
300-
return true
301-
}
302-
303277
func panicIfErr(err error) {
304278
if err != nil {
305279
panic(err)

metamorphic/ops.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -995,33 +995,12 @@ type externalObjWithBounds struct {
995995
func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) {
996996
db := t.getDB(o.dbID)
997997

998-
// We modify objs to eliminate empty objects.
999-
var objs []externalObjWithBounds
1000-
for _, obj := range o.objs {
1001-
// Make sure the object exists and is not empty.
1002-
objMeta := t.getExternalObj(obj.externalObjID)
1003-
if m := objMeta.sstMeta; !m.HasPointKeys && !m.HasRangeKeys && !m.HasRangeDelKeys {
1004-
continue
1005-
}
1006-
if externalObjIsEmpty(t, obj.externalObjID, obj.bounds, obj.syntheticPrefix) {
1007-
// Currently we don't support ingesting external objects that have no point
1008-
// or range keys in the given range. Filter out any such objects.
1009-
// TODO(radu): even though we don't expect this case in practice, eventually
1010-
// we want to make sure that it doesn't cause failures.
1011-
continue
1012-
}
1013-
objs = append(objs, obj)
1014-
}
1015-
if len(objs) == 0 {
1016-
h.Recordf("%s // no-op", o)
1017-
return
1018-
}
1019998
var err error
1020999
if !t.testOpts.externalStorageEnabled {
10211000
// Emulate the operation by crating local, truncated SST files and ingesting
10221001
// them.
10231002
var paths []string
1024-
for i, obj := range objs {
1003+
for i, obj := range o.objs {
10251004
// Make sure the object exists and is not empty.
10261005
path, sstMeta := buildForIngestExternalEmulation(
10271006
t, o.dbID, obj.externalObjID, obj.bounds, obj.syntheticSuffix, obj.syntheticPrefix, i,
@@ -1034,8 +1013,8 @@ func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) {
10341013
err = db.Ingest(paths)
10351014
}
10361015
} else {
1037-
external := make([]pebble.ExternalFile, len(objs))
1038-
for i, obj := range objs {
1016+
external := make([]pebble.ExternalFile, len(o.objs))
1017+
for i, obj := range o.objs {
10391018
meta := t.getExternalObj(obj.externalObjID)
10401019
external[i] = pebble.ExternalFile{
10411020
Locator: "external",

options.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -682,10 +682,6 @@ type Options struct {
682682
// major version is at least `FormatFlushableIngest`.
683683
DisableIngestAsFlushable func() bool
684684

685-
// CheckExternalIngestions enables opening external ssts at ingest time and
686-
// validating that they are not empty. Used for testing/debugging.
687-
CheckExternalIngestions bool
688-
689685
// RemoteStorage enables use of remote storage (e.g. S3) for storing
690686
// sstables. Setting this option enables use of CreateOnShared option and
691687
// allows ingestion of external files.

sstable/copier.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ import (
3131
//
3232
// The resulting sstable will have no block properties.
3333
//
34-
// Closes input and finishes or aborts output, including on non-nil errors.
34+
// The function might return ErrEmptySpan if there are no blocks that could
35+
// include keys in the given range. See ErrEmptySpan for more details.
36+
//
37+
// Closes input and finishes or aborts output in all cases, including on errors.
3538
//
3639
// Note that CopySpan is not aware of any suffix or prefix replacement; the
3740
// caller must account for those when specifying the bounds.
@@ -81,7 +84,8 @@ func CopySpan(
8184
}()
8285

8386
if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
84-
return 0, errors.New("cannot CopySpan sstables with value blocks or range keys")
87+
// We just checked for these conditions above.
88+
return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
8589
}
8690

8791
// Set the filter block to be copied over if it exists. It will return false
@@ -128,7 +132,7 @@ func CopySpan(
128132
// a non-empty sst by copying something outside the span, but #3907 means that
129133
// the empty virtual span would still be a problem, so don't bother.
130134
if len(blocks) < 1 {
131-
return 0, errors.Newf("CopySpan cannot copy empty span %s %s", start, end)
135+
return 0, ErrEmptySpan
132136
}
133137

134138
// Find the span of the input file that contains all our blocks, and then copy
@@ -171,6 +175,15 @@ func CopySpan(
171175
return wrote, nil
172176
}
173177

178+
// ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
179+
// requested span.
180+
//
181+
// Note that CopySpan's determination of block overlap is best effort - we may
182+
// copy a block that doesn't actually contain any keys in the span, in which
183+
// case we won't generate this error. We currently only generate this error when
184+
// the span start is beyond all keys in the physical sstable.
185+
var ErrEmptySpan = errors.New("cannot copy empty span")
186+
174187
// indexEntry captures the two components of an sst index entry: the key and the
175188
// decoded block handle value.
176189
type indexEntry struct {

0 commit comments

Comments
 (0)