Skip to content

Commit c8605bb

Browse files
committed
cspann: fix race conditions in index split operation
This commit fixes two race conditions in the index split operation: 1. After setting the state of the left sub-partition to Ready, say that the split unexpectedly fails. Now say that the left sub- partition itself splits and is deleted. When the original split resumes, it will not be able to get the centroid for the left sub-partition, which is needed to run the K-means clustering algorithm. 2. As described by #1, it's possible that a splitting partition references target sub-partitions that are now missing from the index. This will trigger PartitionNotFound errors in insert code paths. The fixes are: 1. Update the logic so that vectors are first copied to the left and right sub-partitions before either sub-partition's state is updated from Updating to Ready. Only Ready sub-partitions can be split, so this should prevent race condition #1. 2. Update the insert logic so that searches of non-root partitions return multiple results, to make it extremely likely that a suitable insert partition will be found. For root partitions, check split target sub-partitions instead, since the splitting partition does not share a parent with its sub-partitions. Epic: CRDB-42943 Release note: None
1 parent 2733799 commit c8605bb

File tree

4 files changed

+323
-145
lines changed

4 files changed

+323
-145
lines changed

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ import (
3939
// but instead are "forwarded" to the closest target sub-partition.
4040
// 7. Reload the splitting partition's vectors and copy the "left" subset to
4141
// the left sub-partition.
42-
// 8. Update the left sub-partition's state from Updating to Ready.
43-
// 9. Copy the "right" subset of vectors to the right sub-partition.
44-
// 10. Update the right sub-partition's state from Updating to Ready. At this
42+
// 8. Copy the "right" subset of vectors to the right sub-partition. At this
4543
// point, the splitting vectors are duplicated in the index. Any searches
4644
// will filter out duplicates.
45+
// 9. Update the left sub-partition's state from Updating to Ready.
46+
// 10. Update the right sub-partition's state from Updating to Ready.
4747
// 11. Remove the splitting partition from its parent. The duplicates are no
4848
// longer visible to searches.
4949
// 12. Delete the splitting partition from the index.
@@ -227,10 +227,30 @@ func (fw *fixupWorker) splitPartition(
227227
return err
228228
}
229229

230-
// Add vectors to nearest partition.
231-
err = fw.copyToSplitSubPartitions(ctx, partition, vectors, leftMetadata, rightMetadata)
232-
if err != nil {
233-
return err
230+
// If still updating the sub-partitions, then distribute vectors among them.
231+
if leftMetadata.StateDetails.State == UpdatingState {
232+
err = fw.copyToSplitSubPartitions(ctx, partition, vectors, leftMetadata, rightMetadata)
233+
if err != nil {
234+
return err
235+
}
236+
}
237+
238+
// Update sub-partition states from Updating to Ready.
239+
if leftMetadata.StateDetails.State == UpdatingState {
240+
expected := leftMetadata
241+
leftMetadata.StateDetails = MakeReadyDetails()
242+
err = fw.updateMetadata(ctx, leftPartitionKey, leftMetadata, expected)
243+
if err != nil {
244+
return err
245+
}
246+
}
247+
if rightMetadata.StateDetails.State == UpdatingState {
248+
expected := rightMetadata
249+
rightMetadata.StateDetails = MakeReadyDetails()
250+
err = fw.updateMetadata(ctx, rightPartitionKey, rightMetadata, expected)
251+
if err != nil {
252+
return err
253+
}
234254
}
235255

236256
// Check whether the splitting partition is the root.
@@ -615,8 +635,7 @@ func (fw *fixupWorker) deletePartition(
615635
}
616636

617637
// copyToSplitSubPartitions copies the given set of vectors to left and right
618-
// sub-partitions, based on which centroid they're closer to. It also updates
619-
// the state of each sub-partition from Updating to Ready.
638+
// sub-partitions, based on which centroid they're closer to.
620639
func (fw *fixupWorker) copyToSplitSubPartitions(
621640
ctx context.Context,
622641
sourcePartition *Partition,
@@ -659,57 +678,25 @@ func (fw *fixupWorker) copyToSplitSubPartitions(
659678
// transactional; if an error occurs, any vectors already added may not be
660679
// rolled back. This is OK, since the vectors are still present in the
661680
// source partition.
662-
if leftMetadata.StateDetails.State == UpdatingState {
663-
leftPartitionKey := sourceState.Target1
664-
err = fw.copyVectorsToSubPartition(ctx,
665-
leftPartitionKey, leftMetadata, leftVectors, leftChildKeys, leftValueBytes)
666-
if err != nil {
667-
return err
668-
}
669-
}
670-
if rightMetadata.StateDetails.State == UpdatingState {
671-
if sourcePartition.Level() != LeafLevel && vectors.Count == 1 {
672-
// This should have been a merge, not a split, but we're too far into the
673-
// split operation to back out now, so avoid an empty non-root partition by
674-
// duplicating the last remaining vector in both partitions.
675-
rightVectors = leftVectors
676-
rightChildKeys = leftChildKeys
677-
rightValueBytes = leftValueBytes
678-
}
679-
680-
rightPartitionKey := sourceState.Target2
681-
err = fw.copyVectorsToSubPartition(ctx,
682-
rightPartitionKey, rightMetadata, rightVectors, rightChildKeys, rightValueBytes)
683-
if err != nil {
684-
return err
685-
}
686-
}
687-
688-
return nil
689-
}
690-
691-
// copyVectorsToSubPartition copies the given set of vectors, along with
692-
// associated keys and values, to a split sub-partition with the given key. The
693-
// vectors will only be added if the partition's metadata matches the expected
694-
// value. It also updates the state of the partition from Updating to Ready.
695-
func (fw *fixupWorker) copyVectorsToSubPartition(
696-
ctx context.Context,
697-
partitionKey PartitionKey,
698-
metadata PartitionMetadata,
699-
vectors vector.Set,
700-
childKeys []ChildKey,
701-
valueBytes []ValueBytes,
702-
) error {
703-
// Add vectors to sub-partition, as long as metadata matches.
704-
err := fw.addToPartition(ctx, partitionKey, vectors, childKeys, valueBytes, metadata)
681+
leftPartitionKey := sourceState.Target1
682+
err = fw.addToPartition(ctx,
683+
leftPartitionKey, leftVectors, leftChildKeys, leftValueBytes, leftMetadata)
705684
if err != nil {
706685
return err
707686
}
708687

709-
// Update partition state from Updating to Ready.
710-
expected := metadata
711-
metadata.StateDetails = MakeReadyDetails()
712-
err = fw.updateMetadata(ctx, partitionKey, metadata, expected)
688+
if sourcePartition.Level() != LeafLevel && vectors.Count == 1 {
689+
// This should have been a merge, not a split, but we're too far into the
690+
// split operation to back out now, so avoid an empty non-root partition by
691+
// duplicating the last remaining vector in both partitions.
692+
rightVectors = leftVectors
693+
rightChildKeys = leftChildKeys
694+
rightValueBytes = leftValueBytes
695+
}
696+
697+
rightPartitionKey := sourceState.Target2
698+
err = fw.addToPartition(ctx,
699+
rightPartitionKey, rightVectors, rightChildKeys, rightValueBytes, rightMetadata)
713700
if err != nil {
714701
return err
715702
}

pkg/sql/vecindex/cspann/index.go

Lines changed: 103 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -579,46 +579,107 @@ type insertFunc func(ctx context.Context, idxCtx *Context, result *SearchResult)
579579
// tries again with another partition, and so on. Eventually, the search will
580580
// succeed, and searchForInsertHelper returns a search result containing the
581581
// insert partition (never nil).
582+
//
583+
// There are two cases to consider, depending on whether we're inserting into a
584+
// root partition or a non-root partition:
585+
//
586+
// 1. Root partition: If the root partition does not allow inserts, then we
587+
// need to instead insert into one of its target partitions. However, there
588+
// are race conditions where those can be splitting in turn, in which case
589+
// we need to retry the search.
590+
// 2. Non-root partition: If the top search result does not allow inserts, then
591+
// we need to try a different result. We start by fetching 2 results, so
592+
// that there's a backup. If both results do not allow inserts, we widen the
593+
// search to 4 results, then to 8, and so on (up to 32, upon which we return
594+
// an error).
595+
//
596+
// Note that it's possible for retry to switch back and forth between #1 and #2,
597+
// if we're racing with levels being added to or removed from the tree.
582598
func (vi *Index) searchForInsertHelper(
583599
ctx context.Context, idxCtx *Context, fn insertFunc,
584600
) (*SearchResult, error) {
585-
// In most cases, the top result is the best insert partition. However, if
586-
// that partition does not allow inserts, we need to fall back on a target
587-
// paritition.
588-
idxCtx.tempSearchSet = SearchSet{MaxResults: 1}
589-
err := vi.searchHelper(ctx, idxCtx, &idxCtx.tempSearchSet)
590-
if err != nil {
591-
return nil, err
592-
}
593-
results := idxCtx.tempSearchSet.PopResults()
594-
if len(results) != 1 {
595-
return nil, errors.AssertionFailedf(
596-
"SearchForInsert should return exactly one result, got %d", len(results))
597-
}
598-
result := &results[0]
599-
600-
// Loop until we find an insert partition.
601-
var metadata PartitionMetadata
601+
// Loop until we find an insert partition. Fetch two candidate partitions to
602+
// start so that we rarely need to re-search, even in the case where the first
603+
// candidate partition does not allow inserts.
604+
var original, results []SearchResult
605+
var seen map[PartitionKey]struct{}
606+
maxResults := 2
607+
allowRetry := true
602608
for {
603-
err = fn(ctx, idxCtx, result)
609+
// If there are no more results, get more now.
610+
if len(results) == 0 {
611+
if !allowRetry || maxResults > 32 {
612+
// This is an extreme edge case where none of the partitions we've
613+
// checked allow inserts.
614+
// TODO(andyk): Do we need to do something better here?
615+
return nil, errors.Newf(
616+
"search failed to find a partition (level=%d) that allows inserts: %v",
617+
idxCtx.level, original)
618+
}
619+
620+
// In most cases, the top result is the best insert partition. However, if
621+
// that partition does not allow inserts, we need to fall back to another
622+
// partition. Set MaxResults in the same way as searchHelper does.
623+
idxCtx.tempSearchSet = SearchSet{MaxResults: maxResults}
624+
err := vi.searchHelper(ctx, idxCtx, &idxCtx.tempSearchSet)
625+
if err != nil {
626+
return nil, err
627+
}
628+
original = idxCtx.tempSearchSet.PopResults()
629+
if len(original) < 1 {
630+
return nil, errors.AssertionFailedf("SearchForInsert should return at least one result")
631+
}
632+
results = original
633+
allowRetry = false
634+
}
635+
636+
// Check first result.
637+
partitionKey := results[0].ChildKey.PartitionKey
638+
if seen != nil {
639+
// Don't re-check partitions we've already checked.
640+
if _, ok := seen[partitionKey]; ok {
641+
results = results[1:]
642+
continue
643+
}
644+
}
645+
646+
// Allow retry since there's at least one result we haven't yet seen.
647+
allowRetry = true
648+
649+
err := fn(ctx, idxCtx, &results[0])
604650
if err == nil {
605-
// Partition supports inserts.
651+
// This partition supports inserts, so done.
606652
break
607653
}
608654

609655
var errConditionFailed *ConditionFailedError
610656
if errors.Is(err, ErrRestartOperation) {
611-
// Entire operation needs to be restarted.
612-
return vi.searchForInsertHelper(ctx, idxCtx, fn)
657+
// Redo search operation.
658+
results = results[:0]
659+
continue
613660
} else if errors.As(err, &errConditionFailed) {
614-
// This partition does not allow adds or removes, so fallback on a
615-
// target partition.
616-
metadata = errConditionFailed.Actual
617-
partitionKey := results[0].ChildKey.PartitionKey
618-
result, err = vi.fallbackOnTargets(ctx, idxCtx, partitionKey,
619-
idxCtx.randomized, metadata.StateDetails, results)
620-
if err != nil {
621-
return nil, err
661+
// This partition does not allow adds or removes, so fallback to
662+
// another partition.
663+
if partitionKey == RootKey {
664+
// This is the root partition, so fallback to its target partitions.
665+
metadata := errConditionFailed.Actual
666+
results, err = vi.fallbackOnTargets(ctx, idxCtx,
667+
partitionKey, idxCtx.randomized, metadata.StateDetails, results)
668+
if err != nil {
669+
return nil, err
670+
}
671+
} else {
672+
// This is a non-root partition, so check the next partition in the
673+
// result list. If there are no more partitions to check, then expand
674+
// the search to a wider set of partitions by increasing MaxResults.
675+
results = results[1:]
676+
if len(results) == 0 {
677+
if seen == nil {
678+
seen = make(map[PartitionKey]struct{})
679+
}
680+
seen[partitionKey] = struct{}{}
681+
maxResults *= 2
682+
}
622683
}
623684
} else {
624685
return nil, err
@@ -642,24 +703,22 @@ func (vi *Index) searchForInsertHelper(
642703
results[0].ParentPartitionKey, partitionKey, false /* singleStep */)
643704
}
644705

645-
return result, nil
706+
return &results[0], nil
646707
}
647708

648709
// fallbackOnTargets is called when none of the partitions returned by a search
649710
// allow inserting a vector, because they are in a Draining state. Instead, the
650711
// search needs to continue with the target partitions of the split (or merge).
651-
// fallbackOnTargets returns a search result for the target that's closest to
652-
// the query vector.
653-
// NOTE: "tempResults" is reused within this method and a pointer to one of its
654-
// entries is returned as the best result.
712+
// fallbackOnTargets returns an ordered list of search results for the targets.
713+
// NOTE: "tempResults" is overwritten within this method by results.
655714
func (vi *Index) fallbackOnTargets(
656715
ctx context.Context,
657716
idxCtx *Context,
658717
partitionKey PartitionKey,
659718
vec vector.T,
660719
state PartitionStateDetails,
661720
tempResults []SearchResult,
662-
) (*SearchResult, error) {
721+
) ([]SearchResult, error) {
663722
if state.State == DrainingForSplitState {
664723
// Synthesize one search result for each split target partition to pass
665724
// to getFullVectors.
@@ -677,7 +736,9 @@ func (vi *Index) fallbackOnTargets(
677736
var err error
678737
tempResults, err = vi.getFullVectors(ctx, idxCtx, tempResults)
679738
if err != nil {
680-
return nil, err
739+
return nil, errors.Wrapf(err,
740+
"fetching centroids for target partitions %d and %d, for splitting partition %d",
741+
state.Target1, state.Target2, partitionKey)
681742
}
682743
if len(tempResults) != 2 {
683744
return nil, errors.AssertionFailedf(
@@ -691,10 +752,13 @@ func (vi *Index) fallbackOnTargets(
691752
tempResults[0].QuerySquaredDistance = dist1
692753
tempResults[1].QuerySquaredDistance = dist2
693754

694-
if dist1 < dist2 {
695-
return &tempResults[0], nil
755+
if dist1 > dist2 {
756+
// Swap results.
757+
tempResult := tempResults[0]
758+
tempResults[0] = tempResults[1]
759+
tempResults[1] = tempResult
696760
}
697-
return &tempResults[1], nil
761+
return tempResults, nil
698762
}
699763

700764
// TODO(andyk): Add support for DrainingForMergeState.

0 commit comments

Comments
 (0)