Skip to content

Commit 75e1394

Browse files
committed
vecindex: add TryMoveVector method to Store interface
Add a new TryMoveVector to the Store interface. This method moves a single vector from one partition to another as an atomic operation, provided the destination metadata is as expected. Epic: CRDB-42943 Release note: None
1 parent 73cc435 commit 75e1394

File tree

6 files changed

+349
-25
lines changed

6 files changed

+349
-25
lines changed

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package commontest
77

88
import (
9+
"cmp"
910
"context"
11+
"slices"
1012
"testing"
1113

1214
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -987,6 +989,135 @@ func (suite *StoreTestSuite) TestTryRemoveFromPartition() {
987989
}
988990
}
989991

992+
func (suite *StoreTestSuite) TestTryMoveVector() {
993+
store := suite.makeStore(suite.quantizer)
994+
defer store.Close(suite.T())
995+
996+
doTest := func(treeID int) {
997+
treeKey := store.MakeTreeKey(suite.T(), treeID)
998+
999+
// Create source partition with some vectors.
1000+
sourcePartitionKey, _ := suite.createTestPartition(store, treeKey)
1001+
1002+
// Create empty target partition.
1003+
targetPartitionKey := cspann.PartitionKey(20)
1004+
metadata := cspann.PartitionMetadata{
1005+
Level: cspann.SecondLevel,
1006+
Centroid: vector.T{2, 4},
1007+
}
1008+
metadata.StateDetails.MakeReady()
1009+
suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, targetPartitionKey, metadata))
1010+
targetPartition, err := store.TryGetPartition(suite.ctx, treeKey, targetPartitionKey)
1011+
suite.NoError(err)
1012+
1013+
// Source partition does not yet exist.
1014+
expected := *targetPartition.Metadata()
1015+
moved, err := store.TryMoveVector(
1016+
suite.ctx, treeKey, cspann.PartitionKey(99), targetPartitionKey,
1017+
vec1, partitionKey1, valueBytes1, expected)
1018+
suite.NoError(err)
1019+
suite.False(moved)
1020+
1021+
// Destination partition does not yet exist.
1022+
moved, err = store.TryMoveVector(
1023+
suite.ctx, treeKey, sourcePartitionKey, cspann.PartitionKey(99),
1024+
vec1, partitionKey1, valueBytes1, cspann.PartitionMetadata{})
1025+
suite.NoError(err)
1026+
suite.False(moved)
1027+
1028+
// Source partition is the same as destination partition.
1029+
moved, err = store.TryMoveVector(
1030+
suite.ctx, treeKey, sourcePartitionKey, sourcePartitionKey,
1031+
vec1, partitionKey1, valueBytes1, expected)
1032+
suite.NoError(err)
1033+
suite.False(moved)
1034+
1035+
// Now move should work.
1036+
moved, err = store.TryMoveVector(
1037+
suite.ctx, treeKey, sourcePartitionKey, targetPartitionKey,
1038+
vec1, partitionKey1, valueBytes1, expected)
1039+
suite.NoError(err)
1040+
suite.True(moved)
1041+
1042+
// Fetch back the target partition and validate it.
1043+
targetPartition, err = store.TryGetPartition(suite.ctx, treeKey, targetPartitionKey)
1044+
suite.NoError(err)
1045+
suite.Equal([]cspann.ChildKey{partitionKey1}, targetPartition.ChildKeys())
1046+
suite.Equal([]cspann.ValueBytes{valueBytes1}, targetPartition.ValueBytes())
1047+
1048+
// Try to move again, but with mismatched expected metadata.
1049+
var errConditionFailed *cspann.ConditionFailedError
1050+
metadata = expected
1051+
metadata.StateDetails.State = cspann.DrainingForMergeState
1052+
moved, err = store.TryMoveVector(
1053+
suite.ctx, treeKey, sourcePartitionKey, targetPartitionKey,
1054+
vec1, partitionKey3, valueBytes3, metadata)
1055+
suite.ErrorAs(err, &errConditionFailed)
1056+
suite.False(moved)
1057+
suite.True(errConditionFailed.Actual.Equal(&expected))
1058+
1059+
// Try again, this time with correct expected metadata.
1060+
moved, err = store.TryMoveVector(
1061+
suite.ctx, treeKey, sourcePartitionKey, targetPartitionKey,
1062+
vec1, partitionKey3, valueBytes3, expected)
1063+
suite.NoError(err)
1064+
suite.True(moved)
1065+
1066+
// Fetch back the source partition and validate it.
1067+
sourcePartition, err := store.TryGetPartition(suite.ctx, treeKey, sourcePartitionKey)
1068+
suite.NoError(err)
1069+
suite.Equal([]cspann.ChildKey{partitionKey2}, sourcePartition.ChildKeys())
1070+
suite.Equal([]cspann.ValueBytes{valueBytes2}, sourcePartition.ValueBytes())
1071+
1072+
// Try to move a vector that no longer exists in the source partition.
1073+
moved, err = store.TryMoveVector(
1074+
suite.ctx, treeKey, sourcePartitionKey, targetPartitionKey,
1075+
vec1, partitionKey3, valueBytes3, expected)
1076+
suite.NoError(err)
1077+
suite.False(moved)
1078+
1079+
// Try to move a vector that already exists in the target partition.
1080+
added, err := store.TryAddToPartition(
1081+
suite.ctx, treeKey, targetPartitionKey, vec2.AsSet(),
1082+
[]cspann.ChildKey{partitionKey2}, []cspann.ValueBytes{valueBytes2}, expected)
1083+
suite.NoError(err)
1084+
suite.True(added)
1085+
1086+
moved, err = store.TryMoveVector(
1087+
suite.ctx, treeKey, sourcePartitionKey, targetPartitionKey,
1088+
vec1, partitionKey2, valueBytes2, expected)
1089+
suite.NoError(err)
1090+
suite.False(moved)
1091+
1092+
// Ensure that the vector was not removed from the source partition.
1093+
sourcePartition, err = store.TryGetPartition(suite.ctx, treeKey, sourcePartitionKey)
1094+
suite.NoError(err)
1095+
suite.Equal([]cspann.ChildKey{partitionKey2}, sourcePartition.ChildKeys())
1096+
suite.Equal([]cspann.ValueBytes{valueBytes2}, sourcePartition.ValueBytes())
1097+
1098+
// Ensure that the target partition now has all three vectors.
1099+
targetPartition, err = store.TryGetPartition(suite.ctx, treeKey, targetPartitionKey)
1100+
suite.NoError(err)
1101+
suite.Equal(3, targetPartition.Count())
1102+
childKeys := slices.Clone(targetPartition.ChildKeys())
1103+
slices.SortFunc(childKeys, func(a, b cspann.ChildKey) int {
1104+
return cmp.Compare(a.PartitionKey, b.PartitionKey)
1105+
})
1106+
suite.Equal([]cspann.ChildKey{partitionKey1, partitionKey2, partitionKey3}, childKeys)
1107+
}
1108+
1109+
suite.Run("default tree", func() {
1110+
doTest(0)
1111+
})
1112+
1113+
if store.AllowMultipleTrees() {
1114+
// Ensure that vectors are independent across trees.
1115+
suite.Run("different tree", func() {
1116+
doTest(1)
1117+
})
1118+
}
1119+
}
1120+
9901121
func (suite *StoreTestSuite) TestTryClearPartition() {
9911122
store := suite.makeStore(suite.quantizer)
9921123
defer store.Close(suite.T())

pkg/sql/vecindex/cspann/memstore/memstore.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,76 @@ func (s *Store) TryRemoveFromPartition(
514514
return removed, err
515515
}
516516

517+
// TryMoveVector implements the Store interface.
518+
func (s *Store) TryMoveVector(
519+
ctx context.Context,
520+
treeKey cspann.TreeKey,
521+
sourcePartitionKey, targetPartitionKey cspann.PartitionKey,
522+
vec vector.T,
523+
childKey cspann.ChildKey,
524+
valueBytes cspann.ValueBytes,
525+
expected cspann.PartitionMetadata,
526+
) (moved bool, err error) {
527+
if sourcePartitionKey == targetPartitionKey {
528+
// No-op move.
529+
return false, nil
530+
}
531+
532+
// Always lock the partition with the lower key first, in order to avoid
533+
// deadlocks.
534+
partitionKey1 := sourcePartitionKey
535+
partitionKey2 := targetPartitionKey
536+
if partitionKey2 < partitionKey1 {
537+
partitionKey1, partitionKey2 = partitionKey2, partitionKey1
538+
}
539+
sourceMemPart := s.lockPartition(treeKey, partitionKey1, uniqueOwner, true /* isExclusive */)
540+
if sourceMemPart == nil {
541+
// Partition does not exist.
542+
return false, nil
543+
}
544+
defer sourceMemPart.lock.Release()
545+
546+
targetMemPart := s.lockPartition(treeKey, partitionKey2, uniqueOwner, true /* isExclusive */)
547+
if targetMemPart == nil {
548+
// Partition does not exist.
549+
return false, nil
550+
}
551+
defer targetMemPart.lock.Release()
552+
553+
if targetPartitionKey < sourcePartitionKey {
554+
sourceMemPart, targetMemPart = targetMemPart, sourceMemPart
555+
}
556+
557+
// Check precondition against target partition.
558+
targetPartition := targetMemPart.lock.partition
559+
existing := targetPartition.Metadata()
560+
if !existing.Equal(&expected) {
561+
return false, cspann.NewConditionFailedError(*existing)
562+
}
563+
564+
// Ensure vector is in the source partition, but don't remove it yet.
565+
sourcePartition := sourceMemPart.lock.partition
566+
if sourcePartition.Find(childKey) == -1 {
567+
return false, nil
568+
}
569+
570+
// Add vector to the target partition.
571+
// TODO(andyk): Figure out how to give Store flexible scratch space.
572+
var w workspace.T
573+
added := targetPartition.Add(&w, vec, childKey, valueBytes, false /* overwrite */)
574+
if !added {
575+
// Vector already exists in the target partition.
576+
return false, nil
577+
}
578+
targetMemPart.count.Store(int64(targetPartition.Count()))
579+
580+
// Remove vector from the source partition.
581+
_ = sourcePartition.ReplaceWithLastByKey(childKey)
582+
sourceMemPart.count.Store(int64(sourcePartition.Count()))
583+
584+
return true, nil
585+
}
586+
517587
// TryClearPartition implements the Store interface.
518588
func (s *Store) TryClearPartition(
519589
ctx context.Context,

pkg/sql/vecindex/cspann/memstore/memstore_lock.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,63 +40,63 @@ type memLock struct {
4040

4141
// IsAcquiredBy returns true if the lock is exclusively owned by the given
4242
// owner, or false if not.
43-
func (pl *memLock) IsAcquiredBy(owner uint64) bool {
44-
return owner != uniqueOwner && pl.exclusiveOwner.Load() == owner
43+
func (l *memLock) IsAcquiredBy(owner uint64) bool {
44+
return owner != uniqueOwner && l.exclusiveOwner.Load() == owner
4545
}
4646

4747
// Acquire obtains exclusive write access to the resource protected by this
4848
// lock. The same owner can obtain the lock multiple times. The caller must
4949
// ensure that Release is called for each call to Acquire.
50-
func (pl *memLock) Acquire(owner uint64) {
51-
if owner != uniqueOwner && pl.exclusiveOwner.Load() == owner {
50+
func (l *memLock) Acquire(owner uint64) {
51+
if owner != uniqueOwner && l.exclusiveOwner.Load() == owner {
5252
// Exclusive lock has already been acquired by this owner.
53-
pl.mu.reentrancy++
53+
l.mu.reentrancy++
5454
return
5555
}
5656

5757
// Block until exclusive lock is acquired.
58-
pl.mu.Lock()
59-
pl.exclusiveOwner.Store(owner) //nolint:deferunlockcheck
58+
l.mu.Lock()
59+
l.exclusiveOwner.Store(owner) //nolint:deferunlockcheck
6060
}
6161

6262
// AcquireShared obtains shared read access to the resource protected by this
6363
// lock. The same owner can obtain the lock multiple times. The caller must
6464
// ensure that ReleaseShared is called for each call to AcquireShared.
65-
func (pl *memLock) AcquireShared(owner uint64) {
66-
if owner != uniqueOwner && pl.exclusiveOwner.Load() == owner {
65+
func (l *memLock) AcquireShared(owner uint64) {
66+
if owner != uniqueOwner && l.exclusiveOwner.Load() == owner {
6767
// Exclusive lock has already been acquired by this owner.
68-
pl.mu.reentrancy++
68+
l.mu.reentrancy++
6969
return
7070
}
7171

7272
// Block until shared lock is acquired.
73-
pl.mu.RLock()
73+
l.mu.RLock()
7474
}
7575

7676
// Release unlocks exclusive write access to the protected resource obtained by
7777
// a call to Acquire. If the same owner made multiple Acquire calls, the lock
7878
// isn't released until Release is called the same number of times.
79-
func (pl *memLock) Release() {
80-
if pl.mu.reentrancy > 0 {
81-
pl.mu.reentrancy--
79+
func (l *memLock) Release() {
80+
if l.mu.reentrancy > 0 {
81+
l.mu.reentrancy--
8282
return
8383
}
8484

8585
// No remaining reentrancy, so release lock.
86-
pl.exclusiveOwner.Store(uniqueOwner)
87-
pl.mu.Unlock()
86+
l.exclusiveOwner.Store(uniqueOwner)
87+
l.mu.Unlock()
8888
}
8989

9090
// ReleaseShared unlocks shared read access to the protected resource obtained
9191
// by a call to AcquireShared. If the same owner made multiple AcquireShared
9292
// calls, the lock isn't released until ReleaseShared is called the same number
9393
// of times.
94-
func (pl *memLock) ReleaseShared() {
95-
if pl.mu.reentrancy > 0 {
96-
pl.mu.reentrancy--
94+
func (l *memLock) ReleaseShared() {
95+
if l.mu.reentrancy > 0 {
96+
l.mu.reentrancy--
9797
return
9898
}
9999

100100
// No remaining reentrancy, so release lock.
101-
pl.mu.RUnlock()
101+
l.mu.RUnlock()
102102
}

pkg/sql/vecindex/cspann/memstore/memstore_txn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func (tx *memTxn) GetFullVectors(
225225
func(vectorWithKey *cspann.VectorWithKey) {
226226
// Lock the partition to read its data.
227227
memPart := tx.store.lockPartition(
228-
treeKey, vectorWithKey.Key.PartitionKey, uniqueOwner, false /* isExclusive */)
228+
treeKey, vectorWithKey.Key.PartitionKey, tx.id, false /* isExclusive */)
229229
if memPart != nil {
230230
defer memPart.lock.ReleaseShared()
231231
vectorWithKey.Vector = memPart.lock.partition.Centroid()

pkg/sql/vecindex/cspann/store.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,28 @@ type Store interface {
158158
expected PartitionMetadata,
159159
) (removed bool, err error)
160160

161+
// TryMoveVector attempts to move a single vector from the source partition to
162+
// the target partition, as an atomic operation. If either partition does not
163+
// exist, it returns moved=false. If the vector does not exist in the source
164+
// partition, or already exists in the target partition, it returns
165+
// moved=false.
166+
//
167+
// Before performing any action, TryMoveVector checks the target partition's
168+
// metadata and returns a ConditionFailedError if it is not the same as the
169+
// expected metadata.
170+
//
171+
// NOTE: The source and target partitions must be on the same level. Results
172+
// are not defined if this is not true.
173+
TryMoveVector(
174+
ctx context.Context,
175+
treeKey TreeKey,
176+
sourcePartitionKey, targetPartitionKey PartitionKey,
177+
vec vector.T,
178+
childKey ChildKey,
179+
valueBytes ValueBytes,
180+
expected PartitionMetadata,
181+
) (moved bool, err error)
182+
161183
// TryClearPartition removes all vectors in the specified partition and
162184
// returns the number of vectors that were cleared. It returns
163185
// ErrPartitionNotFound if the partition does not exist.

0 commit comments

Comments
 (0)