Skip to content

Commit 1010734

Browse files
committed
db: plumb context through ingest methods
Informs #3728
1 parent 80a5615 commit 1010734

16 files changed

+97
-85
lines changed

compaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
12791279
iter := overlaps.Iter()
12801280

12811281
for m := iter.First(); m != nil; m = iter.Next() {
1282-
newFiles, err := d.excise(ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
1282+
newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
12831283
if err != nil {
12841284
return nil, err
12851285
}
@@ -1298,7 +1298,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
12981298
}
12991299

13001300
if len(ingestSplitFiles) > 0 {
1301-
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
1301+
if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
13021302
return nil, err
13031303
}
13041304
}

compaction_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,7 +2106,7 @@ func TestCompactionErrorCleanup(t *testing.T) {
21062106
require.NoError(t, w.Set([]byte(k), nil))
21072107
}
21082108
require.NoError(t, w.Close())
2109-
require.NoError(t, d.Ingest([]string{"ext"}))
2109+
require.NoError(t, d.Ingest(context.Background(), []string{"ext"}))
21102110
}
21112111
ingest("a", "c")
21122112
ingest("b")
@@ -2591,7 +2591,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
25912591
require.NoError(t, w.Set(key, nil))
25922592
require.NoError(t, w.Close())
25932593
// Ingest the SST.
2594-
return db.Ingest([]string{fName})
2594+
return db.Ingest(context.Background(), []string{fName})
25952595
}
25962596

25972597
testCases := []struct {
@@ -2800,7 +2800,7 @@ func TestCompactionErrorStats(t *testing.T) {
28002800
require.NoError(t, w.Set([]byte(k), nil))
28012801
}
28022802
require.NoError(t, w.Close())
2803-
require.NoError(t, d.Ingest([]string{"ext"}))
2803+
require.NoError(t, d.Ingest(context.Background(), []string{"ext"}))
28042804
}
28052805
ingest("a", "c")
28062806
// Snapshot will preserve the older "a" key during compaction.

data_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,7 +1352,7 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
13521352
}
13531353
}
13541354

1355-
if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
1355+
if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
13561356
return err
13571357
}
13581358
return nil
@@ -1364,7 +1364,7 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
13641364
paths = append(paths, arg.String())
13651365
}
13661366

1367-
if err := d.Ingest(paths); err != nil {
1367+
if err := d.Ingest(context.Background(), paths); err != nil {
13681368
return err
13691369
}
13701370
return nil
@@ -1444,7 +1444,7 @@ func runIngestExternalCmd(
14441444
external = append(external, ef)
14451445
}
14461446

1447-
if _, err := d.IngestExternalFiles(external); err != nil {
1447+
if _, err := d.IngestExternalFiles(context.Background(), external); err != nil {
14481448
return err
14491449
}
14501450
return nil

db_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ func TestDBClosed(t *testing.T) {
11101110
require.True(t, errors.Is(catch(func() { _, _, _ = d.Get(nil) }), ErrClosed))
11111111
require.True(t, errors.Is(catch(func() { _ = d.Delete(nil, nil) }), ErrClosed))
11121112
require.True(t, errors.Is(catch(func() { _ = d.DeleteRange(nil, nil, nil) }), ErrClosed))
1113-
require.True(t, errors.Is(catch(func() { _ = d.Ingest(nil) }), ErrClosed))
1113+
require.True(t, errors.Is(catch(func() { _ = d.Ingest(context.Background(), nil) }), ErrClosed))
11141114
require.True(t, errors.Is(catch(func() { _ = d.LogData(nil, nil) }), ErrClosed))
11151115
require.True(t, errors.Is(catch(func() { _ = d.Merge(nil, nil, nil) }), ErrClosed))
11161116
require.True(t, errors.Is(catch(func() { _ = d.RatchetFormatMajorVersion(internalFormatNewest) }), ErrClosed))
@@ -1182,7 +1182,7 @@ func TestDBConcurrentCompactClose(t *testing.T) {
11821182
})
11831183
require.NoError(t, w.Set([]byte(fmt.Sprint(j)), nil))
11841184
require.NoError(t, w.Close())
1185-
require.NoError(t, d.Ingest([]string{path}))
1185+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
11861186
}
11871187

11881188
require.NoError(t, d.Close())
@@ -1642,7 +1642,7 @@ func TestMemtableIngestInversion(t *testing.T) {
16421642
})
16431643
require.NoError(t, w.Set([]byte("cc"), []byte("foo")))
16441644
require.NoError(t, w.Close())
1645-
require.NoError(t, d.Ingest([]string{path}))
1645+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
16461646
}
16471647
{
16481648
path := "ingest2.sst"
@@ -1654,7 +1654,7 @@ func TestMemtableIngestInversion(t *testing.T) {
16541654
require.NoError(t, w.Set([]byte("bb"), []byte("foo2")))
16551655
require.NoError(t, w.Set([]byte("cc"), []byte("foo2")))
16561656
require.NoError(t, w.Close())
1657-
require.NoError(t, d.Ingest([]string{path}))
1657+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
16581658
}
16591659
{
16601660
path := "ingest3.sst"
@@ -1665,7 +1665,7 @@ func TestMemtableIngestInversion(t *testing.T) {
16651665
})
16661666
require.NoError(t, w.Set([]byte("bb"), []byte("foo3")))
16671667
require.NoError(t, w.Close())
1668-
require.NoError(t, d.Ingest([]string{path}))
1668+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
16691669
}
16701670
{
16711671
path := "ingest4.sst"
@@ -1676,7 +1676,7 @@ func TestMemtableIngestInversion(t *testing.T) {
16761676
})
16771677
require.NoError(t, w.Set([]byte("bb"), []byte("foo4")))
16781678
require.NoError(t, w.Close())
1679-
require.NoError(t, d.Ingest([]string{path}))
1679+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
16801680
}
16811681

16821682
// We now have a base compaction blocked. Block a memtable flush to cause
@@ -1755,7 +1755,7 @@ func TestMemtableIngestInversion(t *testing.T) {
17551755
})
17561756
require.NoError(t, w.DeleteRange([]byte("cc"), []byte("e")))
17571757
require.NoError(t, w.Close())
1758-
require.NoError(t, d.Ingest([]string{path}))
1758+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
17591759
}
17601760
t.Log("main ingest complete")
17611761
printLSM()
@@ -1789,7 +1789,7 @@ func TestMemtableIngestInversion(t *testing.T) {
17891789
})
17901790
require.NoError(t, w.Set([]byte("cc"), []byte("doesntmatter")))
17911791
require.NoError(t, w.Close())
1792-
require.NoError(t, d.Ingest([]string{path}))
1792+
require.NoError(t, d.Ingest(context.Background(), []string{path}))
17931793
}
17941794

17951795
// Unblock earlier flushes. We will first finish flushing the blocked

event_listener_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package pebble
66

77
import (
88
"bytes"
9+
"context"
910
"fmt"
1011
"reflect"
1112
"runtime"
@@ -147,7 +148,7 @@ func TestEventListener(t *testing.T) {
147148
if err := w.Close(); err != nil {
148149
return err.Error()
149150
}
150-
if err := d.Ingest([]string{"ext/0"}); err != nil {
151+
if err := d.Ingest(context.Background(), []string{"ext/0"}); err != nil {
151152
return err.Error()
152153
}
153154
return memLog.String()
@@ -190,7 +191,7 @@ func TestEventListener(t *testing.T) {
190191
if err := writeTable(tableB, 'b'); err != nil {
191192
return err.Error()
192193
}
193-
if err := d.Ingest([]string{tableA, tableB}); err != nil {
194+
if err := d.Ingest(context.Background(), []string{tableA, tableB}); err != nil {
194195
return err.Error()
195196
}
196197

flushable_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pebble
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"testing"
78

@@ -57,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
5758

5859
// We can reuse the ingestLoad function for this test even if we're
5960
// not actually ingesting a file.
60-
lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
61+
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
6162
if err != nil {
6263
panic(err)
6364
}
@@ -85,7 +86,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
8586
// (e.g. because the files reside on a different filesystem), ingestLink will
8687
// fall back to copying, and if that fails we undo our work and return an
8788
// error.
88-
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, lr.local); err != nil {
89+
if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil {
8990
panic("couldn't hard link sstables")
9091
}
9192

ingest.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ func (r *ingestLoadResult) fileCount() int {
423423
}
424424

425425
func ingestLoad(
426+
ctx context.Context,
426427
opts *Options,
427428
fmv FormatMajorVersion,
428429
paths []string,
@@ -431,8 +432,6 @@ func ingestLoad(
431432
cacheID uint64,
432433
pending []base.FileNum,
433434
) (ingestLoadResult, error) {
434-
ctx := context.TODO()
435-
436435
localFileNums := pending[:len(paths)]
437436
sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
438437
externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
@@ -590,11 +589,15 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro
590589
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
591590
// copies of the ingested files.
592591
func ingestLinkLocal(
593-
jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
592+
ctx context.Context,
593+
jobID JobID,
594+
opts *Options,
595+
objProvider objstorage.Provider,
596+
localMetas []ingestLocalMeta,
594597
) error {
595598
for i := range localMetas {
596599
objMeta, err := objProvider.LinkOrCopyFromLocal(
597-
context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
600+
ctx, opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
598601
objstorage.CreateOptions{PreferSharedStorage: true},
599602
)
600603
if err != nil {
@@ -1027,14 +1030,14 @@ func ingestTargetLevel(
10271030
// can produce a noticeable hiccup in performance. See
10281031
// https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
10291032
// this hiccup.
1030-
func (d *DB) Ingest(paths []string) error {
1033+
func (d *DB) Ingest(ctx context.Context, paths []string) error {
10311034
if err := d.closed.Load(); err != nil {
10321035
panic(err)
10331036
}
10341037
if d.opts.ReadOnly {
10351038
return ErrReadOnly
10361039
}
1037-
_, err := d.ingest(paths, nil /* shared */, KeyRange{}, false, nil /* external */)
1040+
_, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */)
10381041
return err
10391042
}
10401043

@@ -1115,21 +1118,23 @@ type ExternalFile struct {
11151118

11161119
// IngestWithStats does the same as Ingest, and additionally returns
11171120
// IngestOperationStats.
1118-
func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
1121+
func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
11191122
if err := d.closed.Load(); err != nil {
11201123
panic(err)
11211124
}
11221125
if d.opts.ReadOnly {
11231126
return IngestOperationStats{}, ErrReadOnly
11241127
}
1125-
return d.ingest(paths, nil, KeyRange{}, false, nil)
1128+
return d.ingest(ctx, paths, nil, KeyRange{}, false, nil)
11261129
}
11271130

11281131
// IngestExternalFiles does the same as IngestWithStats, and additionally
11291132
// accepts external files (with locator info that can be resolved using
11301133
// d.opts.SharedStorage). These files must also be non-overlapping with
11311134
// each other, and must be resolvable through d.objProvider.
1132-
func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
1135+
func (d *DB) IngestExternalFiles(
1136+
ctx context.Context, external []ExternalFile,
1137+
) (IngestOperationStats, error) {
11331138
if err := d.closed.Load(); err != nil {
11341139
panic(err)
11351140
}
@@ -1140,7 +1145,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
11401145
if d.opts.Experimental.RemoteStorage == nil {
11411146
return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
11421147
}
1143-
return d.ingest(nil, nil, KeyRange{}, false, external)
1148+
return d.ingest(ctx, nil, nil, KeyRange{}, false, external)
11441149
}
11451150

11461151
// IngestAndExcise does the same as IngestWithStats, and additionally accepts a
@@ -1154,6 +1159,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
11541159
// Panics if this DB instance was not instantiated with a remote.Storage and
11551160
// shared sstables are present.
11561161
func (d *DB) IngestAndExcise(
1162+
ctx context.Context,
11571163
paths []string,
11581164
shared []SharedSSTMeta,
11591165
external []ExternalFile,
@@ -1181,7 +1187,7 @@ func (d *DB) IngestAndExcise(
11811187
v, FormatMinForSharedObjects,
11821188
)
11831189
}
1184-
return d.ingest(paths, shared, exciseSpan, sstsContainExciseTombstone, external)
1190+
return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external)
11851191
}
11861192

11871193
// Both DB.mu and commitPipeline.mu must be held while this is called.
@@ -1303,6 +1309,7 @@ func (d *DB) handleIngestAsFlushable(
13031309

13041310
// See comment at Ingest() for details on how this works.
13051311
func (d *DB) ingest(
1312+
ctx context.Context,
13061313
paths []string,
13071314
shared []SharedSSTMeta,
13081315
exciseSpan KeyRange,
@@ -1325,7 +1332,6 @@ func (d *DB) ingest(
13251332
}
13261333
}
13271334
}
1328-
ctx := context.Background()
13291335
// Allocate file numbers for all of the files being ingested and mark them as
13301336
// pending in order to prevent them from being deleted. Note that this causes
13311337
// the file number ordering to be out of alignment with sequence number
@@ -1342,7 +1348,7 @@ func (d *DB) ingest(
13421348

13431349
// Load the metadata for all the files being ingested. This step detects
13441350
// and elides empty sstables.
1345-
loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
1351+
loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
13461352
if err != nil {
13471353
return IngestOperationStats{}, err
13481354
}
@@ -1362,7 +1368,7 @@ func (d *DB) ingest(
13621368
// (e.g. because the files reside on a different filesystem), ingestLinkLocal
13631369
// will fall back to copying, and if that fails we undo our work and return an
13641370
// error.
1365-
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil {
1371+
if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
13661372
return IngestOperationStats{}, err
13671373
}
13681374

@@ -1697,7 +1703,7 @@ func (d *DB) ingest(
16971703
//
16981704
// The manifest lock must be held when calling this method.
16991705
func (d *DB) excise(
1700-
exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
1706+
ctx context.Context, exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
17011707
) ([]manifest.NewFileEntry, error) {
17021708
numCreatedFiles := 0
17031709
// Check if there's actually an overlap between m and exciseSpan.
@@ -1722,7 +1728,7 @@ func (d *DB) excise(
17221728
return nil
17231729
}
17241730
var err error
1725-
iters, err = d.newIters(context.TODO(), m, &IterOptions{
1731+
iters, err = d.newIters(ctx, m, &IterOptions{
17261732
CategoryAndQoS: sstable.CategoryAndQoS{
17271733
Category: "pebble-ingest",
17281734
QoSLevel: sstable.LatencySensitiveQoSLevel,
@@ -1982,6 +1988,7 @@ type ingestSplitFile struct {
19821988
//
19831989
// d.mu as well as the manifest lock must be held when calling this method.
19841990
func (d *DB) ingestSplit(
1991+
ctx context.Context,
19851992
ve *versionEdit,
19861993
updateMetrics func(*fileMetadata, int, []newFileEntry),
19871994
files []ingestSplitFile,
@@ -2047,7 +2054,7 @@ func (d *DB) ingestSplit(
20472054
// as we're guaranteed to not have any data overlap between splitFile and
20482055
// s.ingestFile. d.excise will return an error if we pass an inclusive user
20492056
// key bound _and_ we end up seeing data overlap at the end key.
2050-
added, err := d.excise(base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
2057+
added, err := d.excise(ctx, base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
20512058
if err != nil {
20522059
return err
20532060
}
@@ -2288,7 +2295,7 @@ func (d *DB) ingestApply(
22882295
iter := overlaps.Iter()
22892296

22902297
for m := iter.First(); m != nil; m = iter.Next() {
2291-
newFiles, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level)
2298+
newFiles, err := d.excise(ctx, exciseSpan.UserKeyBounds(), m, ve, level)
22922299
if err != nil {
22932300
return nil, err
22942301
}
@@ -2308,7 +2315,7 @@ func (d *DB) ingestApply(
23082315
if len(filesToSplit) > 0 {
23092316
// For the same reasons as the above call to excise, we hold the db mutex
23102317
// while calling this method.
2311-
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
2318+
if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
23122319
return nil, err
23132320
}
23142321
}

0 commit comments

Comments
 (0)