Skip to content

Commit 37c46db

Browse files
committed
cspann: fix race condition in vector delete code.
The deletion code had a race in which it would sometimes attempt to delete a vector from a partition that disallowed deletes. The fix is to convert the Delete and SearchForDelete code paths to use the same retry pattern that Insert and SearchForInsert use. The searchForInsertHelper method has been generalized into a searchForUpdateHelper method that handles both the insert and delete cases.

Epic: CRDB-42943

Release note: None
1 parent 30fbeb8 commit 37c46db

File tree

2 files changed

+141
-112
lines changed

2 files changed

+141
-112
lines changed

pkg/sql/vecindex/cspann/index.go

Lines changed: 134 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,17 @@ func (vi *Index) Insert(
345345

346346
// Insert the vector into the secondary index.
347347
childKey := ChildKey{KeyBytes: key}
348-
return vi.insertHelper(ctx, idxCtx, childKey, ValueBytes{})
348+
valueBytes := ValueBytes{}
349+
350+
// When a candidate insert partition is found, add the vector to it.
351+
addFunc := func(ctx context.Context, idxCtx *Context, result *SearchResult) error {
352+
partitionKey := result.ChildKey.PartitionKey
353+
return vi.addToPartition(ctx, idxCtx.txn, idxCtx.treeKey,
354+
partitionKey, idxCtx.level-1, idxCtx.randomized, childKey, valueBytes)
355+
}
356+
357+
_, err := vi.searchForUpdateHelper(ctx, idxCtx, addFunc, nil /* deleteKey */)
358+
return err
349359
}
350360

351361
// Delete attempts to remove a vector from the index, given its value and
@@ -360,17 +370,21 @@ func (vi *Index) Insert(
360370
func (vi *Index) Delete(
361371
ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T, key KeyBytes,
362372
) error {
363-
result, err := vi.SearchForDelete(ctx, idxCtx, treeKey, vec, key)
364-
if err != nil {
373+
// Potentially throttle operation if background work is falling behind.
374+
if err := vi.fixups.DelayInsertOrDelete(ctx); err != nil {
365375
return err
366376
}
367-
if result == nil {
368-
return nil
377+
378+
vi.setupDeleteContext(idxCtx, treeKey, vec)
379+
380+
// When a candidate delete partition is found, remove the vector from it.
381+
removeFunc := func(ctx context.Context, idxCtx *Context, result *SearchResult) error {
382+
return vi.removeFromPartition(ctx, idxCtx.txn, idxCtx.treeKey,
383+
result.ParentPartitionKey, idxCtx.level, result.ChildKey)
369384
}
370385

371-
// Remove the vector from its partition in the store.
372-
return vi.removeFromPartition(
373-
ctx, idxCtx.txn, treeKey, result.ParentPartitionKey, LeafLevel, result.ChildKey)
386+
_, err := vi.searchForUpdateHelper(ctx, idxCtx, removeFunc, key)
387+
return err
374388
}
375389

376390
// Search finds vectors in the index that are closest to the given query vector
@@ -417,15 +431,13 @@ func (vi *Index) SearchForInsert(
417431
return nil
418432
}
419433

420-
return vi.searchForInsertHelper(ctx, idxCtx, getFunc)
434+
return vi.searchForUpdateHelper(ctx, idxCtx, getFunc, nil /* deleteKey */)
421435
}
422436

423437
// SearchForDelete finds the leaf partition containing the vector to be deleted.
424438
// It returns a single search result containing the key of that partition, or
425439
// nil if the vector cannot be found. This is useful for callers that directly
426440
// delete KV rows rather than using this library to do it.
427-
// TODO(andyk): This needs to use the searcher, and needs to acquire a lock on
428-
// the partition.
429441
func (vi *Index) SearchForDelete(
430442
ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T, key KeyBytes,
431443
) (*SearchResult, error) {
@@ -434,35 +446,16 @@ func (vi *Index) SearchForDelete(
434446
return nil, err
435447
}
436448

437-
// Don't rerank results, since we just need a key match.
438-
vi.setupContext(idxCtx, treeKey, vec, SearchOptions{
439-
SkipRerank: true,
440-
UpdateStats: true,
441-
}, LeafLevel)
442-
idxCtx.forDelete = true
443-
444-
// Search with the base beam size. If that fails to find the vector, try again
445-
// with a larger beam size, in order to minimize the chance of dangling
446-
// vector references in the index.
447-
baseBeamSize := max(vi.options.BaseBeamSize, 1)
448-
for range 2 {
449-
idxCtx.tempSearchSet = SearchSet{MaxResults: 1, MatchKey: key}
450-
idxCtx.options.BaseBeamSize = baseBeamSize
449+
vi.setupDeleteContext(idxCtx, treeKey, vec)
451450

452-
err := vi.searchHelper(ctx, idxCtx, &idxCtx.tempSearchSet)
453-
if err != nil {
454-
return nil, err
455-
}
456-
results := idxCtx.tempSearchSet.PopResults()
457-
if len(results) == 0 {
458-
// Retry search with significantly higher beam size.
459-
baseBeamSize *= 8
460-
} else {
461-
return &results[0], nil
462-
}
451+
// When a candidate delete partition is found, lock its metadata for update.
452+
removeFunc := func(ctx context.Context, idxCtx *Context, result *SearchResult) error {
453+
partitionKey := result.ParentPartitionKey
454+
_, err := idxCtx.txn.GetPartitionMetadata(ctx, treeKey, partitionKey, true /* forUpdate */)
455+
return err
463456
}
464457

465-
return nil, nil
458+
return vi.searchForUpdateHelper(ctx, idxCtx, removeFunc, key)
466459
}
467460

468461
// SuspendFixups suspends background fixup processing until ProcessFixups is
@@ -519,6 +512,19 @@ func (vi *Index) setupInsertContext(idxCtx *Context, treeKey TreeKey, vec vector
519512
idxCtx.forInsert = true
520513
}
521514

515+
// setupDeleteContext sets up the given context for a delete operation.
516+
func (vi *Index) setupDeleteContext(idxCtx *Context, treeKey TreeKey, vec vector.T) {
517+
// Perform the search using quantized vectors rather than full vectors (i.e.
518+
// skip reranking). Use a larger beam size to make it more likely that we'll
519+
// find the vector to delete.
520+
vi.setupContext(idxCtx, treeKey, vec, SearchOptions{
521+
BaseBeamSize: vi.options.BaseBeamSize * 2,
522+
SkipRerank: true,
523+
UpdateStats: true,
524+
}, LeafLevel)
525+
idxCtx.forDelete = true
526+
}
527+
522528
// setupContext sets up the given context as an operation is beginning.
523529
func (vi *Index) setupContext(
524530
idxCtx *Context, treeKey TreeKey, vec vector.T, options SearchOptions, level Level,
@@ -527,6 +533,7 @@ func (vi *Index) setupContext(
527533
idxCtx.level = level
528534
idxCtx.original = vec
529535
idxCtx.forInsert = false
536+
idxCtx.forDelete = false
530537
idxCtx.options = options
531538
if idxCtx.options.BaseBeamSize == 0 {
532539
idxCtx.options.BaseBeamSize = vi.options.BaseBeamSize
@@ -537,101 +544,110 @@ func (vi *Index) setupContext(
537544
idxCtx.randomized = vi.RandomizeVector(vec, idxCtx.randomized)
538545
}
539546

540-
// insertHelper looks for the best partition in which to add the vector and then
541-
// adds the vector to that partition. This is an internal helper method that can
542-
// be used by callers once they have set up a search context.
543-
func (vi *Index) insertHelper(
544-
ctx context.Context, idxCtx *Context, childKey ChildKey, valueBytes ValueBytes,
545-
) error {
546-
// When a candidate insert partition is found, add the vector to it.
547-
addFunc := func(ctx context.Context, idxCtx *Context, result *SearchResult) error {
548-
partitionKey := result.ChildKey.PartitionKey
549-
return vi.addToPartition(ctx, idxCtx.txn, idxCtx.treeKey,
550-
partitionKey, idxCtx.level-1, idxCtx.randomized, childKey, valueBytes)
551-
}
552-
553-
_, err := vi.searchForInsertHelper(ctx, idxCtx, addFunc)
554-
return err
555-
}
556-
557-
// insertFunc is called by searchForInsertHelper when it has a candidate insert
558-
// partition, i.e. a partition that may allow a vector to be added to it. The
559-
// Insert operation will attempt to directly add the vector, while the
560-
// SearchForInsert operation will only lock the partition for later update. If
561-
// the partition is in a state that does not allow adds or removes, then the
562-
// function will return ConditionFailedError with the latest metadata.
563-
type insertFunc func(ctx context.Context, idxCtx *Context, result *SearchResult) error
564-
565-
// searchForInsertHelper searches for the best partition in which to add a
566-
// vector and calls the insert function. If that returns ConditionFailedError,
567-
// then the partition does not allow adds or removes, so searchForInsertHelper
568-
// tries again with another partition, and so on. Eventually, the search will
569-
// succeed, and searchForInsertHelper returns a search result containing the
570-
// insert partition (never nil).
547+
// updateFunc is called by searchForUpdateHelper when it has a candidate
548+
// partition to update, i.e. a partition that may allow a vector to be added to
549+
// or removed from it. Different operations will take different actions; for
550+
// example, the Insert operation will attempt to directly add the vector to the
551+
// partition, while the SearchForInsert operation will only lock the partition
552+
// for later update. If the partition is in a state that does not allow adds or
553+
// removes, then the function should return ConditionFailedError with the latest
554+
// metadata.
555+
type updateFunc func(ctx context.Context, idxCtx *Context, result *SearchResult) error
556+
557+
// searchForUpdateHelper searches for the best partition in which to add or from
558+
// which to remove a vector, and then calls the update function. If that returns
559+
// ConditionFailedError, then the partition does not allow adds or removes, so
560+
// searchForUpdateHelper tries again with another partition, and so on.
561+
// Eventually, the search will succeed, and searchForUpdateHelper returns a
562+
// search result containing the partition to update (or nil if no partition
563+
// can be found in the Delete case).
571564
//
572-
// There are two cases to consider, depending on whether we're inserting into a
573-
// root partition or a non-root partition:
565+
// There are two cases to consider, depending on whether we're updating a root
566+
// partition or a non-root partition:
574567
//
575-
// 1. Root partition: If the root partition does not allow inserts, then we
576-
// need to instead insert into one of its target partitions. However, there
577-
// are race conditions where those can be splitting in turn, in which case
578-
// we need to retry the search.
579-
// 2. Non-root partition: If the top search result does not allow inserts, then
568+
// 1. Root partition: If the root partition does not allow updates, then we
569+
// need to instead update one of its target partitions. However, there are
570+
// race conditions where those can be splitting in turn, in which case we
571+
// need to retry the search.
572+
// 2. Non-root partition: If the top search result does not allow updates, then
580573
// we try the next best result, and so on.
581574
//
582575
// Note that it's possible for retry to switch back and forth between #1 and #2,
583576
// if we're racing with levels being added to or removed from the tree.
584-
func (vi *Index) searchForInsertHelper(
585-
ctx context.Context, idxCtx *Context, fn insertFunc,
577+
func (vi *Index) searchForUpdateHelper(
578+
ctx context.Context, idxCtx *Context, fn updateFunc, deleteKey KeyBytes,
586579
) (*SearchResult, error) {
587-
const maxAttempts = 20
588-
idxCtx.tempSearchSet = SearchSet{MaxResults: vi.options.QualitySamples}
580+
const maxInsertAttempts = 16
581+
const maxDeleteAttempts = 3
582+
var maxAttempts int
583+
584+
idxCtx.tempSearchSet.Clear()
585+
idxCtx.tempSearchSet.MaxExtraResults = 0
586+
if idxCtx.forInsert {
587+
// Insert case, so get extra candidate partitions in case initial
588+
// candidates don't allow inserts.
589+
idxCtx.tempSearchSet.MaxResults = vi.options.QualitySamples
590+
idxCtx.tempSearchSet.MatchKey = nil
591+
maxAttempts = maxInsertAttempts
592+
} else {
593+
// Delete case, so just get 1 result per batch that matches the key.
594+
// Fetch another batch if first batch doesn't find the vector.
595+
idxCtx.tempSearchSet.MaxResults = 1
596+
idxCtx.tempSearchSet.MatchKey = deleteKey
597+
maxAttempts = maxDeleteAttempts
598+
}
589599
idxCtx.search.Init(vi, idxCtx, &idxCtx.tempSearchSet)
590600
var result *SearchResult
591601
var lastError error
592602

593-
// Loop until we find an insert partition or we've exhausted attempts.
603+
// Loop until we find a partition to update or we've exhausted attempts.
604+
// Each "next batch" operation and each updateFunc callback count as an
605+
// "attempt", since each is separately expensive to do.
594606
attempts := 0
595607
for attempts < maxAttempts {
608+
// Get next partition to check.
609+
result = idxCtx.tempSearchSet.PopBestResult()
596610
if result == nil {
597-
// Get next partition to check.
598-
result = idxCtx.tempSearchSet.PopBestResult()
599-
if result == nil {
600-
// Get next batch of results from the searcher.
601-
ok, err := idxCtx.search.Next(ctx)
602-
if err != nil {
603-
return nil, errors.Wrapf(err, "searching for insert partition")
604-
}
605-
if !ok {
606-
break
607-
}
608-
continue
611+
// Get next batch of results from the searcher.
612+
attempts++
613+
ok, err := idxCtx.search.Next(ctx)
614+
if err != nil {
615+
log.Infof(ctx, "error during update: %v\n", err)
616+
return nil, errors.Wrapf(err, "searching for partition to update")
617+
}
618+
if !ok {
619+
log.Infof(ctx, "could not find result: %v\n", result)
620+
break
609621
}
622+
continue
610623
}
611624

612625
// Check first result.
613626
attempts++
614627
err := fn(ctx, idxCtx, result)
615628
if err == nil {
616-
// This partition supports inserts, so done.
629+
// This partition supports updates, so done.
617630
break
618631
}
619632
lastError = errors.Wrapf(err, "checking result: %+v", result)
620633

621634
var errConditionFailed *ConditionFailedError
622635
if errors.Is(err, ErrRestartOperation) {
623636
// Redo search operation.
624-
log.VEventf(ctx, 2, "restarting search for insert operation: %v", err)
625-
return vi.searchForInsertHelper(ctx, idxCtx, fn)
637+
log.VEventf(ctx, 2, "restarting search for update operation: %v", err)
638+
return vi.searchForUpdateHelper(ctx, idxCtx, fn, deleteKey)
626639
} else if errors.As(err, &errConditionFailed) {
627-
// This partition does not allow adds or removes, so fallback to a
628-
// target partition.
629-
partitionKey := result.ChildKey.PartitionKey
630-
metadata := errConditionFailed.Actual
631-
err = vi.fallbackOnTargets(ctx, idxCtx, &idxCtx.tempSearchSet,
632-
partitionKey, idxCtx.randomized, metadata.StateDetails)
633-
if err != nil {
634-
return nil, err
640+
// This partition does not allow updates, so fallback to a target
641+
// partition if this is an insert operation. This is not necessary in
642+
// the delete case; it's OK if we end up leaving a dangling vector.
643+
if idxCtx.forInsert {
644+
partitionKey := result.ChildKey.PartitionKey
645+
metadata := errConditionFailed.Actual
646+
err = vi.fallbackOnTargets(ctx, idxCtx, &idxCtx.tempSearchSet,
647+
partitionKey, idxCtx.randomized, metadata.StateDetails)
648+
if err != nil {
649+
return nil, err
650+
}
635651
}
636652
} else if errors.Is(err, ErrPartitionNotFound) {
637653
// This partition does not exist, so try next partition. This can happen
@@ -641,23 +657,30 @@ func (vi *Index) searchForInsertHelper(
641657
} else {
642658
return nil, lastError
643659
}
660+
}
644661

645-
// Fetch next result.
646-
result = nil
662+
if idxCtx.forDelete {
663+
// Don't perform the inconsistent scan for the delete case, since we've
664+
// already scanned the partition in order to find the vector to delete and
665+
// enqueued any needed split/merge fixup.
666+
return result, nil
647667
}
648668

649669
if result == nil {
650-
return nil, errors.Wrapf(lastError,
670+
// Inserts are expected to find a partition.
671+
err := errors.Errorf(
651672
"search failed to find a partition (level=%d) that allows inserts", idxCtx.level)
673+
return nil, errors.CombineErrors(err, lastError)
652674
}
653675

654676
// Do an inconsistent scan of the partition to see if it might be ready to
655-
// split. This is necessary because the search does not scan leaf partitions.
656-
// Unless we do this, we may never realize the partition is oversized.
677+
// split or merge. This is necessary because the search does not scan leaf
678+
// partitions. Unless we do this, we may never realize the partition is
679+
// oversized or undersized.
657680
// NOTE: The scan is not performed in the scope of the current transaction,
658681
// so it will not reflect the effects of this operation. That's OK, since it's
659-
// not necessary to split at exactly the point where the partition becomes
660-
// oversized.
682+
// not necessary to split or merge at exactly the point where the partition
683+
// becomes oversized or undersized.
661684
partitionKey := result.ChildKey.PartitionKey
662685
count, err := vi.store.EstimatePartitionCount(ctx, idxCtx.treeKey, partitionKey)
663686
if err != nil {
@@ -716,7 +739,7 @@ func (vi *Index) fallbackOnTargets(
716739

717740
// TODO(andyk): Add support for DrainingForMergeState.
718741
return errors.AssertionFailedf(
719-
"expected state that disallows adds/removes, not %s", state.String())
742+
"expected state that disallows updates, not %s", state.String())
720743
}
721744

722745
// addToPartition calls the store to add the given vector to an existing

pkg/sql/vecindex/cspann/testdata/search-for-delete.ddt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# ----------------------------------------------------------------------
22
# Search tree with multiple partitions and duplicate data.
33
# ----------------------------------------------------------------------
4-
new-index dims=2 min-partition-size=1 max-partition-size=4 beam-size=2
4+
new-index dims=2 min-partition-size=1 max-partition-size=4 beam-size=1
55
vec1: (1, 2)
66
vec2: (7, 4)
77
vec3: (4, 3)
@@ -68,6 +68,12 @@ vec100: (1, 2)
6868
----
6969
vec100: vector not found
7070

71+
# Search for vector with wrong value, which forces internal retry.
72+
search-for-delete
73+
vec1:(0, 9)
74+
----
75+
vec1: partition 7
76+
7177
# Search for duplicate vector.
7278
search-for-delete
7379
vec12

0 commit comments

Comments
 (0)