Skip to content

Commit fbe90de

Browse files
committed
cspann: update C-SPANN index code to tolerate missing partitions
Previously, when index operations like search or insert came across a missing partition, they failed with an error, as this should never be possible. However, with non-transactional splits, it's now acceptable for partitions to be missing. This commit changes the index code to tolerate this condition. It also updates transactional Store methods to support the new semantics. Epic: CRDB-42943 Release note: None
1 parent 56214a3 commit fbe90de

File tree

9 files changed

+111
-110
lines changed

9 files changed

+111
-110
lines changed

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (suite *StoreTestSuite) TestRunTransaction() {
111111
suite.ErrorContains(store.RunTransaction(suite.ctx, func(tx cspann.Txn) error {
112112
toSearch := []cspann.PartitionToSearch{{Key: cspann.RootKey}}
113113
searchSet := cspann.SearchSet{MaxResults: 1}
114-
_, err := tx.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{1, -1}, &searchSet)
114+
err := tx.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{1, -1}, &searchSet)
115115
suite.NoError(err)
116116
suite.Equal(1, toSearch[0].Count)
117117
return errors.New("abort")
@@ -260,16 +260,15 @@ func (suite *StoreTestSuite) TestNonRootPartition() {
260260
// Search partition.
261261
toSearch := []cspann.PartitionToSearch{{Key: partitionKey}}
262262
searchSet := cspann.SearchSet{MaxResults: 1}
263-
searchLevel, err := txn.SearchPartitions(
264-
suite.ctx, treeKey, toSearch, vector.T{5, -1}, &searchSet)
263+
err = txn.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{5, -1}, &searchSet)
265264
suite.NoError(err)
266-
suite.Equal(cspann.SecondLevel, searchLevel)
267265
result1 := cspann.SearchResult{
268266
QuerySquaredDistance: 17, ErrorBound: 0, CentroidDistance: 0,
269267
ParentPartitionKey: partitionKey, ChildKey: partitionKey3, ValueBytes: valueBytes3}
270268
results := searchSet.PopResults()
271269
RoundResults(results, 4)
272270
suite.Equal(cspann.SearchResults{result1}, results)
271+
suite.Equal(cspann.SecondLevel, toSearch[0].Level)
273272
suite.Equal(2, toSearch[0].Count)
274273
})
275274
}
@@ -308,17 +307,18 @@ func (suite *StoreTestSuite) TestSearchMultiplePartitions() {
308307

309308
searchSet := cspann.SearchSet{MaxResults: 2}
310309
toSearch := []cspann.PartitionToSearch{{Key: cspann.RootKey}, {Key: partitionKey}}
311-
level, err := txn.SearchPartitions(suite.ctx, treeKey, toSearch, vec4, &searchSet)
310+
err = txn.SearchPartitions(suite.ctx, treeKey, toSearch, vec4, &searchSet)
312311
suite.NoError(err)
313-
suite.Equal(cspann.LeafLevel, level)
314312
result1 := cspann.SearchResult{
315313
QuerySquaredDistance: 24, ErrorBound: 24.08, CentroidDistance: 3.16,
316314
ParentPartitionKey: partitionKey, ChildKey: primaryKey1, ValueBytes: valueBytes1}
317315
result2 := cspann.SearchResult{
318316
QuerySquaredDistance: 29, ErrorBound: 0, CentroidDistance: 5,
319317
ParentPartitionKey: cspann.RootKey, ChildKey: primaryKey3, ValueBytes: valueBytes3}
320318
suite.Equal(cspann.SearchResults{result1, result2}, RoundResults(searchSet.PopResults(), 2))
319+
suite.Equal(cspann.LeafLevel, toSearch[0].Level)
321320
suite.Equal(3, toSearch[0].Count)
321+
suite.Equal(cspann.LeafLevel, toSearch[1].Level)
322322
suite.Equal(2, toSearch[1].Count)
323323
})
324324
}
@@ -602,6 +602,7 @@ func (suite *StoreTestSuite) TestGetFullVectors() {
602602
{Key: cspann.ChildKey{KeyBytes: cspann.KeyBytes{0}}},
603603
{Key: cspann.ChildKey{PartitionKey: partitionKey}},
604604
{Key: cspann.ChildKey{KeyBytes: key3}},
605+
{Key: cspann.ChildKey{PartitionKey: cspann.PartitionKey(99)}}, // No such partition.
605606
}
606607
err := txn.GetFullVectors(suite.ctx, treeKey, results)
607608
suite.NoError(err)
@@ -612,6 +613,7 @@ func (suite *StoreTestSuite) TestGetFullVectors() {
612613
suite.Nil(results[4].Vector)
613614
suite.Equal(vector.T{4, 3}, results[5].Vector)
614615
suite.Equal(vec3, results[6].Vector)
616+
suite.Nil(results[7].Vector)
615617

616618
// Grab another set of vectors to ensure that saved state is properly reset.
617619
results = []cspann.VectorWithKey{
@@ -1228,10 +1230,10 @@ func (suite *StoreTestSuite) testEmptyOrMissingRoot(store TestStore, treeID int,
12281230
RunTransaction(suite.ctx, suite.T(), store, func(txn cspann.Txn) {
12291231
searchSet := cspann.SearchSet{MaxResults: 2}
12301232
toSearch := []cspann.PartitionToSearch{{Key: cspann.RootKey}}
1231-
level, err := txn.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{1, 1}, &searchSet)
1233+
err := txn.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{1, 1}, &searchSet)
12321234
suite.NoError(err)
1233-
suite.Equal(cspann.LeafLevel, level)
12341235
suite.Nil(searchSet.PopResults())
1236+
suite.Equal(cspann.LeafLevel, toSearch[0].Level)
12351237
suite.Equal(0, toSearch[0].Count)
12361238
})
12371239
})
@@ -1345,10 +1347,8 @@ func (suite *StoreTestSuite) testLeafPartition(
13451347
// Search partition.
13461348
searchSet := cspann.SearchSet{MaxResults: 2}
13471349
toSearch := []cspann.PartitionToSearch{{Key: partitionKey}}
1348-
searchLevel, err := txn.SearchPartitions(
1349-
suite.ctx, treeKey, toSearch, vector.T{1, 1}, &searchSet)
1350+
err = txn.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{1, 1}, &searchSet)
13501351
suite.NoError(err)
1351-
suite.Equal(cspann.LeafLevel, searchLevel)
13521352
result1 := cspann.SearchResult{
13531353
QuerySquaredDistance: 1, ErrorBound: 0,
13541354
CentroidDistance: testutils.RoundFloat(num32.L2Distance(vec1, centroid), 4),
@@ -1367,6 +1367,7 @@ func (suite *StoreTestSuite) testLeafPartition(
13671367
results := searchSet.PopResults()
13681368
RoundResults(results, 4)
13691369
suite.Equal(cspann.SearchResults{result1, result2}, results)
1370+
suite.Equal(cspann.LeafLevel, toSearch[0].Level)
13701371
suite.Equal(3, toSearch[0].Count)
13711372

13721373
// Ensure partition metadata is updated.
@@ -1407,10 +1408,8 @@ func (suite *StoreTestSuite) testLeafPartition(
14071408
// Search partition.
14081409
searchSet := cspann.SearchSet{MaxResults: 1}
14091410
toSearch := []cspann.PartitionToSearch{{Key: partitionKey}}
1410-
searchLevel, err := txn.SearchPartitions(
1411-
suite.ctx, treeKey, toSearch, vector.T{10, -5}, &searchSet)
1411+
err = txn.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{10, -5}, &searchSet)
14121412
suite.NoError(err)
1413-
suite.Equal(cspann.LeafLevel, searchLevel)
14141413
result1 := cspann.SearchResult{
14151414
QuerySquaredDistance: 25, ErrorBound: 0,
14161415
CentroidDistance: testutils.RoundFloat(num32.L2Distance(vec4, centroid), 4),
@@ -1423,6 +1422,7 @@ func (suite *StoreTestSuite) testLeafPartition(
14231422
results := searchSet.PopResults()
14241423
RoundResults(results, 4)
14251424
suite.Equal(cspann.SearchResults{result1}, results)
1425+
suite.Equal(cspann.LeafLevel, toSearch[0].Level)
14261426
suite.Equal(3, toSearch[0].Count)
14271427
})
14281428
})
@@ -1470,16 +1470,15 @@ func (suite *StoreTestSuite) setRootPartition(store TestStore, treeID int) {
14701470
// Search partition.
14711471
searchSet := cspann.SearchSet{MaxResults: 1}
14721472
toSearch := []cspann.PartitionToSearch{{Key: cspann.RootKey}}
1473-
searchLevel, err := txn.SearchPartitions(
1474-
suite.ctx, treeKey, toSearch, vector.T{5, 5}, &searchSet)
1473+
err = txn.SearchPartitions(suite.ctx, treeKey, toSearch, vector.T{5, 5}, &searchSet)
14751474
suite.NoError(err)
1476-
suite.Equal(cspann.SecondLevel, searchLevel)
14771475
result1 := cspann.SearchResult{
14781476
QuerySquaredDistance: 5, ErrorBound: 0, CentroidDistance: 3.1623,
14791477
ParentPartitionKey: cspann.RootKey, ChildKey: partitionKey2, ValueBytes: valueBytes2}
14801478
results := searchSet.PopResults()
14811479
RoundResults(results, 4)
14821480
suite.Equal(cspann.SearchResults{result1}, results)
1481+
suite.Equal(cspann.SecondLevel, toSearch[0].Level)
14831482
suite.Equal(2, toSearch[0].Count)
14841483
})
14851484
}

pkg/sql/vecindex/cspann/fixup_worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,10 @@ func (fw *fixupWorker) deleteVector(
758758
// Root partition's level has been updated, so just abort.
759759
return nil
760760
} else if err != nil {
761+
if errors.Is(err, ErrPartitionNotFound) {
762+
log.VEventf(ctx, 2, "partition %d no longer exists, do not delete vector", partitionKey)
763+
return nil
764+
}
761765
return errors.Wrapf(err, "getting root partition's level")
762766
}
763767
}

pkg/sql/vecindex/cspann/index.go

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,21 @@ func (vi *Index) searchForInsertHelper(
605605
var seen map[PartitionKey]struct{}
606606
maxResults := 2
607607
allowRetry := true
608+
609+
// Check the next partition in the result list. If there are no more
610+
// partitions to check, then expand the search to a wider set of partitions
611+
// by increasing MaxResults.
612+
checkNextPartition := func(partitionKey PartitionKey) {
613+
results = results[1:]
614+
if len(results) == 0 {
615+
if seen == nil {
616+
seen = make(map[PartitionKey]struct{})
617+
}
618+
seen[partitionKey] = struct{}{}
619+
maxResults *= 2
620+
}
621+
}
622+
608623
for {
609624
// If there are no more results, get more now.
610625
if len(results) == 0 {
@@ -626,8 +641,10 @@ func (vi *Index) searchForInsertHelper(
626641
return nil, err
627642
}
628643
original = idxCtx.tempSearchSet.PopResults()
629-
if len(original) < 1 {
630-
return nil, errors.AssertionFailedf("SearchForInsert should return at least one result")
644+
if len(original) == 0 {
645+
// No results, so try again with more results at each search level.
646+
maxResults *= 2
647+
continue
631648
}
632649
results = original
633650
allowRetry = false
@@ -669,18 +686,12 @@ func (vi *Index) searchForInsertHelper(
669686
return nil, err
670687
}
671688
} else {
672-
// This is a non-root partition, so check the next partition in the
673-
// result list. If there are no more partitions to check, then expand
674-
// the search to a wider set of partitions by increasing MaxResults.
675-
results = results[1:]
676-
if len(results) == 0 {
677-
if seen == nil {
678-
seen = make(map[PartitionKey]struct{})
679-
}
680-
seen[partitionKey] = struct{}{}
681-
maxResults *= 2
682-
}
689+
// This is a non-root partition, so try next partition.
690+
checkNextPartition(partitionKey)
683691
}
692+
} else if errors.Is(err, ErrPartitionNotFound) {
693+
// This partition does not exist, so try next partition.
694+
checkNextPartition(partitionKey)
684695
} else {
685696
return nil, err
686697
}
@@ -740,19 +751,19 @@ func (vi *Index) fallbackOnTargets(
740751
"fetching centroids for target partitions %d and %d, for splitting partition %d",
741752
state.Target1, state.Target2, partitionKey)
742753
}
743-
if len(tempResults) != 2 {
744-
return nil, errors.AssertionFailedf(
745-
"expected to get two centroids for state %s", state.String())
746-
}
747754

748-
// Order by the distance of the centroids from the query vector.
749-
dist1 := num32.L2SquaredDistance(vec, tempResults[0].Vector)
750-
dist2 := num32.L2SquaredDistance(vec, tempResults[1].Vector)
755+
// Calculate the distance of the query vector to the centroids.
756+
for i := range tempResults {
757+
tempResults[i].QuerySquaredDistance = num32.L2SquaredDistance(vec, tempResults[i].Vector)
758+
}
751759

752-
tempResults[0].QuerySquaredDistance = dist1
753-
tempResults[1].QuerySquaredDistance = dist2
760+
if len(tempResults) < 2 {
761+
// One or both of the target partitions have been deleted.
762+
return tempResults, nil
763+
}
754764

755-
if dist1 > dist2 {
765+
// Order by the distance of the centroids from the query vector.
766+
if tempResults[0].QuerySquaredDistance > tempResults[1].QuerySquaredDistance {
756767
// Swap results.
757768
tempResult := tempResults[0]
758769
tempResults[0] = tempResults[1]
@@ -848,12 +859,6 @@ func (vi *Index) searchHelper(ctx context.Context, idxCtx *Context, searchSet *S
848859

849860
for {
850861
results := subSearchSet.PopResults()
851-
if len(results) == 0 && searchLevel > LeafLevel {
852-
// This should never happen, as it means that interior partition(s)
853-
// have no children. The vector deletion logic should prevent that.
854-
panic(errors.AssertionFailedf(
855-
"interior partition(s) on level %d has no children", searchLevel))
856-
}
857862

858863
var zscore float64
859864
if searchLevel > LeafLevel {
@@ -974,13 +979,17 @@ func (vi *Index) searchChildPartitions(
974979
}
975980
}
976981

977-
level, err = idxCtx.txn.SearchPartitions(
982+
err = idxCtx.txn.SearchPartitions(
978983
ctx, idxCtx.treeKey, idxCtx.tempToSearch, idxCtx.randomized, searchSet)
979984
if err != nil {
980985
return 0, err
981986
}
982987

983988
for i := range parentResults {
989+
if level == InvalidLevel {
990+
level = idxCtx.tempToSearch[i].Level
991+
}
992+
984993
count := idxCtx.tempToSearch[i].Count
985994
searchSet.Stats.SearchedPartition(level, count)
986995

@@ -1130,11 +1139,15 @@ func (vi *Index) getFullVectors(
11301139
for i < len(candidates) {
11311140
candidates[i].Vector = idxCtx.tempVectorsWithKeys[i].Vector
11321141

1133-
// Exclude deleted vectors from results.
1142+
// Exclude deleted child keys from results.
11341143
if candidates[i].Vector == nil {
1135-
// Vector was deleted, so add fixup to delete it.
1136-
vi.fixups.AddDeleteVector(
1137-
ctx, candidates[i].ParentPartitionKey, candidates[i].ChildKey.KeyBytes)
1144+
// TODO(andyk): Need to create an DeletePartitionKey fixup to handle
1145+
// the case of a dangling partition key.
1146+
if candidates[i].ChildKey.KeyBytes != nil {
1147+
// Vector was deleted, so add fixup to delete it.
1148+
vi.fixups.AddDeleteVector(
1149+
ctx, candidates[i].ParentPartitionKey, candidates[i].ChildKey.KeyBytes)
1150+
}
11381151

11391152
// Move the last candidate to the current position and reduce size
11401153
// of slice by one.

pkg/sql/vecindex/cspann/memstore/memstore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func TestInMemoryStoreConcurrency(t *testing.T) {
129129
commontest.RunTransaction(ctx, t, store, func(txn2 cspann.Txn) {
130130
searchSet := cspann.SearchSet{MaxResults: 1}
131131
toSearch := []cspann.PartitionToSearch{{Key: cspann.RootKey}}
132-
_, err := txn2.SearchPartitions(ctx, treeKey, toSearch, vector.T{0, 0}, &searchSet)
132+
err := txn2.SearchPartitions(ctx, treeKey, toSearch, vector.T{0, 0}, &searchSet)
133133
require.NoError(t, err)
134134
result1 := cspann.SearchResult{
135135
QuerySquaredDistance: 25, ErrorBound: 0, CentroidDistance: 5,

pkg/sql/vecindex/cspann/memstore/memstore_txn.go

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -231,19 +231,21 @@ func (tx *memTxn) SearchPartitions(
231231
toSearch []cspann.PartitionToSearch,
232232
queryVector vector.T,
233233
searchSet *cspann.SearchSet,
234-
) (level cspann.Level, err error) {
234+
) error {
235235
for i := range toSearch {
236-
var searchLevel cspann.Level
237-
238236
memPart, ok := tx.store.getPartition(treeKey, toSearch[i].Key)
239237
if !ok {
240238
if toSearch[i].Key == cspann.RootKey {
241239
// Root partition has not yet been created, so it must be empty.
242-
searchLevel = cspann.LeafLevel
240+
toSearch[i].Level = cspann.LeafLevel
243241
toSearch[i].StateDetails = cspann.MakeReadyDetails()
244242
toSearch[i].Count = 0
245243
} else {
246-
return cspann.InvalidLevel, cspann.ErrPartitionNotFound
244+
// Partition does not exist, so return InvalidLevel, MissingState
245+
// and Count=0.
246+
toSearch[i].Level = cspann.InvalidLevel
247+
toSearch[i].StateDetails = cspann.PartitionStateDetails{}
248+
toSearch[i].Count = 0
247249
}
248250
} else {
249251
// Acquire shared lock on partition and search it. Note that we don't
@@ -255,23 +257,14 @@ func (tx *memTxn) SearchPartitions(
255257
defer memPart.lock.ReleaseShared()
256258

257259
partition := memPart.lock.partition
260+
toSearch[i].Level = partition.Level()
258261
toSearch[i].StateDetails = partition.Metadata().StateDetails
259-
searchLevel, toSearch[i].Count = partition.Search(
260-
&tx.workspace, toSearch[i].Key, queryVector, searchSet)
262+
toSearch[i].Count = partition.Search(&tx.workspace, toSearch[i].Key, queryVector, searchSet)
261263
}()
262264
}
263-
264-
if i == 0 {
265-
level = searchLevel
266-
} else if level != searchLevel {
267-
// Callers should only search for partitions at the same level.
268-
panic(errors.AssertionFailedf(
269-
"caller already searched a partition at level %d, cannot search at level %d",
270-
level, searchLevel))
271-
}
272265
}
273266

274-
return level, nil
267+
return nil
275268
}
276269

277270
// GetFullVectors implements the Txn interface.
@@ -286,14 +279,13 @@ func (tx *memTxn) GetFullVectors(
286279
if ref.Key.PartitionKey != cspann.InvalidKey {
287280
// Get the partition's centroid.
288281
memPart, ok := tx.store.getPartitionLocked(treeKey, ref.Key.PartitionKey)
289-
if !ok {
290-
return errors.Wrapf(cspann.ErrPartitionNotFound,
291-
"getting partition %d centroid", ref.Key.PartitionKey)
282+
if ok {
283+
// Don't need to acquire lock to call the Centroid method, since it
284+
// is immutable and thread-safe.
285+
ref.Vector = memPart.lock.partition.Centroid()
286+
} else {
287+
ref.Vector = nil
292288
}
293-
294-
// Don't need to acquire lock to call the Centroid method, since it
295-
// is immutable and thread-safe.
296-
ref.Vector = memPart.lock.partition.Centroid()
297289
} else {
298290
vector, ok := tx.store.mu.vectors[string(refs[i].Key.KeyBytes)]
299291
if ok {

pkg/sql/vecindex/cspann/partition.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,12 @@ func (p *Partition) ValueBytes() []ValueBytes {
130130
}
131131

132132
// Search estimates the set of data vectors that are nearest to the given query
133-
// vector and returns them in the given search set. Search also returns this
134-
// partition's level in the K-means tree and the count of quantized vectors in
135-
// the partition.
133+
// vector and returns them in the given search set. Search also returns the
134+
// count of quantized vectors in the partition.
136135
func (p *Partition) Search(
137136
w *workspace.T, partitionKey PartitionKey, queryVector vector.T, searchSet *SearchSet,
138-
) (level Level, count int) {
139-
count = p.Count()
137+
) int {
138+
count := p.Count()
140139
tempFloats := w.AllocFloats(count * 2)
141140
defer w.FreeFloats(tempFloats)
142141

@@ -161,7 +160,7 @@ func (p *Partition) Search(
161160
searchSet.Add(&searchSet.tempResult)
162161
}
163162

164-
return p.Level(), count
163+
return count
165164
}
166165

167166
// Add quantizes the given vector as part of this partition. If a vector with

0 commit comments

Comments
 (0)