Skip to content

Commit b6250fd

Browse files
committed
cspann: make vector delete work during incremental splits
The new split code performs splits of vector index K-means trees using an incremental series of non-transactional steps. This commit ensures that any deletes from the vector index work as expected during all of these steps. In particular, in the DrainingForSplit phase, a partition does not allow vectors to be deleted. Instead, the vector needs to be deleted from one of the target partitions of the split (i.e. the two partitions between which the vectors are divided).

Epic: CRDB-42943

Release note: None
1 parent 1128917 commit b6250fd

File tree

10 files changed

+344
-16
lines changed

10 files changed

+344
-16
lines changed

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,64 @@ func (suite *StoreTestSuite) TestTryRemoveFromPartition() {
10941094
}
10951095
}
10961096

1097+
func (suite *StoreTestSuite) TestTryClearPartition() {
1098+
store := suite.makeStore(suite.quantizer)
1099+
if !store.SupportsTry() {
1100+
return
1101+
}
1102+
1103+
doTest := func(treeID int) {
1104+
treeKey := store.MakeTreeKey(suite.T(), treeID)
1105+
1106+
// Partition does not yet exist.
1107+
_, err := store.TryClearPartition(suite.ctx, treeKey, cspann.PartitionKey(99),
1108+
cspann.PartitionMetadata{})
1109+
suite.ErrorIs(err, cspann.ErrPartitionNotFound)
1110+
1111+
// Create partition with some vectors.
1112+
partitionKey, partition := suite.createTestPartition(store, treeKey)
1113+
1114+
// Now clear should work.
1115+
expected := *partition.Metadata()
1116+
count, err := store.TryClearPartition(suite.ctx, treeKey, partitionKey, expected)
1117+
suite.NoError(err)
1118+
suite.Equal(3, count)
1119+
1120+
// Fetch back the partition and validate it.
1121+
partition, err = store.TryGetPartition(suite.ctx, treeKey, partitionKey)
1122+
suite.NoError(err)
1123+
suite.True(partition.Metadata().Equal(&expected))
1124+
suite.Equal(cspann.SecondLevel, partition.Level())
1125+
suite.Equal(vector.T{4, 3}, partition.Centroid())
1126+
suite.Equal([]cspann.ChildKey{}, partition.ChildKeys())
1127+
suite.Equal([]cspann.ValueBytes{}, partition.ValueBytes())
1128+
1129+
// Try to clear with mismatched expected metadata.
1130+
var errConditionFailed *cspann.ConditionFailedError
1131+
metadata := expected
1132+
metadata.StateDetails.State = cspann.DrainingForMergeState
1133+
_, err = store.TryClearPartition(suite.ctx, treeKey, partitionKey, metadata)
1134+
suite.ErrorAs(err, &errConditionFailed)
1135+
suite.True(errConditionFailed.Actual.Equal(&expected))
1136+
1137+
// Try again, this time with correct expected metadata.
1138+
count, err = store.TryClearPartition(suite.ctx, treeKey, partitionKey, expected)
1139+
suite.NoError(err)
1140+
suite.Equal(0, count)
1141+
}
1142+
1143+
suite.Run("default tree", func() {
1144+
doTest(0)
1145+
})
1146+
1147+
if store.AllowMultipleTrees() {
1148+
// Ensure that vectors are independent across trees.
1149+
suite.Run("different tree", func() {
1150+
doTest(1)
1151+
})
1152+
}
1153+
}
1154+
10971155
func (suite *StoreTestSuite) runInTransaction(store TestStore, fn func(tx cspann.Txn)) {
10981156
suite.NoError(store.RunTransaction(suite.ctx, func(tx cspann.Txn) error {
10991157
fn(tx)

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (fw *fixupWorker) splitPartition(
245245
// This is the root partition, so remove all of its vectors rather than
246246
// delete the root partition itself. Note that the vectors have already
247247
// been copied to the two target partitions.
248-
err = fw.clearPartition(ctx, partitionKey, partition)
248+
err = fw.clearPartition(ctx, partitionKey, *partition.Metadata())
249249
if err != nil {
250250
return err
251251
}
@@ -374,29 +374,28 @@ func (fw *fixupWorker) addToPartition(
374374
return nil
375375
}
376376

377-
// clearPartition removes all vectors and associated data from the given
378-
// partition, leaving it empty, on the condition that the partition's state has
379-
// not changed unexpectedly. If that's the case, it returns errFixupAborted.
377+
// clearPartition removes all vectors from the given partition. This only
378+
// happens if the partition's metadata still matches the given metadata. If it
379+
// does not, clearPartition returns errFixupAborted.
380380
func (fw *fixupWorker) clearPartition(
381-
ctx context.Context, partitionKey PartitionKey, partition *Partition,
381+
ctx context.Context, partitionKey PartitionKey, metadata PartitionMetadata,
382382
) (err error) {
383-
if partition.Metadata().StateDetails.State.AllowAddOrRemove() {
383+
if metadata.StateDetails.State.AllowAddOrRemove() {
384384
return errors.AssertionFailedf("cannot clear partition in state that allows adds/removes")
385385
}
386386

387387
// Remove all children in the partition.
388-
removed, err := fw.index.store.TryRemoveFromPartition(ctx, fw.treeKey,
389-
partitionKey, partition.ChildKeys(), *partition.Metadata())
388+
count, err := fw.index.store.TryClearPartition(ctx, fw.treeKey, partitionKey, metadata)
390389
if err != nil {
391390
metadata, err := suppressRaceErrors(err)
392391
if err == nil {
393392
// Another worker raced to update the metadata, so abort.
394393
return errors.Wrapf(errFixupAborted,
395-
"clearing % vectors from partition, %d expected %s, found %s", partition.Count(),
394+
"clearing vectors from partition %d, expected %s, found %s",
396395
partitionKey, metadata.StateDetails.String(), metadata.StateDetails.String())
397396
}
398397
return errors.Wrap(err, "clearing vectors")
399-
} else if fw.singleStep && removed {
398+
} else if fw.singleStep && count > 0 {
400399
return errFixupAborted
401400
}
402401

@@ -549,7 +548,7 @@ func (fw *fixupWorker) addToParentPartition(
549548
func (fw *fixupWorker) deletePartition(
550549
ctx context.Context, parentPartitionKey, partitionKey PartitionKey,
551550
) (err error) {
552-
const format = "deleting partition %d, with parent partition %d (state=%d)"
551+
const format = "deleting partition %d (parent=%d, state=%d)"
553552
var parentMetadata PartitionMetadata
554553

555554
defer func() {

pkg/sql/vecindex/cspann/index.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ type Context struct {
131131
// forInsert indicates that this is an insert operation (or a search for
132132
// insert operation).
133133
forInsert bool
134+
// forDelete indicates that this is a delete operation (or a search for
135+
// delete operation).
136+
forDelete bool
134137

135138
tempSearchSet SearchSet
136139
tempSubSearchSet SearchSet
@@ -449,14 +452,15 @@ func (vi *Index) SearchForDelete(
449452
SkipRerank: true,
450453
UpdateStats: true,
451454
}, LeafLevel)
455+
idxCtx.forDelete = true
452456

453457
idxCtx.tempSearchSet = SearchSet{MaxResults: 1, MatchKey: key}
454458

455459
// Search with the base beam size. If that fails to find the vector, try again
456460
// with a larger beam size, in order to minimize the chance of dangling
457461
// vector references in the index.
458462
baseBeamSize := max(vi.options.BaseBeamSize, 1)
459-
for i := 0; i < 2; i++ {
463+
for range 2 {
460464
idxCtx.options.BaseBeamSize = baseBeamSize
461465

462466
err := vi.searchHelper(ctx, idxCtx, &idxCtx.tempSearchSet)
@@ -792,7 +796,7 @@ func (vi *Index) searchHelper(ctx context.Context, idxCtx *Context, searchSet *S
792796
// Compute the Z-score of the candidate list if there are enough
793797
// samples. Otherwise, use the default Z-score of 0.
794798
if len(results) >= vi.options.QualitySamples {
795-
for i := 0; i < vi.options.QualitySamples; i++ {
799+
for i := range vi.options.QualitySamples {
796800
idxCtx.tempQualitySamples[i] = float64(results[i].QuerySquaredDistance)
797801
}
798802
samples := idxCtx.tempQualitySamples[:vi.options.QualitySamples]
@@ -923,6 +927,14 @@ func (vi *Index) searchChildPartitions(
923927
searchSet.RemoveByParent(partitionKey)
924928
}
925929

930+
// If searching for a vector to delete, discard results from partitions that
931+
// are in a state that does not allow add and remove operations.
932+
if idxCtx.forDelete && idxCtx.level == level {
933+
if !idxCtx.tempToSearch[i].StateDetails.State.AllowAddOrRemove() {
934+
searchSet.RemoveByParent(partitionKey)
935+
}
936+
}
937+
926938
// Enqueue background fixup if a split or merge operation needs to be
927939
// started or continued after stalling.
928940
state := idxCtx.tempToSearch[i].StateDetails
@@ -1050,7 +1062,8 @@ func (vi *Index) getFullVectors(
10501062
return nil, err
10511063
}
10521064

1053-
for i := 0; i < len(candidates); i++ {
1065+
i := 0
1066+
for i < len(candidates) {
10541067
candidates[i].Vector = idxCtx.tempVectorsWithKeys[i].Vector
10551068

10561069
// Exclude deleted vectors from results.
@@ -1065,7 +1078,8 @@ func (vi *Index) getFullVectors(
10651078
candidates[i] = candidates[len(candidates)-1]
10661079
candidates[len(candidates)-1] = SearchResult{} // for GC
10671080
candidates = candidates[:len(candidates)-1]
1068-
i--
1081+
} else {
1082+
i++
10691083
}
10701084
}
10711085

pkg/sql/vecindex/cspann/memstore/memstore.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,34 @@ func (s *Store) TryRemoveFromPartition(
544544
return removed, err
545545
}
546546

547+
// TryClearPartition implements the Store interface.
548+
func (s *Store) TryClearPartition(
549+
ctx context.Context,
550+
treeKey cspann.TreeKey,
551+
partitionKey cspann.PartitionKey,
552+
expected cspann.PartitionMetadata,
553+
) (count int, err error) {
554+
memPart := s.lockPartition(treeKey, partitionKey, uniqueOwner, true /* isExclusive */)
555+
if memPart == nil {
556+
// Partition does not exist.
557+
return -1, cspann.ErrPartitionNotFound
558+
}
559+
defer memPart.lock.Release()
560+
561+
// Check precondition.
562+
partition := memPart.lock.partition
563+
existing := partition.Metadata()
564+
if !existing.Equal(&expected) {
565+
return -1, cspann.NewConditionFailedError(*existing)
566+
}
567+
568+
// Remove vectors from the partition and update partition count.
569+
count = partition.Clear()
570+
memPart.count.Store(0)
571+
572+
return count, nil
573+
}
574+
547575
// EnsureUniquePartitionKey checks that the given partition key is not being
548576
// used yet and also ensures it won't be given out in the future. This is used
549577
// for testing.

pkg/sql/vecindex/cspann/partition.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,18 @@ func (p *Partition) Find(childKey ChildKey) int {
253253
return -1
254254
}
255255

256+
// Clear removes all vectors from the partition and returns the number of
257+
// vectors that were cleared. The centroid stays the same.
258+
func (p *Partition) Clear() int {
259+
count := len(p.childKeys)
260+
p.quantizedSet.Clear(p.quantizedSet.GetCentroid())
261+
clear(p.childKeys)
262+
p.childKeys = p.childKeys[:0]
263+
clear(p.valueBytes)
264+
p.valueBytes = p.valueBytes[:0]
265+
return count
266+
}
267+
256268
// CreateEmptyPartition returns an empty partition for the given quantizer and
257269
// level.
258270
func CreateEmptyPartition(quantizer quantize.Quantizer, metadata PartitionMetadata) *Partition {

pkg/sql/vecindex/cspann/partition_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,22 @@ func TestPartition(t *testing.T) {
237237
require.Equal(t, -1, partition.Find(childKey40))
238238
require.False(t, partition.ReplaceWithLastByKey(childKey40))
239239
})
240+
241+
t.Run("test Clear", func(t *testing.T) {
242+
partition := newTestPartition()
243+
require.Equal(t, 3, partition.Clear())
244+
require.Equal(t, 0, partition.Count())
245+
require.Equal(t, []ChildKey{}, partition.ChildKeys())
246+
require.Equal(t, []ValueBytes{}, partition.ValueBytes())
247+
require.Equal(t, []float32{4, 3.33}, testutils.RoundFloats(partition.Centroid(), 2))
248+
249+
// Clear empty partition.
250+
require.Equal(t, 0, partition.Clear())
251+
require.Equal(t, 0, partition.Count())
252+
require.Equal(t, []ChildKey{}, partition.ChildKeys())
253+
require.Equal(t, []ValueBytes{}, partition.ValueBytes())
254+
require.Equal(t, []float32{4, 3.33}, testutils.RoundFloats(partition.Centroid(), 2))
255+
})
240256
}
241257

242258
func roundResults(results SearchResults, prec int) SearchResults {

pkg/sql/vecindex/cspann/store.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ type Store interface {
163163
childKeys []ChildKey,
164164
expected PartitionMetadata,
165165
) (removed bool, err error)
166+
167+
// TryClearPartition removes all vectors in the specified partition and
168+
// returns the number of vectors that were cleared. It returns
169+
// ErrPartitionNotFound if the partition does not exist.
170+
//
171+
// Before performing any action, TryClearPartition checks the partition's
172+
// metadata and returns a ConditionFailedError if it is not the same as the
173+
// expected metadata. The existence check comes first, so a missing partition
174+
// yields ErrPartitionNotFound regardless of the expected metadata.
175+
TryClearPartition(
176+
ctx context.Context, treeKey TreeKey, partitionKey PartitionKey, expected PartitionMetadata,
177+
) (count int, err error)
166178
}
167179

168180
// Txn enables callers to make changes to the stored index in a transactional

0 commit comments

Comments
 (0)