Skip to content

Commit 2d3aabf

Browse files
committed
spanset: Add overlaps() helper function, and add tests for Contains and Overlaps
Analogous to how we have a spanset helper contains() that understands the special span representation: [x-eps,x). This commit adds another helper to detect overlapping spans. Moreover, this commit adds tests to verify the behaviour of these two helper functions. References: #156537 Release note: None
1 parent e118836 commit 2d3aabf

File tree

2 files changed

+221
-3
lines changed

2 files changed

+221
-3
lines changed

pkg/kv/kvserver/spanset/spanset.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ func (s *SpanSet) checkAllowed(
370370

371371
for ac := access; ac < NumSpanAccess; ac++ {
372372
for _, cur := range s.spans[ac][scope] {
373-
if contains(cur.Span, span) && check(ac, cur) {
373+
if Contains(cur.Span, span) && check(ac, cur) {
374374
return nil
375375
}
376376
}
@@ -382,11 +382,11 @@ func (s *SpanSet) checkAllowed(
382382
return nil
383383
}
384384

385-
// contains returns whether s1 contains s2. Unlike Span.Contains, this function
385+
// Contains returns whether s1 contains s2. Unlike Span.Contains, this function
386386
// supports spans with a nil start key and a non-nil end key (e.g. "[nil, c)").
387387
// In this form, s2.Key (inclusive) is considered to be the previous key to
388388
// s2.EndKey (exclusive).
389-
func contains(s1, s2 roachpb.Span) bool {
389+
func Contains(s1, s2 roachpb.Span) bool {
390390
if s2.Key != nil {
391391
// The common case.
392392
return s1.Contains(s2)
@@ -402,6 +402,26 @@ func contains(s1, s2 roachpb.Span) bool {
402402
return s1.Key.Compare(s2.EndKey) < 0 && s1.EndKey.Compare(s2.EndKey) >= 0
403403
}
404404

405+
// Overlaps returns whether s1 overlaps s2. Unlike Span.Overlaps, this function
406+
// supports spans with a nil start key and a non-nil end key (e.g. "[nil, c)").
407+
// In this form, s2.Key (inclusive) is considered to be the previous key to
408+
// s2.EndKey (exclusive).
409+
func Overlaps(s1, s2 roachpb.Span) bool {
410+
if s2.Key != nil {
411+
// The common case.
412+
return s1.Overlaps(s2)
413+
}
414+
415+
// The following is equivalent to:
416+
// s1.Overlaps(roachpb.Span{Key: s2.EndKey.Prev()})
417+
418+
if s1.EndKey == nil {
419+
return s1.Key.IsPrev(s2.EndKey)
420+
}
421+
422+
return s1.Key.Compare(s2.EndKey) < 0 && s1.EndKey.Compare(s2.EndKey) >= 0
423+
}
424+
405425
// Validate returns an error if any spans that have been added to the set
406426
// are invalid.
407427
func (s *SpanSet) Validate() error {

pkg/kv/kvserver/spanset/spanset_test.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,201 @@ func TestSpanSetWriteImpliesRead(t *testing.T) {
394394
t.Errorf("expected to be allowed to read rwSpan, error: %+v", err)
395395
}
396396
}
397+
398+
// Test that Contains correctly determines if s1 contains s2, including
399+
// support for spans with nil start/end keys.
400+
func TestContains(t *testing.T) {
401+
defer leaktest.AfterTest(t)()
402+
403+
testCases := []struct {
404+
name string
405+
s1 roachpb.Span
406+
s2 roachpb.Span
407+
expected bool
408+
}{
409+
{
410+
name: "s1 contains s2 exactly",
411+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
412+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
413+
expected: true,
414+
},
415+
{
416+
name: "s1 contains s2",
417+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
418+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
419+
expected: true,
420+
},
421+
{
422+
name: "s1 contains s2 start point span",
423+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
424+
s2: roachpb.Span{Key: roachpb.Key("a")},
425+
expected: true,
426+
},
427+
{
428+
name: "s1 contains s2 end point span",
429+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
430+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
431+
expected: true,
432+
},
433+
{
434+
name: "s1 point contains s2 point same key",
435+
s1: roachpb.Span{Key: roachpb.Key("a")},
436+
s2: roachpb.Span{Key: roachpb.Key("a")},
437+
expected: true,
438+
},
439+
{
440+
name: "s1 does not contain s2 - disjoint",
441+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
442+
s2: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")},
443+
expected: false,
444+
},
445+
{
446+
name: "s1 does not contain s2 - partial overlap",
447+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
448+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
449+
expected: false,
450+
},
451+
{
452+
name: "s1 does not contain s2 - s2 larger",
453+
s1: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
454+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
455+
expected: false,
456+
},
457+
{
458+
name: "s1 does not contain s2 point - outside range",
459+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
460+
s2: roachpb.Span{Key: roachpb.Key("d")},
461+
expected: false,
462+
},
463+
{
464+
name: "s1 with nil end does not contain disjoint span",
465+
s1: roachpb.Span{Key: roachpb.Key("a")},
466+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
467+
expected: false,
468+
},
469+
{
470+
name: "s1 with nil start does not contain disjoint span",
471+
s1: roachpb.Span{EndKey: roachpb.Key("b")},
472+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
473+
expected: false,
474+
},
475+
{
476+
name: "s1 with nil end does not contain s2 with nil start",
477+
s1: roachpb.Span{Key: roachpb.Key("a")},
478+
s2: roachpb.Span{Key: roachpb.Key("b")},
479+
expected: false,
480+
},
481+
{
482+
name: "s1 with nil start does not contain s2 with nil end",
483+
s1: roachpb.Span{EndKey: roachpb.Key("a")},
484+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
485+
expected: false,
486+
},
487+
}
488+
489+
for _, tc := range testCases {
490+
t.Run(tc.name, func(t *testing.T) {
491+
require.Equal(t, tc.expected, Contains(tc.s1, tc.s2))
492+
})
493+
}
494+
}
495+
496+
// Test that Overlaps correctly determines if s1 overlaps s2, including
497+
// support for spans with nil start/end keys.
498+
func TestOverlaps(t *testing.T) {
499+
defer leaktest.AfterTest(t)()
500+
501+
testCases := []struct {
502+
name string
503+
s1 roachpb.Span
504+
s2 roachpb.Span
505+
expected bool
506+
}{
507+
{
508+
name: "s1 overlaps s2 exactly",
509+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
510+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
511+
expected: true,
512+
},
513+
{
514+
name: "s1 overlaps s2 partial",
515+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
516+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")},
517+
expected: true,
518+
},
519+
{
520+
name: "s1 contains s2",
521+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
522+
s2: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
523+
expected: true,
524+
},
525+
{
526+
name: "s2 contains s1",
527+
s1: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
528+
s2: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
529+
expected: true,
530+
},
531+
{
532+
name: "s1 overlaps s2 with nil end",
533+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
534+
s2: roachpb.Span{Key: roachpb.Key("a")},
535+
expected: true,
536+
},
537+
{
538+
name: "s1 overlaps s2 with nil start",
539+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")},
540+
s2: roachpb.Span{EndKey: roachpb.Key("d")},
541+
expected: true,
542+
},
543+
{
544+
name: "s1 point overlaps s2 point same key",
545+
s1: roachpb.Span{Key: roachpb.Key("a")},
546+
s2: roachpb.Span{Key: roachpb.Key("a")},
547+
expected: true,
548+
},
549+
{
550+
name: "s1 point overlaps s2 point same end key",
551+
s1: roachpb.Span{EndKey: roachpb.Key("a")},
552+
s2: roachpb.Span{EndKey: roachpb.Key("a")},
553+
expected: true,
554+
},
555+
{
556+
name: "s1 does not overlap s2",
557+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
558+
s2: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")},
559+
expected: false,
560+
},
561+
{
562+
name: "s1 does not overlap s2 - adjacent",
563+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
564+
s2: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
565+
expected: false,
566+
},
567+
{
568+
name: "s1 does not overlap s2 with nil end key",
569+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
570+
s2: roachpb.Span{Key: roachpb.Key("c")},
571+
expected: false,
572+
},
573+
{
574+
name: "s1 does not overlap s2 with nil start key",
575+
s1: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
576+
s2: roachpb.Span{EndKey: roachpb.Key("a")},
577+
expected: false,
578+
},
579+
{
580+
name: "s1 point does not overlap s2 - different keys",
581+
s1: roachpb.Span{Key: roachpb.Key("a")},
582+
s2: roachpb.Span{Key: roachpb.Key("b")},
583+
expected: false,
584+
},
585+
}
586+
587+
for _, tc := range testCases {
588+
t.Run(tc.name, func(t *testing.T) {
589+
require.Equal(t, tc.expected, Overlaps(tc.s1, tc.s2))
590+
// Overlaps should be commutative.
591+
require.Equal(t, Overlaps(tc.s1, tc.s2), Overlaps(tc.s2, tc.s1))
592+
})
593+
}
594+
}

0 commit comments

Comments
 (0)