Skip to content

Commit 6705d50

Browse files
craig[bot]angles-n-daemonsandy-kimball
committed
143517: split: add SplitStatistics function to Decider r=angles-n-daemons a=angles-n-daemons split: add SplitStatistics function to Decider So that external callers can be aware of the internal statistics on the load based splitters, this PR adds a fairly straightforward "SplitStatistics" function to the replica split deciders. Fixes: #138760 Epic: CRDB-43150 Release note: None 143965: cspann: fix partition data sorting bug r=drewkimball a=andy-kimball When sorting partition data into left and right groupings, vectors can be sorted in a different order than associated child keys and value bytes. This commit updates the logic to operate on all the partition data at once, not just the vectors. Epic: CRDB-42943 Release note: None Co-authored-by: Brian Dillmann <[email protected]> Co-authored-by: Andrew Kimball <[email protected]>
3 parents d1bfdaa + 9fec634 + 5775bd6 commit 6705d50

File tree

10 files changed

+465
-221
lines changed

10 files changed

+465
-221
lines changed

pkg/kv/kvserver/split/decider.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type PopularKey struct {
2828
Frequency float64
2929
}
3030

31+
type SplitStatistics struct {
32+
AccessDirection float64
33+
PopularKey PopularKey
34+
}
35+
3136
type LoadBasedSplitter interface {
3237
redact.SafeFormatter
3338
// Record informs the LoadBasedSplitter about where the span lies with regard
@@ -359,6 +364,21 @@ func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key
359364
return key
360365
}
361366

367+
// SplitStatistics gets the split stats of the current replica if load-based
368+
// splitting has been engaged.
369+
func (d *Decider) SplitStatistics() *SplitStatistics {
370+
d.mu.Lock()
371+
defer d.mu.Unlock()
372+
373+
if d.mu.splitFinder != nil {
374+
return &SplitStatistics{
375+
AccessDirection: d.mu.splitFinder.AccessDirection(),
376+
PopularKey: d.mu.splitFinder.PopularKey(),
377+
}
378+
}
379+
return nil
380+
}
381+
362382
// Reset deactivates any current attempt at determining a split key. The method
363383
// also discards any historical stat tracking information.
364384
func (d *Decider) Reset(now time.Time) {

pkg/kv/kvserver/split/decider_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,110 @@ func TestMaxStatTracker(t *testing.T) {
374374
require.Equal(t, 1, mt.curIdx)
375375
}
376376

377+
func TestSplitStatisticsGeneral(t *testing.T) {
378+
defer leaktest.AfterTest(t)()
379+
for _, test := range []struct {
380+
name string
381+
useWeighted bool
382+
expected *SplitStatistics
383+
}{
384+
{"unweighted", false, &SplitStatistics{
385+
AccessDirection: 0.4945791444904396,
386+
PopularKey: PopularKey{
387+
Key: keys.SystemSQLCodec.TablePrefix(uint32(52)),
388+
Frequency: 0.05,
389+
},
390+
}},
391+
{"weighted", true, &SplitStatistics{
392+
AccessDirection: 0.3885786802030457,
393+
PopularKey: PopularKey{
394+
Key: keys.SystemSQLCodec.TablePrefix(uint32(111)),
395+
Frequency: 0.05,
396+
},
397+
}},
398+
} {
399+
t.Run(test.name, func(t *testing.T) {
400+
rand := rand.New(rand.NewPCG(11, 11))
401+
timeStart := 1000
402+
403+
var decider Decider
404+
loadSplitConfig := testLoadSplitConfig{
405+
randSource: rand,
406+
useWeighted: test.useWeighted,
407+
statRetention: time.Second,
408+
statThreshold: 1,
409+
}
410+
411+
Init(&decider, &loadSplitConfig, &LoadSplitterMetrics{
412+
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
413+
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
414+
}, SplitCPU)
415+
416+
for i := 1; i <= 1000; i++ {
417+
k := i
418+
if i > 500 {
419+
k = 500 - i
420+
}
421+
decider.Record(context.Background(), ms(timeStart+i*50), ld(1), func() roachpb.Span {
422+
return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(k))}
423+
})
424+
}
425+
426+
assert.Equal(t, decider.SplitStatistics(), test.expected)
427+
})
428+
}
429+
}
430+
431+
func TestSplitStatisticsPopularKey(t *testing.T) {
432+
defer leaktest.AfterTest(t)()
433+
for _, test := range []struct {
434+
name string
435+
useWeighted bool
436+
expected *SplitStatistics
437+
}{
438+
{"unweighted", false, &SplitStatistics{
439+
AccessDirection: 1,
440+
PopularKey: PopularKey{
441+
Key: keys.SystemSQLCodec.TablePrefix(uint32(100)),
442+
Frequency: 1,
443+
},
444+
}},
445+
{"weighted", true, &SplitStatistics{
446+
AccessDirection: 1,
447+
PopularKey: PopularKey{
448+
Key: keys.SystemSQLCodec.TablePrefix(uint32(100)),
449+
Frequency: 1,
450+
},
451+
}},
452+
} {
453+
t.Run(test.name, func(t *testing.T) {
454+
rand := rand.New(rand.NewPCG(11, 11))
455+
timeStart := 1000
456+
457+
var decider Decider
458+
loadSplitConfig := testLoadSplitConfig{
459+
randSource: rand,
460+
useWeighted: test.useWeighted,
461+
statRetention: time.Second,
462+
statThreshold: 1,
463+
}
464+
465+
Init(&decider, &loadSplitConfig, &LoadSplitterMetrics{
466+
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
467+
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
468+
}, SplitCPU)
469+
470+
for i := 1; i <= 1000; i++ {
471+
decider.Record(context.Background(), ms(timeStart+i*50), ld(1), func() roachpb.Span {
472+
return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(100))}
473+
})
474+
}
475+
476+
assert.Equal(t, decider.SplitStatistics(), test.expected)
477+
})
478+
}
479+
}
480+
377481
func TestDeciderMetrics(t *testing.T) {
378482
defer leaktest.AfterTest(t)()
379483
rng := rand.New(rand.NewPCG(11, 11))

pkg/sql/vecindex/cspann/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ go_test(
5353
"childkey_dedup_test.go",
5454
"cspannpb_test.go",
5555
"fixup_processor_test.go",
56+
"fixup_split_test.go",
5657
"fixup_worker_test.go",
5758
"index_stats_test.go",
5859
"index_test.go",

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 62 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (fw *fixupWorker) splitPartition(
139139
}
140140

141141
log.VEventf(ctx, 2, "splitting partition %d with %d vectors (parent=%d, state=%s)",
142-
partitionKey, parentPartitionKey, partition.Count(), metadata.StateDetails.String())
142+
partitionKey, partition.Count(), parentPartitionKey, metadata.StateDetails.String())
143143

144144
// Update partition's state to Splitting.
145145
if metadata.StateDetails.State == ReadyState {
@@ -454,13 +454,13 @@ func (fw *fixupWorker) createSplitSubPartition(
454454
partitionKey PartitionKey,
455455
centroid vector.T,
456456
) (targetMetadata PartitionMetadata, err error) {
457-
const format = "creating split sub-partition %d, with parent %d"
457+
const format = "creating split sub-partition %d (source=%d, parent=%d)"
458458

459459
defer func() {
460-
err = errors.Wrapf(err, format, partitionKey, parentPartitionKey)
460+
err = errors.Wrapf(err, format, partitionKey, sourcePartitionKey, parentPartitionKey)
461461
}()
462462

463-
log.VEventf(ctx, 2, format, partitionKey, parentPartitionKey)
463+
log.VEventf(ctx, 2, format, partitionKey, sourcePartitionKey, parentPartitionKey)
464464

465465
// Create an empty partition in the Updating state.
466466
targetMetadata = PartitionMetadata{
@@ -548,15 +548,15 @@ func (fw *fixupWorker) addToParentPartition(
548548
func (fw *fixupWorker) deletePartition(
549549
ctx context.Context, parentPartitionKey, partitionKey PartitionKey,
550550
) (err error) {
551-
const format = "deleting partition %d (parent=%d, state=%d)"
551+
const format = "deleting partition %d (parent=%d, state=%s)"
552552
var parentMetadata PartitionMetadata
553553

554554
defer func() {
555555
err = errors.Wrapf(err, format,
556-
partitionKey, parentPartitionKey, parentMetadata.StateDetails.State)
556+
partitionKey, parentPartitionKey, parentMetadata.StateDetails.String())
557557
}()
558558

559-
// Load parent partition to verify that it's in a state that allows inserts.
559+
// Load parent partition to verify that it's in a state that allows removes.
560560
var parentPartition *Partition
561561
parentPartition, err = fw.getPartition(ctx, parentPartitionKey)
562562
if err != nil {
@@ -566,7 +566,8 @@ func (fw *fixupWorker) deletePartition(
566566
parentMetadata = *parentPartition.Metadata()
567567
}
568568

569-
log.VEventf(ctx, 2, format, partitionKey, parentPartitionKey, parentMetadata.StateDetails.State)
569+
log.VEventf(ctx, 2, format,
570+
partitionKey, parentPartitionKey, parentMetadata.StateDetails.String())
570571

571572
if !parentMetadata.StateDetails.State.AllowAddOrRemove() {
572573
// Child could not be removed from the parent because it doesn't exist or
@@ -640,21 +641,16 @@ func (fw *fixupWorker) copyToSplitSubPartitions(
640641
leftOffsets, rightOffsets = kmeans.AssignPartitions(
641642
vectors, leftMetadata.Centroid, rightMetadata.Centroid, tempOffsets)
642643

643-
// Sort vectors into contiguous left and right groupings.
644-
sortVectors(&fw.workspace, vectors, leftOffsets, rightOffsets)
644+
// Assign vectors and associated keys and values into contiguous left and right groupings.
645+
childKeys := slices.Clone(sourcePartition.ChildKeys())
646+
valueBytes := slices.Clone(sourcePartition.ValueBytes())
647+
splitPartitionData(&fw.workspace, vectors, childKeys, valueBytes, leftOffsets, rightOffsets)
645648
leftVectors := vectors
646649
rightVectors := leftVectors.SplitAt(len(leftOffsets))
647-
648-
childKeys := make([]ChildKey, vectors.Count)
649-
valueBytes := make([]ValueBytes, vectors.Count)
650-
leftChildKeys := copyByOffsets(
651-
sourcePartition.ChildKeys(), childKeys[:len(leftOffsets)], leftOffsets)
652-
rightChildKeys := copyByOffsets(
653-
sourcePartition.ChildKeys(), childKeys[len(leftOffsets):], rightOffsets)
654-
leftValueBytes := copyByOffsets(
655-
sourcePartition.ValueBytes(), valueBytes[:len(leftOffsets)], leftOffsets)
656-
rightValueBytes := copyByOffsets(
657-
sourcePartition.ValueBytes(), valueBytes[len(leftOffsets):], rightOffsets)
650+
leftChildKeys := childKeys[:len(leftOffsets)]
651+
rightChildKeys := childKeys[len(leftOffsets):]
652+
leftValueBytes := valueBytes[:len(leftOffsets)]
653+
rightValueBytes := valueBytes[len(leftOffsets):]
658654

659655
log.VEventf(ctx, 2, format,
660656
len(leftOffsets), sourceState.Target1, len(rightOffsets), sourceState.Target2)
@@ -740,63 +736,63 @@ func suppressRaceErrors(err error) (PartitionMetadata, error) {
740736
return PartitionMetadata{}, err
741737
}
742738

743-
// sortVectors sorts the input vectors in-place, according to the provided left
744-
// and right offsets, which reference vectors by position. Vectors at left
745-
// offsets are sorted at the beginning of the slice, followed by vectors at
746-
// right offsets. The internal ordering among left and right vectors is not
747-
// defined.
739+
// splitPartitionData groups the provided partition data according to the left
740+
// and right offsets. All data referenced by left offsets will be moved to the
741+
// left of each set or slice. All data referenced by right offsets will be moved
742+
// to the right. The internal ordering of elements on each side is not defined.
748743
//
749-
// NOTE: The left and right offsets are modified in-place with the updated
750-
// positions of the vectors.
751-
func sortVectors(w *workspace.T, vectors vector.Set, leftOffsets, rightOffsets []uint64) {
744+
// TODO(andyk): Passing in left and right offsets makes this overly complex. It
745+
// would be better to pass an assignments slice of the same length as the
746+
// partition data, where 0=left and 1=right.
747+
func splitPartitionData(
748+
w *workspace.T,
749+
vectors vector.Set,
750+
childKeys []ChildKey,
751+
valueBytes []ValueBytes,
752+
leftOffsets, rightOffsets []uint64,
753+
) {
752754
tempVector := w.AllocFloats(vectors.Dims)
753755
defer w.FreeFloats(tempVector)
754756

755-
// Sort left and right offsets.
756-
slices.Sort(leftOffsets)
757-
slices.Sort(rightOffsets)
758-
759-
// Any left offsets that point beyond the end of the left list indicate that
760-
// a vector needs to be moved from the right half of vectors to the left half.
761-
// The reverse is true for right offsets. Because the left and right offsets
762-
// are in sorted order, out-of-bounds offsets must be at the end of the left
763-
// list and the beginning of the right list. Therefore, the algorithm just
764-
// needs to iterate over those out-of-bounds offsets and swap the positions
765-
// of the referenced vectors.
766-
li := len(leftOffsets) - 1
767-
ri := 0
768-
769-
var rightToLeft, leftToRight vector.T
770-
for li >= 0 {
771-
left := int(leftOffsets[li])
772-
if left < len(leftOffsets) {
773-
break
757+
left := 0
758+
right := 0
759+
for {
760+
// Find a misplaced "right" element from the left side.
761+
var leftOffset int
762+
for {
763+
if left >= len(leftOffsets) {
764+
return
765+
}
766+
leftOffset = int(leftOffsets[left])
767+
left++
768+
if leftOffset >= len(leftOffsets) {
769+
break
770+
}
774771
}
775772

776-
right := int(rightOffsets[ri])
777-
if right >= len(leftOffsets) {
778-
panic(errors.AssertionFailedf(
779-
"expected equal number of left and right offsets that need to be swapped"))
773+
// There must be a misplaced "left" element from the right side.
774+
var rightOffset int
775+
for {
776+
rightOffset = int(rightOffsets[right])
777+
right++
778+
if rightOffset < len(leftOffsets) {
779+
break
780+
}
780781
}
781782

782-
// Swap vectors.
783-
rightToLeft = vectors.At(left)
784-
leftToRight = vectors.At(right)
783+
// Swap the two elements.
784+
rightToLeft := vectors.At(leftOffset)
785+
leftToRight := vectors.At(rightOffset)
785786
copy(tempVector, rightToLeft)
786787
copy(rightToLeft, leftToRight)
787788
copy(leftToRight, tempVector)
788789

789-
leftOffsets[li] = uint64(left)
790-
rightOffsets[ri] = uint64(right)
791-
792-
li--
793-
ri++
794-
}
795-
}
790+
tempChildKey := childKeys[leftOffset]
791+
childKeys[leftOffset] = childKeys[rightOffset]
792+
childKeys[rightOffset] = tempChildKey
796793

797-
func copyByOffsets[T any](source, target []T, offsets []uint64) []T {
798-
for i := range offsets {
799-
target[i] = source[offsets[i]]
794+
tempValueBytes := valueBytes[leftOffset]
795+
valueBytes[leftOffset] = valueBytes[rightOffset]
796+
valueBytes[rightOffset] = tempValueBytes
800797
}
801-
return target
802798
}

0 commit comments

Comments
 (0)