Skip to content

Commit 30fbeb8

Browse files
committed
cspann: update TestIndexConcurrency test to delete vectors
This commit modifies the TestIndexConcurrency test to delete one vector in each inserted batch. The test interleaves concurrent inserts, deletes, searches, and splits on multiple goroutines in order to find concurrency bugs. The modification found multiple issues that are fixed in subsequent commits. Epic: CRDB-42943 Release note: None
1 parent 90b56ac commit 30fbeb8

File tree

2 files changed

+32
-44
lines changed

2 files changed

+32
-44
lines changed

pkg/sql/vecindex/cspann/index_test.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ func TestIndexConcurrency(t *testing.T) {
946946
defer stopper.Stop(ctx)
947947

948948
// Load features.
949-
features := testutils.LoadFeatures(t, 1000)
949+
features := testutils.LoadFeatures(t, 2000)
950950

951951
// Trim feature dimensions from 512 to 4, in order to make the test run
952952
// faster and hit more interesting concurrency combinations.
@@ -962,6 +962,8 @@ func TestIndexConcurrency(t *testing.T) {
962962
for i := range 10 {
963963
log.Infof(ctx, "iteration %d", i)
964964

965+
// Set small partition size and beam size to trigger frequent splits and
966+
// merges.
965967
options := cspann.IndexOptions{
966968
MinPartitionSize: 2,
967969
MaxPartitionSize: 8,
@@ -974,10 +976,10 @@ func TestIndexConcurrency(t *testing.T) {
974976
index, err := cspann.NewIndex(ctx, store, quantizer, seed, &options, stopper)
975977
require.NoError(t, err)
976978

977-
buildIndex(ctx, t, store, index, vectors, primaryKeys)
979+
expected := buildIndex(ctx, t, store, index, vectors, primaryKeys)
978980

979981
vectorCount := validateIndex(ctx, t, store)
980-
require.Equal(t, vectors.Count, vectorCount)
982+
require.Equal(t, expected, vectorCount)
981983

982984
index.Close()
983985
}
@@ -990,11 +992,11 @@ func buildIndex(
990992
index *cspann.Index,
991993
vectors vector.Set,
992994
primaryKeys []cspann.KeyBytes,
993-
) {
994-
var insertCount atomic.Uint64
995+
) int {
996+
var insertCount, deleteCount atomic.Uint64
995997

996998
// Insert block of vectors within the scope of a transaction.
997-
insertBlock := func(idxCtx *cspann.Context, start, end int) {
999+
insertVectors := func(idxCtx *cspann.Context, start, end int) {
9981000
for i := start; i < end; i++ {
9991001
commontest.RunTransaction(ctx, t, store, func(txn cspann.Txn) {
10001002
idxCtx.Init(txn)
@@ -1006,7 +1008,17 @@ func buildIndex(
10061008
}
10071009
}
10081010

1009-
// Insert vectors into the store on multiple goroutines.
1011+
deleteVector := func(idxCtx *cspann.Context, vec vector.T, key cspann.KeyBytes) {
1012+
commontest.RunTransaction(ctx, t, store, func(txn cspann.Txn) {
1013+
idxCtx.Init(txn)
1014+
require.NoError(t, index.Delete(ctx, idxCtx, nil /* treeKey */, vec, key))
1015+
store.DeleteVector(key)
1016+
})
1017+
deleteCount.Add(1)
1018+
}
1019+
1020+
// Insert vectors into the store on multiple goroutines. Delete the first
1021+
// vector in each block.
10101022
var wait sync.WaitGroup
10111023
procs := runtime.GOMAXPROCS(-1)
10121024
countPerProc := (vectors.Count + procs) / procs
@@ -1021,18 +1033,24 @@ func buildIndex(
10211033
// block of vectors. Run any pending fixups after each block.
10221034
var idxCtx cspann.Context
10231035
for j := start; j < end; j += blockSize {
1024-
insertBlock(&idxCtx, j, min(j+blockSize, end))
1036+
insertVectors(&idxCtx, j, min(j+blockSize, end))
1037+
deleteVector(&idxCtx, vectors.At(j), primaryKeys[j])
10251038
index.ProcessFixups()
10261039
}
10271040
}(i, end)
10281041
}
10291042

1043+
logProgress := func() {
1044+
log.Infof(ctx, "%d vectors inserted, %d vectors deleted",
1045+
insertCount.Load(), deleteCount.Load())
1046+
}
1047+
10301048
info := log.Every(time.Second)
10311049
for int(insertCount.Load()) < vectors.Count {
10321050
time.Sleep(10 * time.Millisecond)
10331051

10341052
if info.ShouldLog() {
1035-
log.Infof(ctx, "%d vectors inserted", insertCount.Load())
1053+
logProgress()
10361054
}
10371055

10381056
// Fail on foreground goroutine if any background goroutines failed.
@@ -1046,7 +1064,9 @@ func buildIndex(
10461064
// Process any remaining fixups.
10471065
index.ProcessFixups()
10481066

1049-
log.Infof(ctx, "%d vectors inserted", vectors.Count)
1067+
logProgress()
1068+
1069+
return int(insertCount.Load() - deleteCount.Load())
10501070
}
10511071

10521072
func validateIndex(ctx context.Context, t *testing.T, store *memstore.Store) int {
@@ -1083,15 +1103,13 @@ func validateIndex(ctx context.Context, t *testing.T, store *memstore.Store) int
10831103
refs := make([]cspann.VectorWithKey, len(childKeys))
10841104
for i := range childKeys {
10851105
refs[i].Key = childKeys[i]
1086-
deDup.TryAdd(childKeys[i])
10871106
}
10881107
err := txn.GetFullVectors(ctx, treeKey, refs)
10891108
require.NoError(t, err)
10901109
for i := range refs {
1091-
if refs[i].Vector == nil {
1092-
panic("vector is nil")
1110+
if refs[i].Vector != nil {
1111+
deDup.TryAdd(refs[i].Key)
10931112
}
1094-
require.NotNil(t, refs[i].Vector)
10951113
}
10961114
})
10971115

pkg/sql/vecindex/cspann/testdata/test.ddt

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)