Skip to content

Commit d54cde3

Browse files
committed
kvserver/spanset: introduce forbidden spans matchers
This commit introduces the concept of forbidden spans, which will allow us to assert our batches don't modify Raft's engine keys.
1 parent 1befb1f commit d54cde3

File tree

8 files changed

+107
-12
lines changed

8 files changed

+107
-12
lines changed

pkg/kv/kvserver/batcheval/cmd_add_sstable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ func EvalAddSSTable(
337337
// addition, and instead just use this key-only iterator. If a caller actually
338338
// needs to know what data is there, it must issue its own real Scan.
339339
if args.ReturnFollowingLikelyNonEmptySpanStart {
340-
existingIter, err := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator(
340+
existingIter, err := spanset.DisableLatchAssertions(readWriter).NewMVCCIterator(
341341
ctx,
342342
storage.MVCCKeyIterKind, // don't care if it is committed or not, just that it isn't empty.
343343
storage.IterOptions{

pkg/kv/kvserver/batcheval/cmd_delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func Delete(
5858
// If requested, replace point tombstones with range tombstones.
5959
if cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes && h.Txn == nil {
6060
if err := storage.ReplacePointTombstonesWithRangeTombstones(
61-
ctx, spanset.DisableReadWriterAssertions(readWriter),
61+
ctx, spanset.DisableLatchAssertions(readWriter),
6262
cArgs.Stats, args.Key, args.EndKey); err != nil {
6363
return result.Result{}, err
6464
}

pkg/kv/kvserver/batcheval/cmd_delete_range.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func deleteRangeTransactional(
302302
// If requested, replace point tombstones with range tombstones.
303303
if cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes && h.Txn == nil {
304304
if err := storage.ReplacePointTombstonesWithRangeTombstones(
305-
ctx, spanset.DisableReadWriterAssertions(readWriter),
305+
ctx, spanset.DisableLatchAssertions(readWriter),
306306
cArgs.Stats, args.Key, args.EndKey); err != nil {
307307
return result.Result{}, err
308308
}

pkg/kv/kvserver/batcheval/cmd_end_transaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ func resolveLocalLocksWithPagination(
719719
// If requested, replace point tombstones with range tombstones.
720720
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
721721
if err := storage.ReplacePointTombstonesWithRangeTombstones(
722-
ctx, spanset.DisableReadWriterAssertions(readWriter),
722+
ctx, spanset.DisableLatchAssertions(readWriter),
723723
ms, update.Key, update.EndKey); err != nil {
724724
return 0, 0, 0, errors.Wrapf(err,
725725
"replacing point tombstones with range tombstones for write intent at %s on end transaction [%s]",
@@ -757,7 +757,7 @@ func resolveLocalLocksWithPagination(
757757
// If requested, replace point tombstones with range tombstones.
758758
if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
759759
if err := storage.ReplacePointTombstonesWithRangeTombstones(
760-
ctx, spanset.DisableReadWriterAssertions(readWriter),
760+
ctx, spanset.DisableLatchAssertions(readWriter),
761761
ms, update.Key, update.EndKey); err != nil {
762762
return 0, 0, 0, errors.Wrapf(err,
763763
"replacing point tombstones with range tombstones for write intent range at %s on end transaction [%s]",

pkg/kv/kvserver/batcheval/cmd_resolve_intent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func ResolveIntent(
129129
// If requested, replace point tombstones with range tombstones.
130130
if ok && cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
131131
if err := storage.ReplacePointTombstonesWithRangeTombstones(ctx,
132-
spanset.DisableReadWriterAssertions(readWriter), ms, update.Key, update.EndKey); err != nil {
132+
spanset.DisableLatchAssertions(readWriter), ms, update.Key, update.EndKey); err != nil {
133133
return result.Result{}, err
134134
}
135135
}

pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func ResolveIntentRange(
9999
// If requested, replace point tombstones with range tombstones.
100100
if cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
101101
if err := storage.ReplacePointTombstonesWithRangeTombstones(ctx,
102-
spanset.DisableReadWriterAssertions(readWriter), ms, args.Key, args.EndKey); err != nil {
102+
spanset.DisableLatchAssertions(readWriter), ms, args.Key, args.EndKey); err != nil {
103103
return result.Result{}, err
104104
}
105105
}

pkg/kv/kvserver/spanset/batch.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,9 @@ func NewReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) st
766766

767767
type spanSetBatch struct {
768768
ReadWriter
769-
b storage.Batch
769+
b storage.Batch
770+
// TODO(ibrahim): The fields spans, spansOnly, and ts don't seem to be used.
771+
// Consider removing them and performing the necessary clean ups.
770772
spans *SpanSet
771773

772774
spansOnly bool
@@ -838,6 +840,22 @@ func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error {
838840
return s.b.ClearRawEncodedRange(start, end)
839841
}
840842

843+
// clone returns a shallow copy of the spanSetBatch. The returned batch shares
844+
// the same underlying storage.Batch but has its own spanSetBatch wrapper with
845+
// a new copy of the SpanSet that uses a shallow copy of the underlying spans.
846+
func (s spanSetBatch) clone() *spanSetBatch {
847+
return &spanSetBatch{
848+
ReadWriter: ReadWriter{
849+
spanSetReader: spanSetReader{r: s.b, spans: s.spanSetReader.spans.ShallowCopy(), spansOnly: s.spansOnly, ts: s.ts},
850+
spanSetWriter: spanSetWriter{w: s.b, spans: s.spanSetWriter.spans.ShallowCopy(), spansOnly: s.spansOnly, ts: s.ts},
851+
},
852+
b: s.b,
853+
spans: s.spans.ShallowCopy(),
854+
spansOnly: s.spansOnly,
855+
ts: s.ts,
856+
}
857+
}
858+
841859
// NewBatch returns a storage.Batch that asserts access of the underlying
842860
// Batch against the given SpanSet. We only consider span boundaries, associated
843861
// timestamps are not considered.
@@ -888,6 +906,40 @@ func DisableReadWriterAssertions(rw storage.ReadWriter) storage.ReadWriter {
888906
}
889907
}
890908

909+
// DisableLatchAssertions returns a new batch wrapper with latch assertions
910+
// disabled. It does not modify the original batch. The returned batch shares
911+
// the same underlying storage.Batch but has its own SpanSet wrapper with the
912+
// latch assertion disabled.
913+
func DisableLatchAssertions(rw storage.ReadWriter) storage.ReadWriter {
914+
switch v := rw.(type) {
915+
case *spanSetBatch:
916+
newSnapSetBatch := v.clone()
917+
newSnapSetBatch.spanSetReader.spans.DisableUndeclaredAccessAssertions()
918+
newSnapSetBatch.spanSetWriter.spans.DisableUndeclaredAccessAssertions()
919+
return newSnapSetBatch
920+
921+
default:
922+
return rw
923+
}
924+
}
925+
926+
// DisableForbiddenSpanAssertionsOnBatch returns a new batch wrapper with
927+
// forbidden span assertions disabled. It does not modify the original batch.
928+
// The returned batch shares the same underlying storage.Batch but has its own
929+
// SpanSet wrapper with the forbidden span assertion disabled.
930+
func DisableForbiddenSpanAssertionsOnBatch(rw storage.ReadWriter) storage.ReadWriter {
931+
switch v := rw.(type) {
932+
case *spanSetBatch:
933+
newSnapSetBatch := v.clone()
934+
newSnapSetBatch.spanSetReader.spans.DisableForbiddenSpansAssertions()
935+
newSnapSetBatch.spanSetWriter.spans.DisableForbiddenSpansAssertions()
936+
return newSnapSetBatch
937+
938+
default:
939+
return rw
940+
}
941+
}
942+
891943
// addLockTableSpans adds corresponding lock table spans for the declared
892944
// spans. This is to implicitly allow raw access to separated intents in the
893945
// lock table for any declared keys. Explicitly declaring lock table spans is

pkg/kv/kvserver/spanset/spanset.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,14 @@ type Span struct {
8383
// The Span slice for a particular access and scope contains non-overlapping
8484
// spans in increasing key order after calls to SortAndDedup.
8585
type SpanSet struct {
86-
spans [NumSpanAccess][NumSpanScope][]Span
87-
allowUndeclared bool
86+
spans [NumSpanAccess][NumSpanScope][]Span
87+
// forbiddenSpansMatchers are functions that return an error if the given span
88+
// shouldn't be accessed (forbidden). This allows for complex pattern matching
89+
// like forbidding specific keys across all range IDs without enumerating them
90+
// explicitly.
91+
forbiddenSpansMatchers []func(roachpb.Span) error
92+
allowUndeclared bool
93+
allowForbidden bool
8894
}
8995

9096
var spanSetPool = sync.Pool{
@@ -113,6 +119,8 @@ func (s *SpanSet) Release() {
113119
s.spans[sa][ss] = recycle
114120
}
115121
}
122+
s.forbiddenSpansMatchers = nil
123+
s.allowForbidden = false
116124
s.allowUndeclared = false
117125
spanSetPool.Put(s)
118126
}
@@ -155,7 +163,19 @@ func (s *SpanSet) Copy() *SpanSet {
155163
n.spans[sa][ss] = append(n.spans[sa][ss], s.spans[sa][ss]...)
156164
}
157165
}
166+
n.forbiddenSpansMatchers = append(n.forbiddenSpansMatchers, s.forbiddenSpansMatchers...)
158167
n.allowUndeclared = s.allowUndeclared
168+
n.allowForbidden = s.allowForbidden
169+
return n
170+
}
171+
172+
// ShallowCopy performs a shallow SpanSet copy.
173+
func (s *SpanSet) ShallowCopy() *SpanSet {
174+
n := New()
175+
n.spans = s.spans
176+
n.forbiddenSpansMatchers = s.forbiddenSpansMatchers
177+
n.allowUndeclared = s.allowUndeclared
178+
n.allowForbidden = s.allowForbidden
159179
return n
160180
}
161181

@@ -201,14 +221,22 @@ func (s *SpanSet) AddMVCC(access SpanAccess, span roachpb.Span, timestamp hlc.Ti
201221
s.spans[access][scope] = append(s.spans[access][scope], Span{Span: span, Timestamp: timestamp})
202222
}
203223

224+
// AddForbiddenMatcher adds a forbidden span matcher. The matcher is a function
225+
// that is called for each span access to check if it should be forbidden.
226+
func (s *SpanSet) AddForbiddenMatcher(matcher func(roachpb.Span) error) {
227+
s.forbiddenSpansMatchers = append(s.forbiddenSpansMatchers, matcher)
228+
}
229+
204230
// Merge merges all spans in s2 into s. s2 is not modified.
205231
func (s *SpanSet) Merge(s2 *SpanSet) {
206232
for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
207233
for ss := SpanScope(0); ss < NumSpanScope; ss++ {
208234
s.spans[sa][ss] = append(s.spans[sa][ss], s2.spans[sa][ss]...)
209235
}
210236
}
237+
s.forbiddenSpansMatchers = append(s.forbiddenSpansMatchers, s2.forbiddenSpansMatchers...)
211238
s.allowUndeclared = s2.allowUndeclared
239+
s.allowForbidden = s2.allowForbidden
212240
s.SortAndDedup()
213241
}
214242

@@ -331,9 +359,19 @@ func (s *SpanSet) CheckAllowedAt(
331359
func (s *SpanSet) checkAllowed(
332360
access SpanAccess, span roachpb.Span, check func(SpanAccess, Span) bool,
333361
) error {
362+
// Unless explicitly disabled, check if we access any forbidden spans.
363+
if !s.allowForbidden {
364+
// Check if the span is forbidden.
365+
for _, matcher := range s.forbiddenSpansMatchers {
366+
if err := matcher(span); err != nil {
367+
return errors.Errorf("cannot %s span %s: matches forbidden pattern",
368+
access, span)
369+
}
370+
}
371+
}
372+
373+
// Unless explicitly disabled, check if we access any undeclared spans.
334374
if s.allowUndeclared {
335-
// If the request has specified that undeclared spans are allowed, do
336-
// nothing.
337375
return nil
338376
}
339377

@@ -396,3 +434,8 @@ func (s *SpanSet) Validate() error {
396434
func (s *SpanSet) DisableUndeclaredAccessAssertions() {
397435
s.allowUndeclared = true
398436
}
437+
438+
// DisableForbiddenAssertions disables forbidden spans assertions.
439+
func (s *SpanSet) DisableForbiddenSpansAssertions() {
440+
s.allowForbidden = true
441+
}

0 commit comments

Comments
 (0)