Skip to content

Commit 3771e33

Browse files
committed
storage: Update Engine/Reader/Writer interfaces for ScanInternal
This change updates pkg/storage interfaces and implementations to allow the use of ScanInternal in skip-shared iteration mode as well as writing/reading of internal point keys, range dels and range keys. Replication / snapshot code will soon rely on these changes to be able to replicate internal keys in higher levels plus metadata of shared sstables in lower levels, as opposed to just observed user keys. Part of cockroachdb#103028 Epic: none Release note: None
1 parent 4c34b48 commit 3771e33

File tree

8 files changed

+275
-6
lines changed

8 files changed

+275
-6
lines changed

pkg/kv/kvserver/spanset/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ go_library(
1919
"//pkg/util/uuid",
2020
"@com_github_cockroachdb_errors//:errors",
2121
"@com_github_cockroachdb_pebble//:pebble",
22+
"@com_github_cockroachdb_pebble//rangekey",
2223
],
2324
)
2425

pkg/kv/kvserver/spanset/batch.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2323
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2424
"github.com/cockroachdb/pebble"
25+
"github.com/cockroachdb/pebble/rangekey"
2526
)
2627

2728
// MVCCIterator wraps an storage.MVCCIterator and ensures that it can
@@ -445,6 +446,17 @@ type spanSetReader struct {
445446

446447
var _ storage.Reader = spanSetReader{}
447448

449+
func (s spanSetReader) ScanInternal(
450+
ctx context.Context,
451+
lower, upper roachpb.Key,
452+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
453+
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
454+
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
455+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
456+
) error {
457+
return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
458+
}
459+
448460
func (s spanSetReader) Close() {
449461
s.r.Close()
450462
}
@@ -762,6 +774,18 @@ type spanSetBatch struct {
762774

763775
var _ storage.Batch = spanSetBatch{}
764776

777+
func (s spanSetBatch) ScanInternal(
778+
ctx context.Context,
779+
lower, upper roachpb.Key,
780+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
781+
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
782+
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
783+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
784+
) error {
785+
// Only used on Engine.
786+
panic("unimplemented")
787+
}
788+
765789
func (s spanSetBatch) Commit(sync bool) error {
766790
return s.b.Commit(sync)
767791
}
@@ -794,6 +818,18 @@ func (s spanSetBatch) CommitStats() storage.BatchCommitStats {
794818
return s.b.CommitStats()
795819
}
796820

821+
func (s spanSetBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
822+
return s.b.PutInternalRangeKey(start, end, key)
823+
}
824+
825+
func (s spanSetBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error {
826+
return s.b.PutInternalPointKey(key, value)
827+
}
828+
829+
func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error {
830+
return s.b.ClearRawEncodedRange(start, end)
831+
}
832+
797833
// NewBatch returns a storage.Batch that asserts access of the underlying
798834
// Batch against the given SpanSet. We only consider span boundaries, associated
799835
// timestamps are not considered.

pkg/storage/batch.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ func (r *BatchReader) Value() []byte {
150150
}
151151
}
152152

153-
// EngineEndKey returns the engine end key of the current ranged batch entry.
154-
func (r *BatchReader) EngineEndKey() (EngineKey, error) {
153+
// EndKey returns the raw end key of the current ranged batch entry.
154+
func (r *BatchReader) EndKey() ([]byte, error) {
155155
var rawKey []byte
156156
switch r.kind {
157157
case pebble.InternalKeyKindRangeDelete:
@@ -160,14 +160,23 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) {
160160
case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete:
161161
rangeKeys, err := r.rangeKeys()
162162
if err != nil {
163-
return EngineKey{}, err
163+
return nil, err
164164
}
165165
rawKey = rangeKeys.End
166166

167167
default:
168-
return EngineKey{}, errors.AssertionFailedf(
168+
return nil, errors.AssertionFailedf(
169169
"can only ask for EndKey on a ranged entry, got %v", r.kind)
170170
}
171+
return rawKey, nil
172+
}
173+
174+
// EngineEndKey returns the engine end key of the current ranged batch entry.
175+
func (r *BatchReader) EngineEndKey() (EngineKey, error) {
176+
rawKey, err := r.EndKey()
177+
if err != nil {
178+
return EngineKey{}, err
179+
}
171180

172181
key, ok := DecodeEngineKey(rawKey)
173182
if !ok {
@@ -176,6 +185,21 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) {
176185
return key, nil
177186
}
178187

188+
// RawRangeKeys returns the raw range key values at the current entry.
189+
func (r *BatchReader) RawRangeKeys() ([]rangekey.Key, error) {
190+
switch r.kind {
191+
case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset:
192+
default:
193+
return nil, errors.AssertionFailedf(
194+
"can only ask for range keys on a range key entry, got %v", r.kind)
195+
}
196+
rangeKeys, err := r.rangeKeys()
197+
if err != nil {
198+
return nil, err
199+
}
200+
return rangeKeys.Keys, nil
201+
}
202+
179203
// EngineRangeKeys returns the engine range key values at the current entry.
180204
func (r *BatchReader) EngineRangeKeys() ([]EngineRangeKeyValue, error) {
181205
switch r.kind {

pkg/storage/engine.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3131
"github.com/cockroachdb/errors"
3232
"github.com/cockroachdb/pebble"
33+
"github.com/cockroachdb/pebble/rangekey"
3334
"github.com/cockroachdb/pebble/vfs"
3435
"github.com/cockroachdb/redact"
3536
prometheusgo "github.com/prometheus/client_model/go"
@@ -598,6 +599,26 @@ type Reader interface {
598599
// with the iterator to free resources. The caller can change IterOptions
599600
// after this function returns.
600601
NewEngineIterator(opts IterOptions) EngineIterator
602+
// ScanInternal allows a caller to inspect the underlying engine's InternalKeys
603+
// using a visitor pattern, while also allowing for keys in shared files to be
604+
// skipped if a visitor is provided for visitSharedFiles. Useful for
605+
// fast-replicating state from one Reader to another. Point keys are collapsed
606+
// such that only one internal key per user key is exposed, and rangedels and
607+
// range keys are collapsed and defragmented with each span being surfaced
608+
// exactly once, alongside the highest seqnum for a rangedel on that span
609+
// (for rangedels) or all coalesced rangekey.Keys in that span (for range
610+
// keys). A point key deleted by a rangedel will not be exposed, but the
611+
// rangedel would be exposed.
612+
//
613+
// Note that ScanInternal does not obey the guarantees indicated by
614+
// ConsistentIterators.
615+
ScanInternal(
616+
ctx context.Context, lower, upper roachpb.Key,
617+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
618+
visitRangeDel func(start, end []byte, seqNum uint64) error,
619+
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
620+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
621+
) error
601622
// ConsistentIterators returns true if the Reader implementation guarantees
602623
// that the different iterators constructed by this Reader will see the same
603624
// underlying Engine state. This is not true about Batch writes: new iterators
@@ -854,6 +875,32 @@ type Writer interface {
854875
BufferedSize() int
855876
}
856877

878+
// InternalWriter is an extension of Writer that supports additional low-level
879+
// methods to operate on internal keys in Pebble. These additional methods
880+
// should only be used sparingly, when one of the high-level methods cannot
881+
// achieve the same ends.
882+
type InternalWriter interface {
883+
Writer
884+
// ClearRawEncodedRange is similar to ClearRawRange, except it takes pre-encoded
885+
// start, end keys and bypasses the EngineKey encoding step. It also only
886+
// operates on point keys; for range keys, use ClearEngineRangeKey or
887+
// PutInternalRangeKey.
888+
//
889+
// It is safe to modify the contents of the arguments after it returns.
890+
ClearRawEncodedRange(start, end []byte) error
891+
892+
// PutInternalRangeKey adds an InternalRangeKey to this batch. This is a very
893+
// low-level method that should be used sparingly.
894+
//
895+
// It is safe to modify the contents of the arguments after it returns.
896+
PutInternalRangeKey(start, end []byte, key rangekey.Key) error
897+
// PutInternalPointKey adds a point InternalKey to this batch. This is a very
898+
// low-level method that should be used sparingly.
899+
//
900+
// It is safe to modify the contents of the arguments after it returns.
901+
PutInternalPointKey(key *pebble.InternalKey, value []byte) error
902+
}
903+
857904
// ClearOptions holds optional parameters to methods that clear keys from the
858905
// storage engine.
859906
type ClearOptions struct {
@@ -984,6 +1031,11 @@ type Engine interface {
9841031
// additionally returns ingestion stats.
9851032
IngestExternalFilesWithStats(
9861033
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
1034+
// IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats
1035+
// that excises an ExciseSpan, and ingests either local or shared sstables or
1036+
// both.
1037+
IngestAndExciseExternalFiles(
1038+
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error)
9871039
// PreIngestDelay offers an engine the chance to backpressure ingestions.
9881040
// When called, it may choose to block if the engine determines that it is in
9891041
// or approaching a state where further ingestions may risk its health.
@@ -1048,7 +1100,7 @@ type Batch interface {
10481100

10491101
// WriteBatch is the interface for write batch specific operations.
10501102
type WriteBatch interface {
1051-
Writer
1103+
InternalWriter
10521104
// Close closes the batch, freeing up any outstanding resources.
10531105
Close()
10541106
// Commit atomically applies any batched updates to the underlying engine. If

pkg/storage/open.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ func BallastSize(size int64) ConfigOption {
181181
func SharedStorage(sharedStorage cloud.ExternalStorage) ConfigOption {
182182
return func(cfg *engineConfig) error {
183183
cfg.SharedStorage = sharedStorage
184+
// TODO(bilal): Do the format major version ratchet while accounting for
185+
// version upgrade finalization. However, seeing as shared storage is
186+
// an experimental feature and upgrading from existing stores is not
187+
// supported, this is fine.
188+
cfg.Opts.FormatMajorVersion = pebble.ExperimentalFormatVirtualSSTables
184189
return nil
185190
}
186191
}

pkg/storage/pebble.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1025,7 +1025,13 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
10251025
ValueBlocksEnabled.Get(&cfg.Settings.SV)
10261026
}
10271027
opts.Experimental.DisableIngestAsFlushable = func() bool {
1028-
return !IngestAsFlushable.Get(&cfg.Settings.SV)
1028+
// Disable flushable ingests if shared storage is enabled. This is because
1029+
// flushable ingests currently do not support Excise operations.
1030+
//
1031+
// TODO(bilal): Remove the first part of this || statement when
1032+
// https://github.com/cockroachdb/pebble/issues/2676 is completed, or when
1033+
// Pebble has better guards against this.
1034+
return cfg.SharedStorage != nil || !IngestAsFlushable.Get(&cfg.Settings.SV)
10291035
}
10301036

10311037
auxDir := opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
@@ -1475,6 +1481,20 @@ func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator {
14751481
return newPebbleIterator(p.db, opts, StandardDurability, p)
14761482
}
14771483

1484+
// ScanInternal implements the Engine interface.
1485+
func (p *Pebble) ScanInternal(
1486+
ctx context.Context,
1487+
lower, upper roachpb.Key,
1488+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
1489+
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
1490+
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
1491+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
1492+
) error {
1493+
rawLower := EngineKey{Key: lower}.Encode()
1494+
rawUpper := EngineKey{Key: upper}.Encode()
1495+
return p.db.ScanInternal(ctx, rawLower, rawUpper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
1496+
}
1497+
14781498
// ConsistentIterators implements the Engine interface.
14791499
func (p *Pebble) ConsistentIterators() bool {
14801500
return false
@@ -2033,6 +2053,17 @@ func (p *Pebble) IngestExternalFilesWithStats(
20332053
return p.db.IngestWithStats(paths)
20342054
}
20352055

2056+
// IngestAndExciseExternalFiles implements the Engine interface.
2057+
func (p *Pebble) IngestAndExciseExternalFiles(
2058+
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span,
2059+
) (pebble.IngestOperationStats, error) {
2060+
rawSpan := pebble.KeyRange{
2061+
Start: EngineKey{Key: exciseSpan.Key}.Encode(),
2062+
End: EngineKey{Key: exciseSpan.EndKey}.Encode(),
2063+
}
2064+
return p.db.IngestAndExcise(paths, shared, rawSpan)
2065+
}
2066+
20362067
// PreIngestDelay implements the Engine interface.
20372068
func (p *Pebble) PreIngestDelay(ctx context.Context) {
20382069
preIngestDelay(ctx, p, p.settings)
@@ -2444,10 +2475,23 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error {
24442475
return nil
24452476
}
24462477

2478+
// ScanInternal implements the Reader interface.
2479+
func (p *pebbleReadOnly) ScanInternal(
2480+
ctx context.Context,
2481+
lower, upper roachpb.Key,
2482+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
2483+
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
2484+
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
2485+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
2486+
) error {
2487+
return p.parent.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
2488+
}
2489+
24472490
// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
24482491
// could be refactored so that a Reader could be supplied to evaluateBatch
24492492

24502493
// Writer is the write interface to an engine's data.
2494+
24512495
func (p *pebbleReadOnly) ApplyBatchRepr(repr []byte, sync bool) error {
24522496
panic("not implemented")
24532497
}
@@ -2621,6 +2665,20 @@ func (p *pebbleSnapshot) PinEngineStateForIterators() error {
26212665
return nil
26222666
}
26232667

2668+
// ScanInternal implements the Reader interface.
2669+
func (p *pebbleSnapshot) ScanInternal(
2670+
ctx context.Context,
2671+
lower, upper roachpb.Key,
2672+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
2673+
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
2674+
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
2675+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
2676+
) error {
2677+
rawLower := EngineKey{Key: lower}.Encode()
2678+
rawUpper := EngineKey{Key: upper}.Encode()
2679+
return p.snapshot.ScanInternal(ctx, rawLower, rawUpper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
2680+
}
2681+
26242682
// ExceedMaxSizeError is the error returned when an export request
26252683
// fails due the export size exceeding the budget. This can be caused
26262684
// by large KVs that have many revisions.

pkg/storage/pebble_batch.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2222
"github.com/cockroachdb/errors"
2323
"github.com/cockroachdb/pebble"
24+
"github.com/cockroachdb/pebble/rangekey"
2425
)
2526

2627
// Wrapper struct around a pebble.Batch.
@@ -257,6 +258,23 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
257258
return iter
258259
}
259260

261+
// ScanInternal implements the Reader interface.
262+
func (p *pebbleBatch) ScanInternal(
263+
ctx context.Context,
264+
lower, upper roachpb.Key,
265+
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue, info pebble.IteratorLevel) error,
266+
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
267+
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
268+
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
269+
) error {
270+
panic("ScanInternal only supported on Engine and Snapshot.")
271+
}
272+
273+
// ClearRawEncodedRange implements the InternalWriter interface.
274+
func (p *pebbleBatch) ClearRawEncodedRange(start, end []byte) error {
275+
return p.batch.DeleteRange(start, end, pebble.Sync)
276+
}
277+
260278
// ConsistentIterators implements the Batch interface.
261279
func (p *pebbleBatch) ConsistentIterators() bool {
262280
return true
@@ -482,6 +500,20 @@ func (p *pebbleBatch) PutEngineRangeKey(start, end roachpb.Key, suffix, value []
482500
EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix, value, nil)
483501
}
484502

503+
// PutInternalRangeKey implements the InternalWriter interface.
504+
func (p *pebbleBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
505+
switch key.Kind() {
506+
case pebble.InternalKeyKindRangeKeyUnset:
507+
return p.batch.RangeKeyUnset(start, end, key.Suffix, nil /* writeOptions */)
508+
case pebble.InternalKeyKindRangeKeySet:
509+
return p.batch.RangeKeySet(start, end, key.Suffix, key.Value, nil /* writeOptions */)
510+
case pebble.InternalKeyKindRangeKeyDelete:
511+
return p.batch.RangeKeyDelete(start, end, nil /* writeOptions */)
512+
default:
513+
panic("unexpected range key kind")
514+
}
515+
}
516+
485517
// ClearEngineRangeKey implements the Engine interface.
486518
func (p *pebbleBatch) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) error {
487519
return p.batch.RangeKeyUnset(
@@ -542,6 +574,14 @@ func (p *pebbleBatch) PutEngineKey(key EngineKey, value []byte) error {
542574
return p.batch.Set(p.buf, value, nil)
543575
}
544576

577+
// PutInternalPointKey implements the WriteBatch interface.
578+
func (p *pebbleBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error {
579+
if len(key.UserKey) == 0 {
580+
return emptyKeyError()
581+
}
582+
return p.batch.AddInternalKey(key, value, nil /* writeOptions */)
583+
}
584+
545585
func (p *pebbleBatch) put(key MVCCKey, value []byte) error {
546586
if len(key.Key) == 0 {
547587
return emptyKeyError()

0 commit comments

Comments
 (0)