Skip to content

Commit 67cb06a

Browse files
craig[bot]andy-kimball
andcommitted
Merge #147953
147953: cspann: reassign splitting partition vectors to sibling partitions r=drewkimball a=andy-kimball #### cspann: reassign splitting partition vectors to sibling partitions When a partition splits, its vectors are assigned to either the new left or new right sub-partition. However, in some cases a vector is actually closer to a different sibling partition. This commit checks for that case and reassigns vectors to the sibling partitions with the closest centroid. #### cspann: change TryGetPartitionMetadata to handle batches Previously, TryGetPartitionMetadata fetched metadata for a single partition. This commit changes that to instead fetch metadata for a batch of multiple partitions in one call. This will be used by a future commit. Co-authored-by: Andrew Kimball <[email protected]>
2 parents 651f996 + a7b529f commit 67cb06a

File tree

13 files changed

+686
-217
lines changed

13 files changed

+686
-217
lines changed

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,11 @@ type MakeStoreFunc func(quantizer quantize.Quantizer) TestStore
7272
type StoreTestSuite struct {
7373
suite.Suite
7474

75-
ctx context.Context
76-
makeStore MakeStoreFunc
77-
rootQuantizer quantize.Quantizer
78-
quantizer quantize.Quantizer
75+
ctx context.Context
76+
makeStore MakeStoreFunc
77+
rootQuantizer quantize.Quantizer
78+
quantizer quantize.Quantizer
79+
nextPartitionKey cspann.PartitionKey
7980
}
8081

8182
// NewStoreTestSuite constructs a new suite of tests that run against
@@ -84,10 +85,12 @@ type StoreTestSuite struct {
8485
// tests.
8586
func NewStoreTestSuite(ctx context.Context, makeStore MakeStoreFunc) *StoreTestSuite {
8687
return &StoreTestSuite{
87-
ctx: ctx,
88-
makeStore: makeStore,
89-
rootQuantizer: quantize.NewUnQuantizer(2, vecpb.L2SquaredDistance),
90-
quantizer: quantize.NewRaBitQuantizer(2, 42, vecpb.L2SquaredDistance)}
88+
ctx: ctx,
89+
makeStore: makeStore,
90+
rootQuantizer: quantize.NewUnQuantizer(2, vecpb.L2SquaredDistance),
91+
quantizer: quantize.NewRaBitQuantizer(2, 42, vecpb.L2SquaredDistance),
92+
nextPartitionKey: cspann.RootKey + 1,
93+
}
9194
}
9295

9396
func (suite *StoreTestSuite) TestRunTransaction() {
@@ -696,31 +699,38 @@ func (suite *StoreTestSuite) TestTryGetPartitionMetadata() {
696699

697700
doTest := func(treeID int) {
698701
treeKey := store.MakeTreeKey(suite.T(), treeID)
699-
partitionKey := cspann.PartitionKey(10)
700-
701-
// Partition does not yet exist.
702-
_, err := store.TryGetPartitionMetadata(suite.ctx, treeKey, partitionKey)
703-
suite.ErrorIs(err, cspann.ErrPartitionNotFound)
704702

705-
// Create partition with some vectors in it.
706-
partitionKey, partition := suite.createTestPartition(store, treeKey)
703+
// Create two partition with vectors in them.
704+
partitionKey1, partition1 := suite.createTestPartition(store, treeKey)
705+
partitionKey2, partition2 := suite.createTestPartition(store, treeKey)
707706

708-
// Fetch back only the metadata and validate it.
709-
partitionMetadata, err := store.TryGetPartitionMetadata(suite.ctx, treeKey, partitionKey)
707+
// Fetch metadata for the partitions, along with one that doesn't exist.
708+
toGet := []cspann.PartitionMetadataToGet{
709+
{Key: partitionKey1},
710+
{Key: cspann.PartitionKey(9999)},
711+
{Key: partitionKey2},
712+
}
713+
err := store.TryGetPartitionMetadata(suite.ctx, treeKey, toGet)
710714
suite.NoError(err)
711-
suite.True(partitionMetadata.Equal(partition.Metadata()))
715+
716+
// Validate that partition 9999 does not exist.
717+
suite.Equal(cspann.PartitionMetadata{}, toGet[1].Metadata)
718+
719+
// Validate metadata for other partitions.
720+
suite.True(partition1.Metadata().Equal(&toGet[0].Metadata))
721+
suite.True(partition2.Metadata().Equal(&toGet[2].Metadata))
712722

713723
// Update the metadata and verify we get the updated values.
714-
expected := *partition.Metadata()
724+
expected := toGet[0].Metadata
715725
metadata := expected
716726
metadata.StateDetails.MakeUpdating(30)
717727
suite.NoError(store.TryUpdatePartitionMetadata(
718-
suite.ctx, treeKey, partitionKey, metadata, expected))
728+
suite.ctx, treeKey, partitionKey1, metadata, expected))
719729

720730
// Fetch updated metadata and validate.
721-
partitionMetadata, err = store.TryGetPartitionMetadata(suite.ctx, treeKey, partitionKey)
731+
err = store.TryGetPartitionMetadata(suite.ctx, treeKey, toGet[:1])
722732
suite.NoError(err)
723-
suite.True(partitionMetadata.Equal(&metadata))
733+
suite.True(toGet[0].Metadata.Equal(&metadata))
724734
}
725735

726736
suite.Run("default tree", func() {
@@ -1039,7 +1049,8 @@ func (suite *StoreTestSuite) TestTryClearPartition() {
10391049
func (suite *StoreTestSuite) createTestPartition(
10401050
store TestStore, treeKey cspann.TreeKey,
10411051
) (cspann.PartitionKey, *cspann.Partition) {
1042-
partitionKey := cspann.PartitionKey(10)
1052+
partitionKey := suite.nextPartitionKey
1053+
suite.nextPartitionKey++
10431054
metadata := cspann.MakeReadyPartitionMetadata(cspann.SecondLevel, vector.T{4, 3})
10441055
suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, partitionKey, metadata))
10451056
vectors := vector.MakeSet(2)

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 162 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package cspann
77

88
import (
99
"context"
10+
"math"
1011
"slices"
1112

1213
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/workspace"
@@ -209,6 +210,7 @@ func (fw *fixupWorker) splitPartition(
209210
return errors.Wrapf(errFixupAborted, "reloading partition, metadata timestamp changed")
210211
}
211212
} else if metadata.StateDetails.State != MissingState {
213+
// Fetch metadata for already created left and right partitions.
212214
leftMetadata, err = fw.getPartitionMetadata(ctx, leftPartitionKey)
213215
if err != nil {
214216
return err
@@ -226,6 +228,15 @@ func (fw *fixupWorker) splitPartition(
226228
return err
227229
}
228230

231+
// While most vectors will be assigned to the new sub-partitions, some may
232+
// need to be reassigned to siblings that are closer.
233+
vectors, err = fw.reassignToSiblings(
234+
ctx, parentPartitionKey, partitionKey, partition, vectors,
235+
leftPartitionKey, rightPartitionKey)
236+
if err != nil {
237+
return err
238+
}
239+
229240
// If still updating the sub-partitions, then distribute vectors among them.
230241
leftState := leftMetadata.StateDetails.State
231242
rightState := rightMetadata.StateDetails.State
@@ -345,6 +356,149 @@ func (fw *fixupWorker) splitPartition(
345356
return nil
346357
}
347358

359+
// reassignToSiblings checks if the vectors in a splitting partition need to be
360+
// assigned to partitions other than the left and right sub-partitions. If a
361+
// vector is closer to a sibling partition's centroid than it is to the left or
362+
// right partitions' centroids, then it will be added to the sibling partition.
363+
// It is also removed from "sourcePartition" and from "sourceVectors".
364+
//
365+
// reassignToSiblings returns the source vectors, minus any reassigned vectors.
366+
func (fw *fixupWorker) reassignToSiblings(
367+
ctx context.Context,
368+
parentPartitionKey, sourcePartitionKey PartitionKey,
369+
sourcePartition *Partition,
370+
sourceVectors vector.Set,
371+
leftPartitionKey, rightPartitionKey PartitionKey,
372+
) (vector.Set, error) {
373+
// No siblings if this is the root.
374+
if parentPartitionKey == InvalidKey {
375+
return sourceVectors, nil
376+
}
377+
378+
// Fetch parent partition. If it does not exist, then it could be because
379+
// another agent completed the split or because the parent was itself split.
380+
// In either case, abort this split. If it's not yet done, it will be
381+
// restarted at a later time with the new parent.
382+
parentPartition, err := fw.getPartition(ctx, parentPartitionKey)
383+
if err != nil {
384+
return vector.Set{}, err
385+
}
386+
if parentPartition == nil {
387+
return vector.Set{}, errors.Wrapf(errFixupAborted,
388+
"parent partition %d of partition %d no longer exists",
389+
parentPartitionKey, sourcePartitionKey)
390+
}
391+
392+
// Remove the splitting partition, since vectors cannot be assigned to it. If
393+
// it is not a child of the parent, then abort the split. This can happen if
394+
// another agent has completed the split or if the splitting partition has
395+
// been re-parented when a new level was added to the tree.
396+
if !parentPartition.ReplaceWithLastByKey(ChildKey{PartitionKey: sourcePartitionKey}) {
397+
return vector.Set{}, errors.Wrapf(errFixupAborted,
398+
"partition %d is no longer a child of parent partition %d",
399+
sourcePartitionKey, parentPartitionKey)
400+
}
401+
402+
// Lazily get sibling metadata only if it's actually needed.
403+
fw.tempMetadataToGet = fw.tempMetadataToGet[:0]
404+
getSiblingMetadata := func() ([]PartitionMetadataToGet, error) {
405+
if len(fw.tempMetadataToGet) == 0 {
406+
fw.tempMetadataToGet = ensureSliceLen(fw.tempMetadataToGet, parentPartition.Count())
407+
for i := range len(fw.tempMetadataToGet) {
408+
fw.tempMetadataToGet[i].Key = parentPartition.ChildKeys()[i].PartitionKey
409+
}
410+
err = fw.index.store.TryGetPartitionMetadata(ctx, fw.treeKey, fw.tempMetadataToGet)
411+
if err != nil {
412+
return nil, errors.Wrapf(err,
413+
"getting partition metadata for %d siblings of partition %d (parent=%d)",
414+
len(fw.tempMetadataToGet)-1, sourcePartitionKey, parentPartitionKey)
415+
}
416+
}
417+
return fw.tempMetadataToGet, nil
418+
}
419+
420+
tempSiblingDistances := fw.workspace.AllocFloats(parentPartition.Count())
421+
defer fw.workspace.FreeFloats(tempSiblingDistances)
422+
tempSiblingErrorBounds := fw.workspace.AllocFloats(parentPartition.Count())
423+
defer fw.workspace.FreeFloats(tempSiblingErrorBounds)
424+
425+
for i := 0; i < sourceVectors.Count; i++ {
426+
// Check whether the vector is closer to a sibling centroid than its own
427+
// new centroid.
428+
vec := sourceVectors.At(i)
429+
parentPartition.Quantizer().EstimateDistances(&fw.workspace,
430+
parentPartition.QuantizedSet(), vec, tempSiblingDistances, tempSiblingErrorBounds)
431+
432+
var leftDistance, rightDistance float32
433+
minDistance := float32(math.MaxFloat32)
434+
for offset, childKey := range parentPartition.ChildKeys() {
435+
minDistance = min(minDistance, tempSiblingDistances[offset])
436+
if childKey.PartitionKey == leftPartitionKey {
437+
leftDistance = tempSiblingDistances[offset]
438+
} else if childKey.PartitionKey == rightPartitionKey {
439+
rightDistance = tempSiblingDistances[offset]
440+
}
441+
}
442+
if minDistance >= leftDistance && minDistance >= rightDistance {
443+
// Could not find a closer sibling, so done with this vector.
444+
continue
445+
}
446+
447+
// Lazily fetch metadata for sibling partitions.
448+
allSiblingMetadata, err := getSiblingMetadata()
449+
if err != nil {
450+
return vector.Set{}, err
451+
}
452+
453+
// Find nearest sibling that allows inserts and is closer than either the
454+
// left or right sub-partitions.
455+
siblingOffset := -1
456+
minDistance = min(leftDistance, rightDistance)
457+
for offset, distance := range tempSiblingDistances {
458+
if distance >= minDistance {
459+
continue
460+
} else if !fw.tempMetadataToGet[offset].Metadata.StateDetails.State.AllowAddOrRemove() {
461+
continue
462+
}
463+
siblingOffset = offset
464+
minDistance = distance
465+
}
466+
if siblingOffset == -1 {
467+
// No closer sibling could be found, so return.
468+
continue
469+
}
470+
471+
// Attempt to insert into the partition.
472+
siblingPartitionKey := parentPartition.ChildKeys()[siblingOffset].PartitionKey
473+
siblingMetadata := allSiblingMetadata[siblingOffset].Metadata
474+
childKey := sourcePartition.ChildKeys()[i : i+1]
475+
valueBytes := sourcePartition.ValueBytes()[i : i+1]
476+
_, err = fw.addToPartition(ctx, siblingPartitionKey, vec.AsSet(),
477+
childKey, valueBytes, siblingMetadata)
478+
if err != nil {
479+
allSiblingMetadata[siblingOffset].Metadata, err = suppressRaceErrors(err)
480+
if err == nil {
481+
// Another worker raced to update the metadata, so just skip.
482+
continue
483+
}
484+
return vector.Set{}, errors.Wrapf(err,
485+
"adding vector from splitting partition %d to partition %d",
486+
sourcePartitionKey, siblingPartitionKey)
487+
}
488+
489+
// Add succeeded, so remove the vector from the splitting partition.
490+
sourceVectors.ReplaceWithLast(i)
491+
sourcePartition.ReplaceWithLast(i)
492+
i--
493+
494+
log.VEventf(ctx, 3,
495+
"reassigning vector from splitting partition %d (parent=%d) to sibling partition %d",
496+
sourcePartitionKey, parentPartitionKey, siblingPartitionKey)
497+
}
498+
499+
return sourceVectors, nil
500+
}
501+
348502
// getPartition returns the partition with the given key, or nil if it does not
349503
// exist.
350504
func (fw *fixupWorker) getPartition(
@@ -365,14 +519,13 @@ func (fw *fixupWorker) getPartition(
365519
func (fw *fixupWorker) getPartitionMetadata(
366520
ctx context.Context, partitionKey PartitionKey,
367521
) (PartitionMetadata, error) {
368-
metadata, err := fw.index.store.TryGetPartitionMetadata(ctx, fw.treeKey, partitionKey)
522+
fw.tempMetadataToGet = ensureSliceLen(fw.tempMetadataToGet, 1)
523+
fw.tempMetadataToGet[0].Key = partitionKey
524+
err := fw.index.store.TryGetPartitionMetadata(ctx, fw.treeKey, fw.tempMetadataToGet)
369525
if err != nil {
370-
metadata, err = suppressRaceErrors(err)
371-
if err != nil {
372-
return PartitionMetadata{}, errors.Wrapf(err, "getting metadata for partition %d", partitionKey)
373-
}
526+
return PartitionMetadata{}, errors.Wrapf(err, "getting metadata for partition %d", partitionKey)
374527
}
375-
return metadata, nil
528+
return fw.tempMetadataToGet[0].Metadata, nil
376529
}
377530

378531
// updateMetadata updates the given partition's metadata record, on the
@@ -569,6 +722,7 @@ func (fw *fixupWorker) createSplitSubPartition(
569722
err = fw.index.store.TryCreateEmptyPartition(ctx, fw.treeKey, partitionKey, targetMetadata)
570723
if err != nil {
571724
targetMetadata, err = suppressRaceErrors(err)
725+
centroid = targetMetadata.Centroid
572726
if err != nil {
573727
return PartitionMetadata{}, errors.Wrap(err, "creating empty sub-partition")
574728
}
@@ -586,8 +740,7 @@ func (fw *fixupWorker) createSplitSubPartition(
586740
// Load parent metadata to verify that it's in a state that allows inserts.
587741
parentMetadata, err := fw.getPartitionMetadata(ctx, parentPartitionKey)
588742
if err != nil {
589-
return PartitionMetadata{}, errors.Wrapf(err,
590-
"getting parent partition %d metadata", parentPartitionKey)
743+
return PartitionMetadata{}, err
591744
}
592745

593746
parentLevel := sourceMetadata.Level + 1
@@ -631,7 +784,7 @@ func (fw *fixupWorker) addToParentPartition(
631784
centroid = tempCentroid
632785
}
633786

634-
// Add the target partition key to the root paritition.
787+
// Add the target partition key to the parent partition.
635788
fw.tempChildKey[0] = ChildKey{PartitionKey: partitionKey}
636789
fw.tempValueBytes[0] = nil
637790
added, err := fw.addToPartition(ctx, parentPartitionKey,

pkg/sql/vecindex/cspann/fixup_worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ type fixupWorker struct {
9595
tempVectorsWithKeys []VectorWithKey
9696
tempChildKey [1]ChildKey
9797
tempValueBytes [1]ValueBytes
98+
tempMetadataToGet []PartitionMetadataToGet
9899
}
99100

100101
// ewFixupWorker returns a new worker for the given processor.

pkg/sql/vecindex/cspann/index_test.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -799,20 +799,23 @@ func (s *testState) loadIndexFromFormat(
799799
childLevel := cspann.LeafLevel
800800
childVectors := vector.MakeSet(len(centroid))
801801
childKeys := []cspann.ChildKey(nil)
802-
childIndent := 0
803802

804-
// Loop over children.
805-
for len(lines) > 0 && len(lines[0]) > indent {
806-
if strings.HasSuffix(lines[0], "│") {
807-
childIndent = len(lines[0])
808-
lines = lines[1:]
803+
if len(lines) > 0 && strings.HasSuffix(lines[0], "│") {
804+
// There are children, so loop over them.
805+
childIndent := len(lines[0]) - len("│")
806+
for len(lines) > 0 && len(lines[0]) > childIndent {
807+
remainder := lines[0][childIndent:]
808+
if remainder == "│" {
809+
// Skip line.
810+
lines = lines[1:]
811+
continue
812+
} else if strings.HasPrefix(remainder, "├") || strings.HasPrefix(remainder, "└") {
813+
var childVector vector.T
814+
lines, childLevel, childVector, childKey = s.loadIndexFromFormat(treeKey, lines, childIndent)
815+
childVectors.Add(childVector)
816+
childKeys = append(childKeys, childKey)
817+
}
809818
}
810-
require.Greater(s.T, childIndent, 0)
811-
812-
var childVector vector.T
813-
lines, childLevel, childVector, childKey = s.loadIndexFromFormat(treeKey, lines, childIndent)
814-
childVectors.Add(childVector)
815-
childKeys = append(childKeys, childKey)
816819
}
817820

818821
metadata := cspann.PartitionMetadata{

0 commit comments

Comments
 (0)