Skip to content

Commit 2e1cad3

Browse files
committed
spanset: Add overlaps() helper function, and add tests for Contains and Overlaps
Analogous to how we have a spanset helper contains() that understands the special span representation: [x-eps,x). This commit adds another helper to detect overlapping spans. Moreover, this commit adds tests to verify the behaviour of these two helper functions. References: #156537 Release note: None
1 parent a445cdc commit 2e1cad3

File tree

2 files changed

+286
-6
lines changed

2 files changed

+286
-6
lines changed

pkg/kv/kvserver/spanset/spanset.go

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ func (s *SpanSet) checkAllowed(
370370

371371
for ac := access; ac < NumSpanAccess; ac++ {
372372
for _, cur := range s.spans[ac][scope] {
373-
if contains(cur.Span, span) && check(ac, cur) {
373+
if Contains(cur.Span, span) && check(ac, cur) {
374374
return nil
375375
}
376376
}
@@ -382,11 +382,20 @@ func (s *SpanSet) checkAllowed(
382382
return nil
383383
}
384384

385-
// contains returns whether s1 contains s2. Unlike Span.Contains, this function
386-
// supports spans with a nil start key and a non-nil end key (e.g. "[nil, c)").
387-
// In this form, s2.Key (inclusive) is considered to be the previous key to
388-
// s2.EndKey (exclusive).
389-
func contains(s1, s2 roachpb.Span) bool {
385+
// Contains returns whether s1 contains s2. Unlike Span.Contains, this function
386+
// supports spans (both s1 and s2) with a nil start key and a non-nil end
387+
// key (e.g. "[nil, c)"). In this form, the span with nil Key is considered to
388+
// represent: [EndKey.Prev(), EndKey).
389+
func Contains(s1, s2 roachpb.Span) bool {
390+
if s1.Key == nil {
391+
// In this case, s1 can only contain s2 is a point span that equals s1.
392+
if s2.Key == nil {
393+
return s1.EndKey.Compare(s2.EndKey) == 0
394+
}
395+
return s2.Key.IsPrev(s1.EndKey) &&
396+
(s2.EndKey == nil || s1.EndKey.Compare(s2.EndKey) == 0)
397+
}
398+
390399
if s2.Key != nil {
391400
// The common case.
392401
return s1.Contains(s2)
@@ -402,6 +411,43 @@ func contains(s1, s2 roachpb.Span) bool {
402411
return s1.Key.Compare(s2.EndKey) < 0 && s1.EndKey.Compare(s2.EndKey) >= 0
403412
}
404413

414+
// Overlaps returns whether s1 overlaps s2. Unlike Span.Overlaps, this function
415+
// supports spans with a nil start key and a non-nil end key (e.g. "[nil, c)").
416+
// In this form, the span with nil Key is considered to represent:
417+
// [EndKey.Prev(), EndKey).
418+
func Overlaps(s1, s2 roachpb.Span) bool {
419+
// If both keys have a nil start, the spans overlap if the end keys are the
420+
// same.
421+
if s1.Key == nil && s2.Key == nil {
422+
return s1.EndKey.Compare(s2.EndKey) == 0
423+
}
424+
425+
// Handle the case where s1.Key is nil but not s2.Key by swapping and
426+
// recursing. This way, the rest of the function assumes that s1.Key is not
427+
// nil.
428+
if s1.Key == nil && s2.Key != nil {
429+
return Overlaps(s2, s1)
430+
}
431+
432+
// The common case: both spans have non-nil start keys.
433+
if s2.Key != nil {
434+
return s1.Overlaps(s2)
435+
}
436+
437+
// s1.Key is not nil, but s2.Key is nil.
438+
// The following is equivalent to:
439+
// s1.Overlaps(roachpb.Span{Key: s2.EndKey.Prev()})
440+
441+
if s1.EndKey == nil {
442+
// s1 is a point span, overlaps with s2 iff s1.Key is the prev of s2.EndKey
443+
return s1.Key.IsPrev(s2.EndKey)
444+
}
445+
446+
// s1 is [s1.Key, s1.EndKey), s2 is [s2.EndKey.Prev(), s2.EndKey)
447+
// They overlap iff s2.EndKey.Prev() is in [s1.Key, s1.EndKey).
448+
return s1.Key.Compare(s2.EndKey) < 0 && s1.EndKey.Compare(s2.EndKey) >= 0
449+
}
450+
405451
// Validate returns an error if any spans that have been added to the set
406452
// are invalid.
407453
func (s *SpanSet) Validate() error {

pkg/kv/kvserver/spanset/spanset_test.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,237 @@ func TestSpanSetWriteImpliesRead(t *testing.T) {
394394
t.Errorf("expected to be allowed to read rwSpan, error: %+v", err)
395395
}
396396
}
397+
398+
// Test that Contains correctly determines if s1 contains s2, including
399+
// support for spans with nil start/end keys.
400+
func TestContains(t *testing.T) {
401+
defer leaktest.AfterTest(t)()
402+
403+
testCases := []struct {
404+
name string
405+
s1 roachpb.Span
406+
s2 roachpb.Span
407+
expected bool
408+
}{
409+
{
410+
name: "s1 contains s2 exactly",
411+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
412+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
413+
expected: true,
414+
},
415+
{
416+
name: "s1 contains s2",
417+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
418+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
419+
expected: true,
420+
},
421+
{
422+
name: "s1 contains s2 start point span",
423+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
424+
s2: roachpb.Span{Key: roachpb.Key("a")},
425+
expected: true,
426+
},
427+
{
428+
name: "s1 contains s2 end point span",
429+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
430+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
431+
expected: true,
432+
},
433+
{
434+
name: "s1 point contains s2 point same key",
435+
s1: roachpb.Span{Key: roachpb.Key("a")},
436+
s2: roachpb.Span{Key: roachpb.Key("a")},
437+
expected: true,
438+
},
439+
{
440+
name: "s1 with nil start contains s2 with nil end",
441+
s1: roachpb.Span{EndKey: roachpb.Key("d").Next()},
442+
s2: roachpb.Span{Key: roachpb.Key("d")},
443+
expected: true,
444+
},
445+
{
446+
name: "s1 with nil start contains s2",
447+
s1: roachpb.Span{EndKey: roachpb.Key("d").Next()},
448+
s2: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("d").Next()},
449+
expected: true,
450+
},
451+
{
452+
name: "s1 with nil start contains s2 with nil start",
453+
s1: roachpb.Span{EndKey: roachpb.Key("d")},
454+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
455+
expected: true,
456+
},
457+
{
458+
name: "s1 does not contain s2 - disjoint",
459+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
460+
s2: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")},
461+
expected: false,
462+
},
463+
{
464+
name: "s1 does not contain s2 - partial overlap",
465+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
466+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
467+
expected: false,
468+
},
469+
{
470+
name: "s1 does not contain s2 - s2 larger",
471+
s1: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
472+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
473+
expected: false,
474+
},
475+
{
476+
name: "s1 does not contain s2 point - outside range",
477+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
478+
s2: roachpb.Span{Key: roachpb.Key("d")},
479+
expected: false,
480+
},
481+
{
482+
name: "s1 with nil end does not contain disjoint span",
483+
s1: roachpb.Span{Key: roachpb.Key("a")},
484+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
485+
expected: false,
486+
},
487+
{
488+
name: "s1 with nil start does not contain disjoint span",
489+
s1: roachpb.Span{EndKey: roachpb.Key("b")},
490+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
491+
expected: false,
492+
},
493+
{
494+
name: "s1 with nil end does not contain s2 with nil start",
495+
s1: roachpb.Span{Key: roachpb.Key("a")},
496+
s2: roachpb.Span{Key: roachpb.Key("b")},
497+
expected: false,
498+
},
499+
{
500+
name: "s1 with nil start does not contain s2 with nil end",
501+
s1: roachpb.Span{EndKey: roachpb.Key("a")},
502+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
503+
expected: false,
504+
},
505+
{
506+
name: "s1 with nil start does not contain s2",
507+
s1: roachpb.Span{EndKey: roachpb.Key("d")},
508+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
509+
expected: false,
510+
},
511+
}
512+
513+
for _, tc := range testCases {
514+
t.Run(tc.name, func(t *testing.T) {
515+
require.Equal(t, tc.expected, Contains(tc.s1, tc.s2))
516+
})
517+
}
518+
}
519+
520+
// Test that Overlaps correctly determines if s1 overlaps s2, including
521+
// support for spans with nil start/end keys.
522+
func TestOverlaps(t *testing.T) {
523+
defer leaktest.AfterTest(t)()
524+
525+
testCases := []struct {
526+
name string
527+
s1 roachpb.Span
528+
s2 roachpb.Span
529+
expected bool
530+
}{
531+
{
532+
name: "s1 overlaps s2 exactly",
533+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
534+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
535+
expected: true,
536+
},
537+
{
538+
name: "s1 overlaps s2 partial",
539+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
540+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
541+
expected: true,
542+
},
543+
{
544+
name: "s1 contains s2",
545+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
546+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
547+
expected: true,
548+
},
549+
{
550+
name: "s2 contains s1",
551+
s1: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
552+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
553+
expected: true,
554+
},
555+
{
556+
name: "s1 overlaps s2 with nil end",
557+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
558+
s2: roachpb.Span{Key: roachpb.Key("a")},
559+
expected: true,
560+
},
561+
{
562+
name: "s1 overlaps s2 with nil start",
563+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
564+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
565+
expected: true,
566+
},
567+
{
568+
name: "s1 point overlaps s2 point same key",
569+
s1: roachpb.Span{Key: roachpb.Key("a")},
570+
s2: roachpb.Span{Key: roachpb.Key("a")},
571+
expected: true,
572+
},
573+
{
574+
name: "s1 point overlaps s2 point same end key",
575+
s1: roachpb.Span{EndKey: roachpb.Key("a")},
576+
s2: roachpb.Span{EndKey: roachpb.Key("a")},
577+
expected: true,
578+
},
579+
{
580+
name: "s1 with nil start overlaps s2",
581+
s1: roachpb.Span{EndKey: roachpb.Key("d").Next()},
582+
s2: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("d").Next()},
583+
expected: true,
584+
},
585+
{
586+
name: "s1 does not overlap s2",
587+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
588+
s2: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")},
589+
expected: false,
590+
},
591+
{
592+
name: "s1 does not overlap s2 - adjacent",
593+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
594+
s2: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
595+
expected: false,
596+
},
597+
{
598+
name: "s1 does not overlap s2 with nil end key",
599+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
600+
s2: roachpb.Span{Key: roachpb.Key("c")},
601+
expected: false,
602+
},
603+
{
604+
name: "s1 does not overlap s2 with nil start key",
605+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
606+
s2: roachpb.Span{EndKey: roachpb.Key("a")},
607+
expected: false,
608+
},
609+
{
610+
name: "s1 does not overlap s2 with nil start key",
611+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
612+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
613+
expected: false,
614+
},
615+
{
616+
name: "s1 with nil start overlaps s2 with nil end",
617+
s1: roachpb.Span{EndKey: roachpb.Key("d").Next()},
618+
s2: roachpb.Span{Key: roachpb.Key("d")},
619+
expected: true,
620+
},
621+
}
622+
623+
for _, tc := range testCases {
624+
t.Run(tc.name, func(t *testing.T) {
625+
require.Equal(t, tc.expected, Overlaps(tc.s1, tc.s2))
626+
// Overlaps should be commutative.
627+
require.Equal(t, tc.expected, Overlaps(tc.s2, tc.s1))
628+
})
629+
}
630+
}

0 commit comments

Comments
 (0)