Skip to content

Commit 26c7441

Browse files
committed
kvserver/spanset: introduce forbidden spans matchers
This commit introduces the concept of forbidden spans, which will allow us to assert our batches don't modify Raft's engine keys.
1 parent 1befb1f commit 26c7441

File tree

8 files changed

+99
-11
lines changed

8 files changed

+99
-11
lines changed

pkg/kv/kvserver/batcheval/cmd_add_sstable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ func EvalAddSSTable(
337337
// addition, and instead just use this key-only iterator. If a caller actually
338338
// needs to know what data is there, it must issue its own real Scan.
339339
if args.ReturnFollowingLikelyNonEmptySpanStart {
340-
existingIter, err := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator(
340+
existingIter, err := spanset.DisableLatchAssertions(readWriter).NewMVCCIterator(
341341
ctx,
342342
storage.MVCCKeyIterKind, // don't care if it is committed or not, just that it isn't empty.
343343
storage.IterOptions{

pkg/kv/kvserver/batcheval/cmd_delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func Delete(
5858
// If requested, replace point tombstones with range tombstones.
5959
if cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes && h.Txn == nil {
6060
if err := storage.ReplacePointTombstonesWithRangeTombstones(
61-
ctx, spanset.DisableReadWriterAssertions(readWriter),
61+
ctx, spanset.DisableLatchAssertions(readWriter),
6262
cArgs.Stats, args.Key, args.EndKey); err != nil {
6363
return result.Result{}, err
6464
}

pkg/kv/kvserver/batcheval/cmd_delete_range.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func deleteRangeTransactional(
302302
// If requested, replace point tombstones with range tombstones.
303303
if cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes && h.Txn == nil {
304304
if err := storage.ReplacePointTombstonesWithRangeTombstones(
305-
ctx, spanset.DisableReadWriterAssertions(readWriter),
305+
ctx, spanset.DisableLatchAssertions(readWriter),
306306
cArgs.Stats, args.Key, args.EndKey); err != nil {
307307
return result.Result{}, err
308308
}

pkg/kv/kvserver/batcheval/cmd_end_transaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ func resolveLocalLocksWithPagination(
719719
// If requested, replace point tombstones with range tombstones.
720720
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
721721
if err := storage.ReplacePointTombstonesWithRangeTombstones(
722-
ctx, spanset.DisableReadWriterAssertions(readWriter),
722+
ctx, spanset.DisableLatchAssertions(readWriter),
723723
ms, update.Key, update.EndKey); err != nil {
724724
return 0, 0, 0, errors.Wrapf(err,
725725
"replacing point tombstones with range tombstones for write intent at %s on end transaction [%s]",
@@ -757,7 +757,7 @@ func resolveLocalLocksWithPagination(
757757
// If requested, replace point tombstones with range tombstones.
758758
if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
759759
if err := storage.ReplacePointTombstonesWithRangeTombstones(
760-
ctx, spanset.DisableReadWriterAssertions(readWriter),
760+
ctx, spanset.DisableLatchAssertions(readWriter),
761761
ms, update.Key, update.EndKey); err != nil {
762762
return 0, 0, 0, errors.Wrapf(err,
763763
"replacing point tombstones with range tombstones for write intent range at %s on end transaction [%s]",

pkg/kv/kvserver/batcheval/cmd_resolve_intent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func ResolveIntent(
129129
// If requested, replace point tombstones with range tombstones.
130130
if ok && cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
131131
if err := storage.ReplacePointTombstonesWithRangeTombstones(ctx,
132-
spanset.DisableReadWriterAssertions(readWriter), ms, update.Key, update.EndKey); err != nil {
132+
spanset.DisableLatchAssertions(readWriter), ms, update.Key, update.EndKey); err != nil {
133133
return result.Result{}, err
134134
}
135135
}

pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func ResolveIntentRange(
9999
// If requested, replace point tombstones with range tombstones.
100100
if cArgs.EvalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
101101
if err := storage.ReplacePointTombstonesWithRangeTombstones(ctx,
102-
spanset.DisableReadWriterAssertions(readWriter), ms, args.Key, args.EndKey); err != nil {
102+
spanset.DisableLatchAssertions(readWriter), ms, args.Key, args.EndKey); err != nil {
103103
return result.Result{}, err
104104
}
105105
}

pkg/kv/kvserver/spanset/batch.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,59 @@ func DisableReadWriterAssertions(rw storage.ReadWriter) storage.ReadWriter {
888888
}
889889
}
890890

891+
func disableAssertionHelper(
892+
rw storage.ReadWriter, disableAssertion func(spans *SpanSet),
893+
) storage.ReadWriter {
894+
switch v := rw.(type) {
895+
case *spanSetBatch:
896+
// Create a SpanSet wrapper that will be used to disable a specific
897+
// assertion ephemerally.
898+
spans := &SpanSet{
899+
spans: v.ReadWriter.spanSetReader.spans.spans,
900+
forbiddenSpansMatchers: v.ReadWriter.spanSetReader.spans.forbiddenSpansMatchers,
901+
allowForbidden: v.ReadWriter.spanSetReader.spans.allowForbidden,
902+
allowUndeclared: v.ReadWriter.spanSetReader.spans.allowUndeclared,
903+
}
904+
905+
// Call the function that will disable some assertion.
906+
disableAssertion(spans)
907+
908+
// Create a new spanSetBatch with the modified span sets.
909+
return &spanSetBatch{
910+
ReadWriter: ReadWriter{
911+
spanSetReader: spanSetReader{r: v.b, spans: spans, spansOnly: v.spansOnly, ts: v.ts},
912+
spanSetWriter: spanSetWriter{w: v.b, spans: spans, spansOnly: v.spansOnly, ts: v.ts},
913+
},
914+
b: v.b,
915+
spans: spans,
916+
spansOnly: v.spansOnly,
917+
ts: v.ts,
918+
}
919+
default:
920+
return rw
921+
}
922+
}
923+
924+
// DisableLatchAssertions returns a new batch wrapper with latch assertions
925+
// disabled. It does not modify the original batch. The returned batch shares
926+
// the same underlying storage.Batch but has its own SpanSet wrapper with the
927+
// latch assertion disabled.
928+
func DisableLatchAssertions(rw storage.ReadWriter) storage.ReadWriter {
929+
return disableAssertionHelper(rw, func(spans *SpanSet) {
930+
spans.DisableUndeclaredAccessAssertions()
931+
})
932+
}
933+
934+
// DisableForbiddenSpanAssertionsOnBatch returns a new batch wrapper with
935+
// forbidden span assertions disabled. It does not modify the original batch.
936+
// The returned batch shares the same underlying storage.Batch but has its own
937+
// SpanSet wrapper with the forbidden span assertion disabled.
938+
func DisableForbiddenSpanAssertionsOnBatch(rw storage.ReadWriter) storage.ReadWriter {
939+
return disableAssertionHelper(rw, func(spans *SpanSet) {
940+
spans.DisableForbiddenSpansAssertions()
941+
})
942+
}
943+
891944
// addLockTableSpans adds corresponding lock table spans for the declared
892945
// spans. This is to implicitly allow raw access to separated intents in the
893946
// lock table for any declared keys. Explicitly declaring lock table spans is

pkg/kv/kvserver/spanset/spanset.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,14 @@ type Span struct {
8383
// The Span slice for a particular access and scope contains non-overlapping
8484
// spans in increasing key order after calls to SortAndDedup.
8585
type SpanSet struct {
86-
spans [NumSpanAccess][NumSpanScope][]Span
87-
allowUndeclared bool
86+
spans [NumSpanAccess][NumSpanScope][]Span
87+
// forbiddenSpansMatchers are functions that return an error if the given span
88+
// shouldn't be accessed (forbidden). This allows for complex pattern matching
89+
// like forbidding specific keys across all range IDs without enumerating them
90+
// explicitly.
91+
forbiddenSpansMatchers []func(roachpb.Span) error
92+
allowUndeclared bool
93+
allowForbidden bool
8894
}
8995

9096
var spanSetPool = sync.Pool{
@@ -113,6 +119,8 @@ func (s *SpanSet) Release() {
113119
s.spans[sa][ss] = recycle
114120
}
115121
}
122+
s.forbiddenSpansMatchers = nil
123+
s.allowForbidden = false
116124
s.allowUndeclared = false
117125
spanSetPool.Put(s)
118126
}
@@ -155,7 +163,9 @@ func (s *SpanSet) Copy() *SpanSet {
155163
n.spans[sa][ss] = append(n.spans[sa][ss], s.spans[sa][ss]...)
156164
}
157165
}
166+
n.forbiddenSpansMatchers = append(n.forbiddenSpansMatchers, s.forbiddenSpansMatchers...)
158167
n.allowUndeclared = s.allowUndeclared
168+
n.allowForbidden = s.allowForbidden
159169
return n
160170
}
161171

@@ -201,14 +211,22 @@ func (s *SpanSet) AddMVCC(access SpanAccess, span roachpb.Span, timestamp hlc.Ti
201211
s.spans[access][scope] = append(s.spans[access][scope], Span{Span: span, Timestamp: timestamp})
202212
}
203213

214+
// AddForbiddenMatcher adds a forbidden span matcher. The matcher is a function
215+
// that is called for each span access to check if it should be forbidden.
216+
func (s *SpanSet) AddForbiddenMatcher(matcher func(roachpb.Span) error) {
217+
s.forbiddenSpansMatchers = append(s.forbiddenSpansMatchers, matcher)
218+
}
219+
204220
// Merge merges all spans in s2 into s. s2 is not modified.
205221
func (s *SpanSet) Merge(s2 *SpanSet) {
206222
for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
207223
for ss := SpanScope(0); ss < NumSpanScope; ss++ {
208224
s.spans[sa][ss] = append(s.spans[sa][ss], s2.spans[sa][ss]...)
209225
}
210226
}
227+
s.forbiddenSpansMatchers = append(s.forbiddenSpansMatchers, s2.forbiddenSpansMatchers...)
211228
s.allowUndeclared = s2.allowUndeclared
229+
s.allowForbidden = s2.allowForbidden
212230
s.SortAndDedup()
213231
}
214232

@@ -331,9 +349,19 @@ func (s *SpanSet) CheckAllowedAt(
331349
func (s *SpanSet) checkAllowed(
332350
access SpanAccess, span roachpb.Span, check func(SpanAccess, Span) bool,
333351
) error {
352+
// Unless explicitly disabled, check if we access any forbidden spans.
353+
if !s.allowForbidden {
354+
// Check if the span is forbidden.
355+
for _, matcher := range s.forbiddenSpansMatchers {
356+
if err := matcher(span); err != nil {
357+
return errors.Errorf("cannot %s span %s: matches forbidden pattern",
358+
access, span)
359+
}
360+
}
361+
}
362+
363+
// Unless explicitly disabled, check if we access any undeclared spans.
334364
if s.allowUndeclared {
335-
// If the request has specified that undeclared spans are allowed, do
336-
// nothing.
337365
return nil
338366
}
339367

@@ -352,6 +380,8 @@ func (s *SpanSet) checkAllowed(
352380
}
353381

354382
return errors.Errorf("cannot %s undeclared span %s\ndeclared:\n%s\nstack:\n%s", access, span, s, debugutil.Stack())
383+
384+
return nil
355385
}
356386

357387
// contains returns whether s1 contains s2. Unlike Span.Contains, this function
@@ -396,3 +426,8 @@ func (s *SpanSet) Validate() error {
396426
func (s *SpanSet) DisableUndeclaredAccessAssertions() {
397427
s.allowUndeclared = true
398428
}
429+
430+
// DisableForbiddenAssertions disables forbidden spans assertions.
431+
func (s *SpanSet) DisableForbiddenSpansAssertions() {
432+
s.allowForbidden = true
433+
}

0 commit comments

Comments
 (0)