Skip to content

Commit dfd37a7

Browse files
committed
vecindex: loosen RemoveFromPartition semantics
Previously, vectors could not be removed from partitions that were in a draining or deleting state. While this restriction is needed for adding vectors to prevent "lost update" anomalies, it's not needed for removing vectors. A future PR will be able to avoid extra work if this restriction is loosened. Even though it's OK to remove vectors from partitions in any state, that doesn't mean it's *useful* to do so. For example, when deleting a vector from the index, there's no point in deleting it from a partition that is going to be cleared or deleted anyway. Distinguish between these various cases by splitting AllowAddOrRemove into AllowAdd, AllowRemove, and CanSkipRemove. Epic: CRDB-42943 Release note: None
1 parent 67cb06a commit dfd37a7

File tree

12 files changed

+121
-101
lines changed

12 files changed

+121
-101
lines changed

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ func (suite *StoreTestSuite) TestGetPartitionMetadata() {
137137
CheckPartitionMetadata(suite.T(), metadata, cspann.LeafLevel, vector.T{0, 0},
138138
cspann.PartitionStateDetails{State: cspann.ReadyState})
139139

140-
// Non-root partition does not yet exist, expect error.
141-
_, err = txn.GetPartitionMetadata(
140+
// Non-root partition does not yet exist, expect Missing metadata.
141+
metadata, err = txn.GetPartitionMetadata(
142142
suite.ctx, treeKey, cspann.PartitionKey(99), false /* forUpdate */)
143-
suite.ErrorIs(err, cspann.ErrPartitionNotFound)
143+
suite.Equal(cspann.PartitionMetadata{}, metadata)
144144
})
145145

146146
// Create non-root partition with some vectors in it.
@@ -160,21 +160,19 @@ func (suite *StoreTestSuite) TestGetPartitionMetadata() {
160160
suite.NoError(store.TryUpdatePartitionMetadata(
161161
suite.ctx, treeKey, partitionKey, metadata, expected))
162162

163+
// Ensure latest metadata gets returned.
163164
RunTransaction(suite.ctx, suite.T(), store, func(txn cspann.Txn) {
164-
// If forUpdate = false, GetPartitionMetadata should not error.
165165
metadata, err := txn.GetPartitionMetadata(
166166
suite.ctx, treeKey, partitionKey, false /* forUpdate */)
167167
suite.NoError(err)
168168
details := cspann.PartitionStateDetails{
169169
State: cspann.DrainingForSplitState, Target1: 20, Target2: 30}
170170
CheckPartitionMetadata(suite.T(), metadata, cspann.SecondLevel, vector.T{4, 3}, details)
171171

172-
// If forUpdate = true, GetPartitionMetadata should error.
173-
var errConditionFailed *cspann.ConditionFailedError
174-
_, err = txn.GetPartitionMetadata(suite.ctx, treeKey, partitionKey, true /* forUpdate */)
175-
suite.ErrorAs(err, &errConditionFailed)
176-
CheckPartitionMetadata(suite.T(), errConditionFailed.Actual, cspann.SecondLevel,
177-
vector.T{4, 3}, details)
172+
metadata, err = txn.GetPartitionMetadata(
173+
suite.ctx, treeKey, partitionKey, true /* forUpdate */)
174+
suite.NoError(err)
175+
CheckPartitionMetadata(suite.T(), metadata, cspann.SecondLevel, vector.T{4, 3}, details)
178176
})
179177
suite.NoError(err)
180178
}
@@ -329,16 +327,12 @@ func (suite *StoreTestSuite) TestRemoveFromPartition() {
329327
suite.ctx, treeKey, partitionKey, metadata, expected))
330328

331329
RunTransaction(suite.ctx, suite.T(), store, func(txn cspann.Txn) {
332-
// Try to remove from partition, expect error due to its state.
333-
var errConditionFailed *cspann.ConditionFailedError
330+
// Try to remove from draining partition, expect success.
334331
err := txn.RemoveFromPartition(suite.ctx, treeKey, partitionKey, cspann.SecondLevel,
335332
partitionKey3)
336-
suite.ErrorAs(err, &errConditionFailed)
337-
details := cspann.PartitionStateDetails{
338-
State: cspann.DrainingForSplitState, Target1: 20, Target2: 30}
339-
CheckPartitionMetadata(suite.T(), errConditionFailed.Actual, cspann.SecondLevel,
340-
vector.T{4, 3}, details)
333+
suite.NoError(err)
341334
})
335+
CheckPartitionCount(suite.ctx, suite.T(), store, treeKey, partitionKey, 1)
342336
}
343337

344338
suite.Run("default tree", func() {
@@ -1004,9 +998,27 @@ func (suite *StoreTestSuite) TestTryClearPartition() {
1004998
// Create partition with some vectors.
1005999
partitionKey, partition := suite.createTestPartition(store, treeKey)
10061000

1007-
// Now clear should work.
1001+
// Clear should fail in Ready state.
10081002
expected := *partition.Metadata()
10091003
count, err := store.TryClearPartition(suite.ctx, treeKey, partitionKey, expected)
1004+
suite.Error(err)
1005+
suite.Equal(0, count)
1006+
1007+
// Move to draining state.
1008+
metadata := *partition.Metadata()
1009+
metadata.StateDetails.MakeDrainingForSplit(20, 30)
1010+
suite.NoError(store.TryUpdatePartitionMetadata(
1011+
suite.ctx, treeKey, partitionKey, metadata, expected))
1012+
1013+
// Try to clear with mismatched expected metadata.
1014+
var errConditionFailed *cspann.ConditionFailedError
1015+
_, err = store.TryClearPartition(suite.ctx, treeKey, partitionKey, expected)
1016+
suite.ErrorAs(err, &errConditionFailed)
1017+
suite.True(errConditionFailed.Actual.Equal(&metadata))
1018+
1019+
// Try again, this time with correct expected metadata.
1020+
expected = metadata
1021+
count, err = store.TryClearPartition(suite.ctx, treeKey, partitionKey, expected)
10101022
suite.NoError(err)
10111023
suite.Equal(3, count)
10121024

@@ -1019,15 +1031,7 @@ func (suite *StoreTestSuite) TestTryClearPartition() {
10191031
suite.Len(partition.ChildKeys(), 0)
10201032
suite.Len(partition.ValueBytes(), 0)
10211033

1022-
// Try to clear with mismatched expected metadata.
1023-
var errConditionFailed *cspann.ConditionFailedError
1024-
metadata := expected
1025-
metadata.StateDetails.State = cspann.DrainingForMergeState
1026-
_, err = store.TryClearPartition(suite.ctx, treeKey, partitionKey, metadata)
1027-
suite.ErrorAs(err, &errConditionFailed)
1028-
suite.True(errConditionFailed.Actual.Equal(&expected))
1029-
1030-
// Try again, this time with correct expected metadata.
1034+
// Clear partition with zero vectors.
10311035
count, err = store.TryClearPartition(suite.ctx, treeKey, partitionKey, expected)
10321036
suite.NoError(err)
10331037
suite.Equal(0, count)

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func (fw *fixupWorker) splitPartition(
299299
}
300300
}
301301

302-
// Remove the splitting partition from the its parent. Note that we don't
302+
// Remove the splitting partition from its parent. Note that we don't
303303
// delete the partition's metadata record, instead leaving it behind as a
304304
// "tombstone". This prevents other racing workers from resurrecting the
305305
// partition as a zombie, which could otherwise happen like this:
@@ -457,7 +457,7 @@ func (fw *fixupWorker) reassignToSiblings(
457457
for offset, distance := range tempSiblingDistances {
458458
if distance >= minDistance {
459459
continue
460-
} else if !fw.tempMetadataToGet[offset].Metadata.StateDetails.State.AllowAddOrRemove() {
460+
} else if !fw.tempMetadataToGet[offset].Metadata.StateDetails.State.AllowAdd() {
461461
continue
462462
}
463463
siblingOffset = offset
@@ -593,9 +593,9 @@ func (fw *fixupWorker) addToPartition(
593593
valueBytes []ValueBytes,
594594
expected PartitionMetadata,
595595
) (added bool, err error) {
596-
if !expected.StateDetails.State.AllowAddOrRemove() {
596+
if !expected.StateDetails.State.AllowAdd() {
597597
return false, errors.AssertionFailedf(
598-
"cannot add to partition in state that disallows adds/removes")
598+
"cannot add to partition in state %s that disallows adds", expected.StateDetails.State)
599599
}
600600
fw.index.validateVectorsToAdd(expected.Level, vectors)
601601

@@ -623,10 +623,11 @@ func (fw *fixupWorker) addToPartition(
623623
func (fw *fixupWorker) clearPartition(
624624
ctx context.Context, partitionKey PartitionKey, metadata PartitionMetadata,
625625
) (err error) {
626-
if metadata.StateDetails.State.AllowAddOrRemove() {
626+
if metadata.StateDetails.State.AllowAdd() {
627+
// Something's wrong if partition is being cleared in a state that allows
628+
// new vectors to be added.
627629
return errors.AssertionFailedf(
628-
"cannot clear partition %d in state %s that allows adds/removes",
629-
partitionKey, metadata.StateDetails.String())
630+
"cannot add to partition in state %s that allows adds", metadata.StateDetails.State)
630631
}
631632

632633
// Remove all children in the partition.
@@ -823,10 +824,9 @@ func (fw *fixupWorker) removeFromParentPartition(
823824
return errors.Wrapf(err, "getting parent partition %d metadata", parentPartitionKey)
824825
}
825826

826-
if !parentMetadata.StateDetails.State.AllowAddOrRemove() || parentMetadata.Level != parentLevel {
827-
// Child could not be removed from the parent because it doesn't exist or
828-
// it no longer allows deletes, or its level has changed (i.e. in root
829-
// partition case).
827+
if parentMetadata.StateDetails.State.CanSkipRemove() || parentMetadata.Level != parentLevel {
828+
// Child should not be removed from the parent that's draining or will be
829+
// deleted, or if its level has changed (i.e. in root partition case).
830830
return errFixupAborted
831831
}
832832

pkg/sql/vecindex/cspann/index.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,10 @@ func (vi *Index) SearchForInsert(
504504
if err != nil {
505505
return errors.Wrapf(err, "locking metadata for insert into partition %d", partitionKey)
506506
}
507+
if !metadata.StateDetails.State.AllowAdd() {
508+
// The partition does not allow adds, so go to the next candidate.
509+
return NewConditionFailedError(metadata)
510+
}
507511
result.Vector = metadata.Centroid
508512
return nil
509513
}
@@ -533,10 +537,16 @@ func (vi *Index) SearchForDelete(
533537
// When a candidate delete partition is found, lock its metadata for update.
534538
removeFunc := func(ctx context.Context, idxCtx *Context, result *SearchResult) error {
535539
partitionKey := result.ParentPartitionKey
536-
_, err := idxCtx.txn.GetPartitionMetadata(ctx, treeKey, partitionKey, true /* forUpdate */)
540+
metadata, err := idxCtx.txn.GetPartitionMetadata(
541+
ctx, treeKey, partitionKey, true /* forUpdate */)
537542
if err != nil {
538543
return errors.Wrapf(err, "locking metadata for delete from partition %d", partitionKey)
539544
}
545+
if metadata.StateDetails.State.CanSkipRemove() {
546+
// The partition will be cleared or deleted anyway, so no need to
547+
// remove from it. Go to the next candidate.
548+
return NewConditionFailedError(metadata)
549+
}
540550
return nil
541551
}
542552

@@ -685,6 +695,10 @@ func (vi *Index) searchForUpdateHelper(
685695
}
686696
if !ok {
687697
if idxCtx.forInsert {
698+
// Keep aggressively searching for valid insert partition, since
699+
// the only alternative is to fail the operation. This is not
700+
// necessary in the delete case, since it's OK if there are
701+
// dangling vectors in rare cases.
688702
return vi.searchForUpdateHelper(ctx, idxCtx, fn, deleteKey, remainingAttempts)
689703
}
690704
break

pkg/sql/vecindex/cspann/index_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -818,11 +818,12 @@ func (s *testState) loadIndexFromFormat(
818818
}
819819
}
820820

821+
// Always create partition in Ready state so that adds are allowed.
821822
metadata := cspann.PartitionMetadata{
822-
Level: childLevel,
823-
Centroid: centroid,
824-
StateDetails: details,
823+
Level: childLevel,
824+
Centroid: centroid,
825825
}
826+
metadata.StateDetails.MakeReady()
826827
err = s.MemStore.TryCreateEmptyPartition(s.Ctx, treeKey, partitionKey, metadata)
827828
require.NoError(s.T, err)
828829

@@ -834,6 +835,14 @@ func (s *testState) loadIndexFromFormat(
834835
require.True(s.T, added)
835836
}
836837

838+
if details.State != cspann.ReadyState {
839+
// Update the partition's state
840+
expected := metadata
841+
metadata.StateDetails = details
842+
err = s.MemStore.TryUpdatePartitionMetadata(s.Ctx, treeKey, partitionKey, metadata, expected)
843+
require.NoError(s.T, err)
844+
}
845+
837846
return lines, childLevel + 1, centroid, cspann.ChildKey{PartitionKey: partitionKey}
838847
}
839848

pkg/sql/vecindex/cspann/memstore/memstore.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,10 @@ func (s *Store) TryAddToPartition(
467467
if !existing.Equal(&expected) {
468468
return false, cspann.NewConditionFailedError(*existing)
469469
}
470+
if !existing.StateDetails.State.AllowAdd() {
471+
return false, errors.AssertionFailedf(
472+
"cannot add to partition in state %s that disallows adds", existing.StateDetails.State)
473+
}
470474

471475
// Add the vectors to the partition. Ignore any duplicate vectors.
472476
// TODO(andyk): Figure out how to give Store flexible scratch space.
@@ -520,15 +524,19 @@ func (s *Store) TryClearPartition(
520524
memPart := s.lockPartition(treeKey, partitionKey, uniqueOwner, true /* isExclusive */)
521525
if memPart == nil {
522526
// Partition does not exist.
523-
return -1, cspann.ErrPartitionNotFound
527+
return 0, cspann.ErrPartitionNotFound
524528
}
525529
defer memPart.lock.Release()
526530

527531
// Check precondition.
528532
partition := memPart.lock.partition
529533
existing := partition.Metadata()
530534
if !existing.Equal(&expected) {
531-
return -1, cspann.NewConditionFailedError(*existing)
535+
return 0, cspann.NewConditionFailedError(*existing)
536+
}
537+
if existing.StateDetails.State.AllowAdd() {
538+
return 0, errors.AssertionFailedf(
539+
"cannot clear partition in state %s that allows adds", existing.StateDetails.State)
532540
}
533541

534542
// Remove vectors from the partition and update partition count.

pkg/sql/vecindex/cspann/memstore/memstore_txn.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,7 @@ func (tx *memTxn) GetPartitionMetadata(
7979
defer memPart.lock.ReleaseShared()
8080
}
8181

82-
// Do not allow updates to the partition if the state doesn't allow it.
83-
metadata := memPart.lock.partition.Metadata()
84-
if forUpdate && !metadata.StateDetails.State.AllowAddOrRemove() {
85-
err = cspann.NewConditionFailedError(*metadata)
86-
return cspann.PartitionMetadata{}, errors.Wrapf(err,
87-
"getting metadata for partition %d (state=%s)",
88-
partitionKey, metadata.StateDetails.State.String())
89-
}
90-
91-
return *metadata, nil
82+
return *memPart.lock.partition.Metadata(), nil
9283
}
9384

9485
// AddToPartition implements the Txn interface.
@@ -117,7 +108,7 @@ func (tx *memTxn) AddToPartition(
117108
// allow it.
118109
partition := memPart.lock.partition
119110
state := partition.Metadata().StateDetails.State
120-
if !state.AllowAddOrRemove() {
111+
if !state.AllowAdd() {
121112
return errors.Wrapf(cspann.NewConditionFailedError(*partition.Metadata()),
122113
"adding to partition %d (state=%s)", partitionKey, state.String())
123114
}
@@ -158,16 +149,8 @@ func (tx *memTxn) RemoveFromPartition(
158149
}
159150
defer memPart.lock.Release()
160151

161-
// Do not allow vectors to be removed from the partition if the state doesn't
162-
// allow it.
163-
partition := memPart.lock.partition
164-
state := partition.Metadata().StateDetails.State
165-
if !state.AllowAddOrRemove() {
166-
return errors.Wrapf(cspann.NewConditionFailedError(*partition.Metadata()),
167-
"removing from partition %d (state=%s)", partitionKey, state.String())
168-
}
169-
170152
// Remove vector from the partition.
153+
partition := memPart.lock.partition
171154
if level != partition.Level() {
172155
return errors.Wrapf(cspann.ErrRestartOperation,
173156
"removing from partition %d (expected: %d, actual: %d)",

pkg/sql/vecindex/cspann/partition_metadata.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ func ParsePartitionState(s string) PartitionState {
4545
return MissingState
4646
}
4747

48-
// AllowAddOrRemove returns true if vectors can be added or removed to/from the
49-
// partition in this state.
50-
func (s PartitionState) AllowAddOrRemove() bool {
48+
// AllowAdd returns true if vectors can be added to the partition in this state.
49+
func (s PartitionState) AllowAdd() bool {
5150
switch s {
5251
case ReadyState, SplittingState, MergingState, UpdatingState,
5352
AddingLevelState, RemovingLevelState:
@@ -56,6 +55,18 @@ func (s PartitionState) AllowAddOrRemove() bool {
5655
return false
5756
}
5857

58+
// CanSkipRemove returns true if there's no need to remove vectors from the
59+
// partition because the partition is being drained or is about to be deleted.
60+
func (s PartitionState) CanSkipRemove() bool {
61+
return !s.AllowAdd()
62+
}
63+
64+
// AllowRemove returns true if vectors can be removed from the partition in this
65+
// state.
66+
func (s PartitionState) AllowRemove() bool {
67+
return s != MissingState
68+
}
69+
5970
// String returns the state formatted as a string.
6071
func (s PartitionState) String() string {
6172
switch s {
@@ -104,12 +115,12 @@ const (
104115
UpdatingState
105116
// DrainingForSplitState indicates that the partition is actively moving
106117
// vectors to target split sub-partitions. Searches are allowed, but not
107-
// inserts, deletes, splits, or merges.
118+
// inserts, splits, or merges.
108119
DrainingForSplitState
109120
// DrainingForMergeState indicates that the partition is actively moving
110121
// vectors into other partitions at the same level (or deleting vectors if
111-
// this is the root partition). Searches are allowed, but not inserts,
112-
// deletes, splits, or merges.
122+
// this is the root partition). Searches are allowed, but not inserts, splits,
123+
// or merges.
113124
DrainingForMergeState
114125
// AddingLevelState indicates that a root partition has been drained after a
115126
// split and has had its level increased by one. What remains is to add the
@@ -123,7 +134,7 @@ const (
123134
RemovingLevelState
124135
// DeletingForSplitState indicates that a non-root splitting partition is
125136
// about to be removed from its tree and deleted. Searches are allowed, but
126-
// not inserts, deletes, splits, or merges.
137+
// not inserts, splits, or merges.
127138
DeletingForSplitState
128139
)
129140

pkg/sql/vecindex/cspann/searcher.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -422,16 +422,16 @@ func (s *levelSearcher) searchChildPartitions(
422422
}
423423
s.stats.SearchedPartition(level, count)
424424

425-
// If searching for vector to delete, skip partitions that are in a state
426-
// that does not allow add and remove operations. This is not possible to
427-
// do here for the insert case, because we do not actually search the
428-
// partition in which to insert; we only search its parent and never get
429-
// the metadata for the insert partition itself.
425+
// If searching for vector to delete, skip partitions that don't need
426+
// vectors deleted from them (because they are draining or deleting). This
427+
// is not possible to do here for the insert case, because we do not
428+
// actually search the partition in which to insert; we only search its
429+
// parent and never get the metadata for the insert partition itself.
430430
// TODO(andyk): This should probably be checked in the Store, perhaps by
431431
// passing a "forUpdate" parameter to SearchPartitions, so that the Store
432432
// doesn't even add vectors from partitions that do not allow updates.
433433
if s.idxCtx.forDelete && s.idxCtx.level == level {
434-
if !s.idxCtx.tempToSearch[i].StateDetails.State.AllowAddOrRemove() {
434+
if s.idxCtx.tempToSearch[i].StateDetails.State.CanSkipRemove() {
435435
s.searchSet.RemoveByParent(partitionKey)
436436
}
437437
}

0 commit comments

Comments
 (0)