Skip to content

Commit d9573d2

Browse files
committed
storage: add method to ingest external files, rename IngestExternalFiles
This change renames the existing IngestExternalFiles method on storage.Engine to IngestLocalFiles, and adds a new IngestExternalFiles that ingests pebble.ExternalFile, for use with online restore. Depends on cockroachdb/pebble#2753. Epic: none Release note: None
1 parent 1f8fa96 commit d9573d2

File tree

9 files changed

+61
-35
lines changed

9 files changed

+61
-35
lines changed

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ go_library(
224224
"@com_github_cockroachdb_logtags//:logtags",
225225
"@com_github_cockroachdb_pebble//:pebble",
226226
"@com_github_cockroachdb_pebble//objstorage",
227+
"@com_github_cockroachdb_pebble//objstorage/remote",
227228
"@com_github_cockroachdb_pebble//vfs",
228229
"@com_github_cockroachdb_redact//:redact",
229230
"@com_github_gogo_protobuf//proto",

pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) {
12491249
} else {
12501250
require.NotNil(t, result.Replicated.AddSSTable)
12511251
require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data))
1252-
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
1252+
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))
12531253
}
12541254

12551255
var expect kvs
@@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
16521652
require.NoError(t, err)
16531653

16541654
require.NoError(t, fs.WriteFile(engine, "sst", sst))
1655-
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
1655+
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))
16561656

16571657
statsEvaled := statsBefore
16581658
statsEvaled.Add(*cArgs.Stats)

pkg/kv/kvserver/replica_proposal.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3838
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3939
"github.com/cockroachdb/errors"
40+
"github.com/cockroachdb/pebble"
41+
"github.com/cockroachdb/pebble/objstorage/remote"
4042
"github.com/cockroachdb/redact"
4143
"github.com/kr/pretty"
4244
"golang.org/x/time/rate"
@@ -628,19 +630,32 @@ func addSSTablePreApply(
628630
sst.Span,
629631
sst.RemoteFileLoc,
630632
)
631-
// TODO(bilal): replace this with the real ingest.
632-
/*
633-
start := storage.EngineKey{Key: sst.Span.Key}
634-
end := storage.EngineKey{Key: sst.Span.EndKey}
635-
636-
externalFile := pebble.ExternalFile{
637-
Locator: shared.Locator(sst.RemoteFileLoc),
638-
ObjName: sst.RemoteFilePath,
639-
Size: sst.BackingFileSize,
640-
SmallestUserKey: start.Encode(),
641-
LargestUserKey: end.Encode(),
642-
}*/
643-
log.Fatalf(ctx, "Unsupported IngestRemoteFile")
633+
start := storage.EngineKey{Key: sst.Span.Key}
634+
end := storage.EngineKey{Key: sst.Span.EndKey}
635+
externalFile := pebble.ExternalFile{
636+
Locator: remote.Locator(sst.RemoteFileLoc),
637+
ObjName: sst.RemoteFilePath,
638+
Size: sst.BackingFileSize,
639+
SmallestUserKey: start.Encode(),
640+
LargestUserKey: end.Encode(),
641+
}
642+
tBegin := timeutil.Now()
643+
defer func() {
644+
if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() {
645+
log.Infof(ctx,
646+
"ingesting SST of size %s at index %d took %.2fs",
647+
humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(),
648+
)
649+
}
650+
}()
651+
652+
_, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile})
653+
if ingestErr != nil {
654+
log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr)
655+
}
656+
// Adding without modification succeeded, no copy necessary.
657+
log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath)
658+
return false
644659
}
645660
checksum := util.CRC32(sst.Data)
646661

@@ -685,7 +700,7 @@ func addSSTablePreApply(
685700
}
686701

687702
// Regular path - we made a hard link, so we can ingest the hard link now.
688-
ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath})
703+
ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath})
689704
if ingestErr != nil {
690705
log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr)
691706
}
@@ -726,7 +741,7 @@ func ingestViaCopy(
726741
if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil {
727742
return errors.Wrapf(err, "while ingesting %s", ingestPath)
728743
}
729-
if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil {
744+
if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil {
730745
return errors.Wrapf(err, "while ingesting %s", ingestPath)
731746
}
732747
log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath)

pkg/kv/kvserver/replica_raftstorage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ func (r *Replica) applySnapshot(
578578
// TODO: separate ingestions for log and statemachine engine. See:
579579
//
580580
// https://github.com/cockroachdb/cockroach/issues/93251
581-
r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
581+
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
582582
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
583583
}
584584
if r.store.cfg.KVAdmissionController != nil {

pkg/storage/bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1944,7 +1944,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) {
19441944
}
19451945
require.NoError(b, writer.Close())
19461946
batch.Close()
1947-
require.NoError(b, eng.IngestExternalFiles(ctx, []string{sstFileName}))
1947+
require.NoError(b, eng.IngestLocalFiles(ctx, []string{sstFileName}))
19481948
}
19491949
for i := 0; i < b.N; i++ {
19501950
rw := eng.NewReadOnly(StandardDurability)

pkg/storage/engine.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,18 +1024,22 @@ type Engine interface {
10241024
NewSnapshot() Reader
10251025
// Type returns engine type.
10261026
Type() enginepb.EngineType
1027-
// IngestExternalFiles atomically links a slice of files into the RocksDB
1027+
// IngestLocalFiles atomically links a slice of files into the RocksDB
10281028
// log-structured merge-tree.
1029-
IngestExternalFiles(ctx context.Context, paths []string) error
1030-
// IngestExternalFilesWithStats is a variant of IngestExternalFiles that
1029+
IngestLocalFiles(ctx context.Context, paths []string) error
1030+
// IngestLocalFilesWithStats is a variant of IngestLocalFiles that
10311031
// additionally returns ingestion stats.
1032-
IngestExternalFilesWithStats(
1032+
IngestLocalFilesWithStats(
10331033
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
1034-
// IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats
1034+
// IngestAndExciseFiles is a variant of IngestLocalFilesWithStats
10351035
// that excises an ExciseSpan, and ingests either local or shared sstables or
10361036
// both.
1037-
IngestAndExciseExternalFiles(
1037+
IngestAndExciseFiles(
10381038
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error)
1039+
// IngestExternalFiles is a variant of IngestLocalFiles that takes external
1040+
// files. These files can be referred to by multiple stores, but are not
1041+
// modified or deleted by the Engine doing the ingestion.
1042+
IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error)
10391043
// PreIngestDelay offers an engine the chance to backpressure ingestions.
10401044
// When called, it may choose to block if the engine determines that it is in
10411045
// or approaching a state where further ingestions may risk its health.

pkg/storage/metamorphic/operations.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ func (i ingestOp) run(ctx context.Context) string {
779779
}
780780
sstWriter.Close()
781781

782-
if err := i.m.engine.IngestExternalFiles(ctx, []string{sstPath}); err != nil {
782+
if err := i.m.engine.IngestLocalFiles(ctx, []string{sstPath}); err != nil {
783783
return fmt.Sprintf("error = %s", err.Error())
784784
}
785785

pkg/storage/mvcc_incremental_iterator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1371,7 +1371,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
13711371
if err := fs.WriteFile(db2, `ingest`, memFile.Data()); err != nil {
13721372
t.Fatal(err)
13731373
}
1374-
if err := db2.IngestExternalFiles(ctx, []string{`ingest`}); err != nil {
1374+
if err := db2.IngestLocalFiles(ctx, []string{`ingest`}); err != nil {
13751375
t.Fatal(err)
13761376
}
13771377
}

pkg/storage/pebble.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -582,8 +582,7 @@ func DefaultPebbleOptions() *pebble.Options {
582582
MemTableStopWritesThreshold: 4,
583583
Merger: MVCCMerger,
584584
BlockPropertyCollectors: PebbleBlockPropertyCollectors,
585-
// Minimum supported format.
586-
FormatMajorVersion: MinimumSupportedFormatVersion,
585+
FormatMajorVersion: MinimumSupportedFormatVersion,
587586
}
588587
// Automatically flush 10s after the first range tombstone is added to a
589588
// memtable. This ensures that we can reclaim space even when there's no
@@ -2041,20 +2040,20 @@ func (p *Pebble) Type() enginepb.EngineType {
20412040
return enginepb.EngineTypePebble
20422041
}
20432042

2044-
// IngestExternalFiles implements the Engine interface.
2045-
func (p *Pebble) IngestExternalFiles(ctx context.Context, paths []string) error {
2043+
// IngestLocalFiles implements the Engine interface.
2044+
func (p *Pebble) IngestLocalFiles(ctx context.Context, paths []string) error {
20462045
return p.db.Ingest(paths)
20472046
}
20482047

2049-
// IngestExternalFilesWithStats implements the Engine interface.
2050-
func (p *Pebble) IngestExternalFilesWithStats(
2048+
// IngestLocalFilesWithStats implements the Engine interface.
2049+
func (p *Pebble) IngestLocalFilesWithStats(
20512050
ctx context.Context, paths []string,
20522051
) (pebble.IngestOperationStats, error) {
20532052
return p.db.IngestWithStats(paths)
20542053
}
20552054

2056-
// IngestAndExciseExternalFiles implements the Engine interface.
2057-
func (p *Pebble) IngestAndExciseExternalFiles(
2055+
// IngestAndExciseFiles implements the Engine interface.
2056+
func (p *Pebble) IngestAndExciseFiles(
20582057
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span,
20592058
) (pebble.IngestOperationStats, error) {
20602059
rawSpan := pebble.KeyRange{
@@ -2064,6 +2063,13 @@ func (p *Pebble) IngestAndExciseExternalFiles(
20642063
return p.db.IngestAndExcise(paths, shared, rawSpan)
20652064
}
20662065

2066+
// IngestExternalFiles implements the Engine interface.
2067+
func (p *Pebble) IngestExternalFiles(
2068+
ctx context.Context, external []pebble.ExternalFile,
2069+
) (pebble.IngestOperationStats, error) {
2070+
return p.db.IngestExternalFiles(external)
2071+
}
2072+
20672073
// PreIngestDelay implements the Engine interface.
20682074
func (p *Pebble) PreIngestDelay(ctx context.Context) {
20692075
preIngestDelay(ctx, p, p.settings)

0 commit comments

Comments
 (0)