Skip to content

Commit 88a72c1

Browse files
craig[bot]andy-kimball
andcommitted
Merge #142793
142793: vecindex: check partition size in fixup processor r=andy-kimball a=andy-kimball Previously, we checked the size of a partition after adding or removing a vector to/from it. The main reason for doing this was to decide whether the partition needed to be split or merged. However, this added to the contention footprint of the user's transaction. This commit moves the partition size check to the background processor, as part of the SplitOrMergeFixup, and makes the size scan INCONSISTENT. While this can return stale data, it's OK if a split or merge gets delayed because of that. If the inconsistent check indicates that a split or merge may be necessary, we run a consistent scan to ensure that it's really necessary. Besides being used to check for splits/merges, the partition size scan was used to ensure that we don't create an unbalanced K-means tree by moving the last vector of a non-root partition to a sibling partition. Now that RemoveFromPartition no longer returns a count, we instead use a new "skipLonelyVector" option when searching. This option stops the search from returning any vectors that are last in their partition, which prevents the creation of empty non-root partitions. Epic: CRDB-42943 Release note: None Co-authored-by: Andrew Kimball <[email protected]>
2 parents 44c8875 + c6136e1 commit 88a72c1

31 files changed

+622
-441
lines changed

pkg/sql/vecindex/BUILD.bazel

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ go_test(
3939
srcs = [
4040
"manager_test.go",
4141
"searcher_test.go",
42+
"vecindex_test.go",
4243
],
43-
data = glob(["testdata/**"]),
44+
data = ["//pkg/sql/vecindex/cspann:features_10000"],
4445
embed = [":vecindex"],
4546
deps = [
4647
"//pkg/base",
@@ -63,6 +64,7 @@ go_test(
6364
"//pkg/sql/types",
6465
"//pkg/sql/vecindex/cspann",
6566
"//pkg/sql/vecindex/cspann/quantize",
67+
"//pkg/sql/vecindex/cspann/testutils",
6668
"//pkg/sql/vecindex/vecpb",
6769
"//pkg/sql/vecindex/vecstore",
6870
"//pkg/testutils/serverutils",

pkg/sql/vecindex/cspann/BUILD.bazel

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
33
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
44

55
filegroup(
6-
name = "testdata",
7-
srcs = glob(["testdata/**"]),
8-
visibility = ["//pkg/sql/vecindex/cspann:__subpackages__"],
6+
name = "features_10000",
7+
srcs = glob(["testdata/features_10000.gob"]),
8+
visibility = ["//visibility:public"],
99
)
1010

1111
go_library(
@@ -55,7 +55,7 @@ go_test(
5555
"partition_test.go",
5656
"search_set_test.go",
5757
],
58-
data = ["//pkg/sql/vecindex/cspann:testdata"],
58+
data = glob(["testdata/**"]),
5959
embed = [":cspann"],
6060
deps = [
6161
"//pkg/sql/vecindex/cspann/commontest",

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 176 additions & 153 deletions
Large diffs are not rendered by default.

pkg/sql/vecindex/cspann/commontest/utils.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,24 @@ import (
1818

1919
// CheckPartitionMetadata tests the correctness of the given metadata's fields.
2020
func CheckPartitionMetadata(
21-
t *testing.T, metadata cspann.PartitionMetadata, level cspann.Level, centroid vector.T, count int,
21+
t *testing.T, metadata cspann.PartitionMetadata, level cspann.Level, centroid vector.T,
2222
) {
2323
require.Equal(t, level, metadata.Level)
2424
require.Equal(t, []float32(centroid), testutils.RoundFloats(metadata.Centroid, 2))
25-
require.Equal(t, count, metadata.Count)
25+
}
26+
27+
// CheckPartitionCount tests the size of a partition.
28+
func CheckPartitionCount(
29+
ctx context.Context,
30+
t *testing.T,
31+
store cspann.Store,
32+
treeKey cspann.TreeKey,
33+
partitionKey cspann.PartitionKey,
34+
expectedCount int,
35+
) {
36+
count, err := store.EstimatePartitionCount(ctx, treeKey, partitionKey)
37+
require.NoError(t, err)
38+
require.Equal(t, expectedCount, count)
2639
}
2740

2841
// BeginTransaction starts a new transaction for the given store and returns it.

pkg/sql/vecindex/cspann/fixup_processor.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import (
2323
type fixupType int
2424

2525
const (
26-
// splitOrMergeFixup is a fixup that includes the key of a partition to
27-
// split or merge as well as the key of its parent partition (if it exists).
28-
// Whether a partition is split or merged depends on its size.
26+
// splitOrMergeFixup is a fixup that includes the key of a partition that
27+
// may need to be split or merged, as well as the key of its parent partition
28+
// (if it exists). Whether a partition is split or merged depends on its size.
2929
splitOrMergeFixup fixupType = iota + 1
3030
// vectorDeleteFixup is a fixup that includes the primary key of a vector to
3131
// delete from the index, as well as the key of the partition that contains
@@ -266,20 +266,9 @@ func (fp *FixupProcessor) DelayInsertOrDelete(ctx context.Context) error {
266266
return nil
267267
}
268268

269-
// AddSplit enqueues a split fixup for later processing.
270-
func (fp *FixupProcessor) AddSplit(
271-
ctx context.Context, treeKey TreeKey, parentPartitionKey PartitionKey, partitionKey PartitionKey,
272-
) {
273-
fp.addFixup(ctx, fixup{
274-
TreeKey: treeKey,
275-
Type: splitOrMergeFixup,
276-
ParentPartitionKey: parentPartitionKey,
277-
PartitionKey: partitionKey,
278-
})
279-
}
280-
281-
// AddMerge enqueues a merge fixup for later processing.
282-
func (fp *FixupProcessor) AddMerge(
269+
// AddSplitOrMergeCheck enqueues a fixup to check whether a split or merge is
270+
// needed for the given partition.
271+
func (fp *FixupProcessor) AddSplitOrMergeCheck(
283272
ctx context.Context, treeKey TreeKey, parentPartitionKey PartitionKey, partitionKey PartitionKey,
284273
) {
285274
fp.addFixup(ctx, fixup{

pkg/sql/vecindex/cspann/fixup_worker.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ func (fw *fixupWorker) Start(ctx context.Context) {
105105
func (fw *fixupWorker) splitOrMergePartition(
106106
ctx context.Context, parentPartitionKey PartitionKey, partitionKey PartitionKey,
107107
) (err error) {
108+
// Do a quick, inconsistent scan of the partition, in order to see if it may
109+
// need to be split or merged.
110+
count, err := fw.index.store.EstimatePartitionCount(ctx, fw.treeKey, partitionKey)
111+
if err != nil {
112+
return errors.Wrapf(err, "counting vectors in partition %d", partitionKey)
113+
}
114+
split := count > fw.index.options.MaxPartitionSize
115+
merge := partitionKey != RootKey && count < fw.index.options.MinPartitionSize
116+
if !split && !merge {
117+
return nil
118+
}
119+
108120
// Run the split or merge within a transaction.
109121
fw.txn, err = fw.index.store.BeginTransaction(ctx)
110122
if err != nil {
@@ -128,9 +140,10 @@ func (fw *fixupWorker) splitOrMergePartition(
128140
return errors.Wrapf(err, "getting partition %d to split or merge", partitionKey)
129141
}
130142

131-
// Don't split or merge the partition if its size is within bounds.
132-
split := partition.Count() > fw.index.options.MaxPartitionSize
133-
merge := partition.Count() < fw.index.options.MinPartitionSize
143+
// Re-check the size of the partition now that it's locked, using a consistent
144+
// scan, so that we are not acting based on stale information.
145+
split = partition.Count() > fw.index.options.MaxPartitionSize
146+
merge = partitionKey != RootKey && partition.Count() < fw.index.options.MinPartitionSize
134147
if !split && !merge {
135148
log.VEventf(ctx, 2, "partition %d size is within bounds, do not split or merge", partitionKey)
136149
return nil
@@ -213,14 +226,14 @@ func (fw *fixupWorker) splitPartition(
213226
partitionKey, parentPartitionKey)
214227
}
215228

216-
metadata, err := fw.index.removeFromPartition(
217-
ctx, fw.txn, fw.treeKey, parentPartitionKey, childKey)
229+
err := fw.index.removeFromPartition(ctx, fw.txn, fw.treeKey, parentPartitionKey, childKey)
218230
if err != nil {
219231
return errors.Wrapf(err, "removing splitting partition %d from its parent %d",
220232
partitionKey, parentPartitionKey)
221233
}
222234

223-
if metadata.Count != 0 {
235+
// Only attempt to move vectors if there are siblings.
236+
if parentPartition.Count() > 1 {
224237
// Move any vectors to sibling partitions that have closer centroids.
225238
// Lazily get parent vectors only if they're actually needed.
226239
var parentVectors vector.Set
@@ -474,8 +487,8 @@ func (fw *fixupWorker) moveVectorsToSiblings(
474487
// there instead.
475488
childKey := split.Partition.ChildKeys()[i]
476489
valueBytes := split.Partition.ValueBytes()[i]
477-
err = fw.index.addToPartition(ctx, fw.txn, fw.treeKey,
478-
parentPartitionKey, siblingPartitionKey, vector, childKey, valueBytes)
490+
err = fw.index.addToPartition(
491+
ctx, fw.txn, fw.treeKey, siblingPartitionKey, vector, childKey, valueBytes)
479492
if err != nil {
480493
return errors.Wrapf(err, "moving vector to partition %d", siblingPartitionKey)
481494
}
@@ -504,6 +517,11 @@ func (fw *fixupWorker) linkNearbyVectors(
504517
idxCtx.level = partition.Level()
505518
idxCtx.randomized = partition.Centroid()
506519

520+
// Ensure that the search never returns the last remaining vector in a
521+
// non-leaf partition, in order to avoid moving it and creating an empty
522+
// non-leaf partition, which is not allowed by a balanced K-means tree.
523+
idxCtx.ignoreLonelyVector = partition.Level() != LeafLevel
524+
507525
// Don't link more vectors than the number of remaining slots in the split
508526
// partition, to avoid triggering another split.
509527
maxResults := fw.index.options.MaxPartitionSize - partition.Count()
@@ -542,24 +560,12 @@ func (fw *fixupWorker) linkNearbyVectors(
542560
}
543561

544562
// Remove the vector from the other partition.
545-
metadata, err := fw.index.removeFromPartition(
563+
err = fw.index.removeFromPartition(
546564
ctx, fw.txn, fw.treeKey, result.ParentPartitionKey, result.ChildKey)
547565
if err != nil {
548566
return errors.Wrapf(err, "removing vector from nearby partition %d during split of %d",
549567
result.ParentPartitionKey, oldPartitionKey)
550568
}
551-
if metadata.Count == 0 && partition.Level() > LeafLevel {
552-
// Removing the vector will result in an empty non-leaf partition, which
553-
// is not allowed, as the K-means tree would not be fully balanced. Add
554-
// the vector back to the partition. This is a very rare case and that
555-
// partition is likely to be merged away regardless.
556-
_, err = fw.txn.AddToPartition(
557-
ctx, fw.treeKey, result.ParentPartitionKey, vector, result.ChildKey, result.ValueBytes)
558-
if err != nil {
559-
return errors.Wrapf(err, "adding vector to splitting partition %d", oldPartitionKey)
560-
}
561-
continue
562-
}
563569

564570
// Add the vector to the split partition.
565571
partition.Add(&fw.workspace, vector, result.ChildKey, result.ValueBytes)
@@ -627,7 +633,7 @@ func (fw *fixupWorker) mergePartition(
627633
partitionKey, parentPartitionKey)
628634
return nil
629635
}
630-
_, err = fw.index.removeFromPartition(ctx, fw.txn, fw.treeKey, parentPartitionKey, childKey)
636+
err = fw.index.removeFromPartition(ctx, fw.txn, fw.treeKey, parentPartitionKey, childKey)
631637
if err != nil {
632638
return errors.Wrapf(err, "remove partition %d from parent partition %d",
633639
partitionKey, parentPartitionKey)
@@ -687,7 +693,7 @@ func (fw *fixupWorker) deleteVector(
687693
return nil
688694
}
689695

690-
_, err = fw.index.removeFromPartition(ctx, fw.txn, fw.treeKey, partitionKey, childKey)
696+
err = fw.index.removeFromPartition(ctx, fw.txn, fw.treeKey, partitionKey, childKey)
691697
if errors.Is(err, ErrPartitionNotFound) {
692698
log.VEventf(ctx, 2, "partition %d no longer exists, do not delete vector", partitionKey)
693699
return nil

pkg/sql/vecindex/cspann/index.go

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ type Context struct {
115115
// randomized is the original vector after it has been randomized by applying
116116
// a random orthogonal transformation (ROT).
117117
randomized vector.T
118+
// ignoreLonelyVector, if true, prohibits searches from returning a vector
119+
// that is the last remaining in its partition. This is used to avoid moving
120+
// the last remaining vector to another partition, thereby creating an empty
121+
// non-leaf partition, which is not allowed in a balanced K-means tree.
122+
ignoreLonelyVector bool
118123

119124
tempSearchSet SearchSet
120125
tempSubSearchSet SearchSet
@@ -364,9 +369,8 @@ func (vi *Index) Delete(
364369
}
365370

366371
// Remove the vector from its partition in the store.
367-
_, err = vi.removeFromPartition(
372+
return vi.removeFromPartition(
368373
ctx, idxCtx.txn, treeKey, result.ParentPartitionKey, result.ChildKey)
369-
return err
370374
}
371375

372376
// Search finds vectors in the index that are closest to the given query vector
@@ -404,16 +408,12 @@ func (vi *Index) SearchForInsert(
404408
return nil, err
405409
}
406410

407-
// Now fetch the centroid of the insert partition. This has the side effect
408-
// of checking the size of the partition, in case it's over-sized.
411+
// Now fetch the centroid of the insert partition.
409412
partitionKey := result.ChildKey.PartitionKey
410413
metadata, err := idxCtx.txn.GetPartitionMetadata(ctx, treeKey, partitionKey, true /* forUpdate */)
411414
if err != nil {
412415
return nil, err
413416
}
414-
if metadata.Count > vi.options.MaxPartitionSize {
415-
vi.fixups.AddSplit(ctx, treeKey, result.ParentPartitionKey, partitionKey)
416-
}
417417

418418
result.Vector = metadata.Centroid
419419
return result, nil
@@ -474,18 +474,11 @@ func (vi *Index) ProcessFixups() {
474474
vi.fixups.Process()
475475
}
476476

477-
// ForceSplit enqueues a split fixup. It is used for testing.
478-
func (vi *Index) ForceSplit(
477+
// ForceSplitOrMerge enqueues a split or merge fixup. It is used for testing.
478+
func (vi *Index) ForceSplitOrMerge(
479479
ctx context.Context, treeKey TreeKey, parentPartitionKey PartitionKey, partitionKey PartitionKey,
480480
) {
481-
vi.fixups.AddSplit(ctx, treeKey, parentPartitionKey, partitionKey)
482-
}
483-
484-
// ForceMerge enqueues a merge fixup. It is used for testing.
485-
func (vi *Index) ForceMerge(
486-
ctx context.Context, treeKey TreeKey, parentPartitionKey PartitionKey, partitionKey PartitionKey,
487-
) {
488-
vi.fixups.AddMerge(ctx, treeKey, parentPartitionKey, partitionKey)
481+
vi.fixups.AddSplitOrMergeCheck(ctx, treeKey, parentPartitionKey, partitionKey)
489482
}
490483

491484
// setupInsertContext sets up the given context for an insert operation. Before
@@ -526,10 +519,9 @@ func (vi *Index) insertHelper(
526519
if err != nil {
527520
return err
528521
}
529-
parentPartitionKey := result.ParentPartitionKey
530522
partitionKey := result.ChildKey.PartitionKey
531-
err = vi.addToPartition(ctx, idxCtx.txn, idxCtx.treeKey, parentPartitionKey,
532-
partitionKey, idxCtx.randomized, childKey, valueBytes)
523+
err = vi.addToPartition(
524+
ctx, idxCtx.txn, idxCtx.treeKey, partitionKey, idxCtx.randomized, childKey, valueBytes)
533525
if errors.Is(err, ErrRestartOperation) {
534526
return vi.insertHelper(ctx, idxCtx, childKey, valueBytes)
535527
}
@@ -551,7 +543,11 @@ func (vi *Index) searchForInsertHelper(
551543
return nil, errors.AssertionFailedf(
552544
"SearchForInsert should return exactly one result, got %d", len(results))
553545
}
554-
return &results[0], err
546+
547+
vi.fixups.AddSplitOrMergeCheck(
548+
ctx, idxCtx.treeKey, results[0].ParentPartitionKey, results[0].ChildKey.PartitionKey)
549+
550+
return &results[0], nil
555551
}
556552

557553
// addToPartition calls the store to add the given vector to an existing
@@ -561,36 +557,31 @@ func (vi *Index) addToPartition(
561557
ctx context.Context,
562558
txn Txn,
563559
treeKey TreeKey,
564-
parentPartitionKey PartitionKey,
565560
partitionKey PartitionKey,
566561
vec vector.T,
567562
childKey ChildKey,
568563
valueBytes ValueBytes,
569564
) error {
570-
metadata, err := txn.AddToPartition(ctx, treeKey, partitionKey, vec, childKey, valueBytes)
565+
err := txn.AddToPartition(ctx, treeKey, partitionKey, vec, childKey, valueBytes)
571566
if err != nil {
572567
return errors.Wrapf(err, "adding vector to partition %d", partitionKey)
573568
}
574-
if metadata.Count > vi.options.MaxPartitionSize {
575-
vi.fixups.AddSplit(ctx, treeKey, parentPartitionKey, partitionKey)
576-
}
577569
return vi.stats.OnAddOrRemoveVector(ctx)
578570
}
579571

580572
// removeFromPartition calls the store to remove a vector, by its key, from an
581573
// existing partition.
582574
func (vi *Index) removeFromPartition(
583575
ctx context.Context, txn Txn, treeKey TreeKey, partitionKey PartitionKey, childKey ChildKey,
584-
) (metadata PartitionMetadata, err error) {
585-
metadata, err = txn.RemoveFromPartition(ctx, treeKey, partitionKey, childKey)
576+
) error {
577+
err := txn.RemoveFromPartition(ctx, treeKey, partitionKey, childKey)
586578
if err != nil {
587-
return PartitionMetadata{},
588-
errors.Wrapf(err, "removing vector from partition %d", partitionKey)
579+
return errors.Wrapf(err, "removing vector from partition %d", partitionKey)
589580
}
590-
if err := vi.stats.OnAddOrRemoveVector(ctx); err != nil {
591-
return PartitionMetadata{}, err
581+
if err = vi.stats.OnAddOrRemoveVector(ctx); err != nil {
582+
return err
592583
}
593-
return metadata, nil
584+
return nil
594585
}
595586

596587
// searchHelper contains the core search logic for the K-means tree. It begins
@@ -768,11 +759,19 @@ func (vi *Index) searchChildPartitions(
768759
count := idxCtx.tempCounts[i]
769760
searchSet.Stats.SearchedPartition(level, count)
770761

762+
// If one of the searched partitions has only 1 vector remaining, do not
763+
// return that vector when "ignoreLonelyVector" is true.
764+
if idxCtx.ignoreLonelyVector && idxCtx.level == level && count == 1 {
765+
searchSet.RemoveResults(parentResults[i].ChildKey.PartitionKey)
766+
}
767+
771768
partitionKey := parentResults[i].ChildKey.PartitionKey
772769
if count < vi.options.MinPartitionSize && partitionKey != RootKey {
773-
vi.fixups.AddMerge(ctx, idxCtx.treeKey, parentResults[i].ParentPartitionKey, partitionKey)
770+
vi.fixups.AddSplitOrMergeCheck(
771+
ctx, idxCtx.treeKey, parentResults[i].ParentPartitionKey, partitionKey)
774772
} else if count > vi.options.MaxPartitionSize {
775-
vi.fixups.AddSplit(ctx, idxCtx.treeKey, parentResults[i].ParentPartitionKey, partitionKey)
773+
vi.fixups.AddSplitOrMergeCheck(
774+
ctx, idxCtx.treeKey, parentResults[i].ParentPartitionKey, partitionKey)
776775
}
777776
}
778777

pkg/sql/vecindex/cspann/index_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestIndex(t *testing.T) {
7979
case "delete":
8080
return state.Delete(d)
8181

82-
case "force-split", "force-merge":
82+
case "force-split-or-merge":
8383
return state.ForceSplitOrMerge(d)
8484

8585
case "recall":
@@ -475,10 +475,8 @@ func (s *testState) ForceSplitOrMerge(d *datadriven.TestData) string {
475475
}
476476
}
477477

478-
if d.Cmd == "force-split" {
479-
s.Index.ForceSplit(s.Ctx, treeKey, parentPartitionKey, partitionKey)
480-
} else {
481-
s.Index.ForceMerge(s.Ctx, treeKey, parentPartitionKey, partitionKey)
478+
if d.Cmd == "force-split-or-merge" {
479+
s.Index.ForceSplitOrMerge(s.Ctx, treeKey, parentPartitionKey, partitionKey)
482480
}
483481

484482
// Ensure the fixup runs.

0 commit comments

Comments
 (0)