Commit e60506c
cspann: add ChildKey de-duplicator
Add a new de-duplicator class for ChildKeys. Duplicate ChildKeys are going to
become much more common with non-transactional fixups.

Add an O(N) implementation that minimizes allocations by avoiding hashing
KeyBytes values as strings. Instead, it hashes each KeyBytes value into a
uint64, which is then used as the key in a regular Go map. uint64 collisions
are handled by rehashing.

Use the new de-duper in the index. In a later PR, it will be used directly in
SearchSet as well, to detect duplicates as early as possible.

Epic: CRDB-42943

Release note: None
1 parent 32facf3 commit e60506c
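For orientation, here is a minimal usage sketch of the new de-duper, based on
the childKeyDeDup API added below (the capacity hint and key values are made
up for illustration):

	var dedup childKeyDeDup
	dedup.Init(16 /* capacity hint, chosen arbitrarily */)

	// Partition keys and primary key bytes are de-duplicated independently.
	_ = dedup.TryAdd(ChildKey{PartitionKey: 42})         // true: first time seen
	_ = dedup.TryAdd(ChildKey{PartitionKey: 42})         // false: duplicate
	_ = dedup.TryAdd(ChildKey{KeyBytes: []byte("pk-1")}) // true: first time seen
	_ = dedup.TryAdd(ChildKey{KeyBytes: []byte("pk-1")}) // false: duplicate

	// Reuse the same instance across searches without reallocating the maps.
	dedup.Clear()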

File tree

14 files changed: +410 -154 lines changed

pkg/sql/vecindex/cspann/BUILD.bazel

Lines changed: 2 additions & 0 deletions

@@ -11,6 +11,7 @@ filegroup(
 go_library(
     name = "cspann",
     srcs = [
+        "childkey_dedup.go",
         "cspannpb.go",
         "fixup_processor.go",
         "fixup_split.go",
@@ -49,6 +50,7 @@ go_library(
 go_test(
     name = "cspann_test",
     srcs = [
+        "childkey_dedup_test.go",
         "cspannpb_test.go",
         "fixup_processor_test.go",
         "fixup_worker_test.go",
pkg/sql/vecindex/cspann/childkey_dedup.go (new file)

Lines changed: 126 additions & 0 deletions

// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package cspann

import (
	"bytes"
	"hash/maphash"

	"github.com/cockroachdb/errors"
)

// hashKeyFunc is a function type for hashing KeyBytes.
type hashKeyFunc func(KeyBytes) uint64

// childKeyDeDup provides de-duplication for ChildKey values. It supports both
// PartitionKey and KeyBytes child keys efficiently without making unnecessary
// allocations.
type childKeyDeDup struct {
	// initialCapacity is used to initialize the size of the data structures used
	// by the de-duplicator.
	initialCapacity int

	// partitionKeys is used for PartitionKey deduplication.
	partitionKeys map[PartitionKey]struct{}

	// keyBytesMap maps from a KeyBytes hash to the actual KeyBytes for direct
	// lookup and deduplication.
	keyBytesMap map[uint64]KeyBytes

	// seed pseudo-randomizes the hash function used by the de-duplicator.
	seed maphash.Seed

	// hashKeyBytes is the function used to hash KeyBytes. This is primarily used
	// for testing to override the default hash function.
	hashKeyBytes hashKeyFunc
}

// Init initializes the de-duplicator.
func (dd *childKeyDeDup) Init(capacity int) {
	dd.initialCapacity = capacity
	dd.seed = maphash.MakeSeed()
	dd.hashKeyBytes = dd.defaultHashKeyBytes
	dd.Clear()
}

// TryAdd attempts to add a child key to the deduplication set. It returns true
// if the key was added (wasn't a duplicate), or false if the key already exists
// (is a duplicate).
func (dd *childKeyDeDup) TryAdd(childKey ChildKey) bool {
	// Handle PartitionKey case - simple map lookup.
	if childKey.PartitionKey != 0 {
		// Lazily initialize the partitionKeys map.
		if dd.partitionKeys == nil {
			dd.partitionKeys = make(map[PartitionKey]struct{}, dd.initialCapacity)
		}

		if _, exists := dd.partitionKeys[childKey.PartitionKey]; exists {
			return false
		}
		dd.partitionKeys[childKey.PartitionKey] = struct{}{}
		return true
	}

	// Handle KeyBytes case. Lazily initialize the KeyBytes structures.
	if dd.keyBytesMap == nil {
		dd.keyBytesMap = make(map[uint64]KeyBytes, dd.initialCapacity)
	}

	// Calculate original hash for the key bytes.
	hash := dd.hashKeyBytes(childKey.KeyBytes)

	// Check for the key, possibly having to look at multiple rehashed slots.
	iterations := 0
	for {
		existingKey, exists := dd.keyBytesMap[hash]
		if !exists {
			// No collision, we can use this hash.
			break
		}

		// Check if this is the same key.
		if bytes.Equal(existingKey, childKey.KeyBytes) {
			return false
		}

		// Hash collision, rehash to find a new slot.
		hash = dd.rehash(hash)

		iterations++
		if iterations >= 100000 {
			// We must have hit a cycle, which should never happen.
			panic(errors.AssertionFailedf("rehash function cycled"))
		}
	}

	// Add the key to the map.
	dd.keyBytesMap[hash] = childKey.KeyBytes
	return true
}

// Clear removes all entries from the deduplication set.
func (dd *childKeyDeDup) Clear() {
	// Reset all the data structures.
	clear(dd.partitionKeys)
	clear(dd.keyBytesMap)
}

// defaultHashKeyBytes is the default implementation of hashKeyBytes.
func (dd *childKeyDeDup) defaultHashKeyBytes(key KeyBytes) uint64 {
	return maphash.Bytes(dd.seed, key)
}

// rehash creates a new hash from an existing hash to resolve collisions.
func (dd *childKeyDeDup) rehash(hash uint64) uint64 {
	// These constants are large 64-bit primes.
	hash ^= 0xc3a5c85c97cb3127
	hash ^= hash >> 33
	hash *= 0xff51afd7ed558ccd
	hash ^= hash >> 33
	hash *= 0xc4ceb9fe1a85ec53
	hash ^= hash >> 33
	return hash
}
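A note on the rehash constants: the multiply/xor-shift-by-33 structure mirrors
the 64-bit finalizer ("fmix64") popularized by MurmurHash3, whose mixing
constants are 0xff51afd7ed558ccd and 0xc4ceb9fe1a85ec53, and the initial XOR
constant also appears in CityHash. The point of this construction is good bit
avalanche, which makes it very unlikely that successive rehashes of the same
value cycle or keep colliding.

To make the collision handling concrete, here is a sketch of a read-only
lookup that follows the same probe chain as TryAdd. The contains helper is
hypothetical and not part of this commit:

	// contains reports whether key is already in the de-dup set. Like TryAdd,
	// it starts at the key's hash and keeps rehashing until it finds either
	// the key itself or an empty slot. (A production version would cap the
	// number of probes, as TryAdd does.)
	func (dd *childKeyDeDup) contains(key KeyBytes) bool {
		hash := dd.hashKeyBytes(key)
		for {
			existing, ok := dd.keyBytesMap[hash]
			if !ok {
				// An empty slot ends the probe chain: the key is absent.
				return false
			}
			if bytes.Equal(existing, key) {
				return true
			}
			// Different key with the same hash: advance to the next slot.
			hash = dd.rehash(hash)
		}
	}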
pkg/sql/vecindex/cspann/childkey_dedup_test.go (new file)

Lines changed: 121 additions & 0 deletions

// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package cspann

import (
	"hash/maphash"
	"strconv"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/stretchr/testify/require"
)

func TestChildKeyDeDupAddPartitionKey(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	var dd childKeyDeDup
	dd.Init(10)

	// Add a new PartitionKey.
	added := dd.TryAdd(ChildKey{PartitionKey: 123})
	require.True(t, added)

	// Try to add the same key again (should be a duplicate).
	added = dd.TryAdd(ChildKey{PartitionKey: 123})
	require.False(t, added)

	// Add a different PartitionKey.
	added = dd.TryAdd(ChildKey{PartitionKey: 456})
	require.True(t, added)
}

func TestChildKeyDeDupAddKeyBytes(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	var dd childKeyDeDup
	dd.Init(10)

	// Add a new KeyBytes.
	key1 := []byte("key1")
	added := dd.TryAdd(ChildKey{KeyBytes: key1})
	require.True(t, added)

	// Try to add the same key again (should be a duplicate).
	added = dd.TryAdd(ChildKey{KeyBytes: key1})
	require.False(t, added)

	// Add a different KeyBytes.
	key2 := []byte("key2")
	added = dd.TryAdd(ChildKey{KeyBytes: key2})
	require.True(t, added)

	// Verify that both keys are properly stored by checking for duplicates.
	added = dd.TryAdd(ChildKey{KeyBytes: key1})
	require.False(t, added, "Key1 should be detected as duplicate.")
	added = dd.TryAdd(ChildKey{KeyBytes: key2})
	require.False(t, added, "Key2 should be detected as duplicate.")
}

func TestChildKeyDeDupClear(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	var dd childKeyDeDup
	dd.Init(10)

	// Add a mix of keys.
	require.True(t, dd.TryAdd(ChildKey{PartitionKey: 123}))
	require.True(t, dd.TryAdd(ChildKey{KeyBytes: []byte("key1")}))

	// Clear the deduplicator.
	dd.Clear()

	// Verify keys were cleared.
	require.True(t, dd.TryAdd(ChildKey{PartitionKey: 123}))
	require.True(t, dd.TryAdd(ChildKey{KeyBytes: []byte("key1")}))
}

func TestChildKeyDeDupRehashing(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// This test simulates a scenario with many keys and random collisions.

	// Create a custom hash function that has a small range, forcing collisions.
	seed := maphash.MakeSeed()
	customHashFunc := func(key KeyBytes) uint64 {
		// Use only 10 possible hash values.
		return maphash.Bytes(seed, key) % 10
	}

	var dd childKeyDeDup
	dd.Init(100)
	dd.hashKeyBytes = customHashFunc

	// Create 50 keys, which have guaranteed collisions with only 10 possible
	// hash values.
	const numKeys = 50
	keys := make([]KeyBytes, 0, numKeys)
	for i := 0; i < numKeys; i++ {
		key := []byte("test-key-" + strconv.Itoa(i))
		keys = append(keys, key)
	}

	// Add all keys.
	for _, key := range keys {
		require.True(t, dd.TryAdd(ChildKey{KeyBytes: key}))
	}

	// Verify all keys are findable.
	for _, key := range keys {
		require.False(t, dd.TryAdd(ChildKey{KeyBytes: key}))
	}
}

pkg/sql/vecindex/cspann/fixup_worker.go

Lines changed: 1 addition & 1 deletion

@@ -639,7 +639,7 @@ func (fw *fixupWorker) linkNearbyVectors(
 	defer fw.workspace.FreeVector(tempVector)

 	// Filter the results.
-	results := idxCtx.tempSearchSet.PopUnsortedResults()
+	results := idxCtx.tempSearchSet.PopResults()
 	for i := range results {
 		result := &results[i]
pkg/sql/vecindex/cspann/index.go

Lines changed: 6 additions & 45 deletions

@@ -11,7 +11,6 @@ import (
 	"math"
 	"math/rand"
 	"runtime"
-	"slices"
 	"strconv"
 	"strings"

@@ -336,9 +335,8 @@ func (vi *Index) Close() {
 // NOTE: This can result in two vectors with the same primary key being inserted
 // into the index. To minimize this possibility, callers should call Delete
 // before Insert when a vector is updated. Even then, it's not guaranteed that
-// Delete will find the old vector. Vector index methods handle this rare case
-// by checking for duplicates when returning search results. For details, see
-// Index.pruneDuplicates.
+// Delete will find the old vector. The search set handles this rare case by
+// filtering out results with duplicate key bytes.
 func (vi *Index) Insert(
 	ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T, key KeyBytes,
 ) error {
@@ -456,7 +454,7 @@ func (vi *Index) SearchForDelete(
 	if err != nil {
 		return nil, err
 	}
-	results := idxCtx.tempSearchSet.PopUnsortedResults()
+	results := idxCtx.tempSearchSet.PopResults()
 	if len(results) == 0 {
 		// Retry search with significantly higher beam size.
 		baseBeamSize *= 8
@@ -564,7 +562,7 @@ func (vi *Index) searchForInsertHelper(
 	if err != nil {
 		return nil, err
 	}
-	results := idxCtx.tempSearchSet.PopUnsortedResults()
+	results := idxCtx.tempSearchSet.PopResults()
 	if len(results) != 1 {
 		return nil, errors.AssertionFailedf(
 			"SearchForInsert should return exactly one result, got %d", len(results))
@@ -658,7 +656,7 @@ func (vi *Index) searchHelper(ctx context.Context, idxCtx *Context, searchSet *S
 	}

 	for {
-		results := subSearchSet.PopUnsortedResults()
+		results := subSearchSet.PopResults()
 		if len(results) == 0 && searchLevel > LeafLevel {
 			// This should never happen, as it means that interior partition(s)
 			// have no children. The vector deletion logic should prevent that.
@@ -668,10 +666,6 @@ func (vi *Index) searchHelper(ctx context.Context, idxCtx *Context, searchSet *S

 		var zscore float64
 		if searchLevel > LeafLevel {
-			// Results need to be sorted in order to calculate their "spread". This
-			// also sorts them for determining which partitions to search next.
-			results.Sort()
-
 			// Compute the Z-score of the candidate list if there are enough
 			// samples. Otherwise, use the default Z-score of 0.
 			if len(results) >= vi.options.QualitySamples {
@@ -695,7 +689,6 @@ func (vi *Index) searchHelper(ctx context.Context, idxCtx *Context, searchSet *S
 		// Aggregate all stats from searching lower levels of the tree.
 		searchSet.Stats.Add(&subSearchSet.Stats)

-		results = vi.pruneDuplicates(results)
 		if !idxCtx.options.SkipRerank || idxCtx.options.ReturnVectors {
 			// Re-rank search results with full vectors.
 			searchSet.Stats.FullVectorCount += len(results)
@@ -803,7 +796,7 @@ func (vi *Index) searchChildPartitions(
 	// If one of the searched partitions has only 1 vector remaining, do not
 	// return that vector when "ignoreLonelyVector" is true.
 	if idxCtx.ignoreLonelyVector && idxCtx.level == level && count == 1 {
-		searchSet.RemoveResults(parentResults[i].ChildKey.PartitionKey)
+		searchSet.RemoveByParent(parentResults[i].ChildKey.PartitionKey)
 	}

 	// Enqueue background fixup if a split or merge operation needs to be
@@ -825,38 +818,6 @@ func (vi *Index) searchChildPartitions(
 	return level, nil
 }

-// pruneDuplicates removes candidates with duplicate child keys. This is rare,
-// but it can happen when a vector updated in the primary index cannot be
-// located in the secondary index.
-// NOTE: This logic will reorder the candidates slice.
-// NOTE: This logic can remove the "wrong" duplicate, with a quantized distance
-// that doesn't correspond to the true distance. However, this has no impact as
-// long as we rerank candidates using the original full-size vectors. Even if
-// we're not reranking, the impact of this should be minimal, since duplicates
-// are so rare and there's already quite a bit of inaccuracy when not reranking.
-func (vi *Index) pruneDuplicates(candidates []SearchResult) []SearchResult {
-	if len(candidates) <= 1 {
-		// No possibility of duplicates.
-		return candidates
-	}
-
-	if candidates[0].ChildKey.KeyBytes == nil {
-		// Only leaf partitions can have duplicates.
-		return candidates
-	}
-
-	// TODO DURING REVIEW: this is O(n * log(n)) instead of O(n) like the previous
-	// code, but is probably faster in practice for small values of n because it
-	// is allocation free. It is also cleaner and easier to understand. Choose an
-	// approach.
-	slices.SortFunc(candidates, func(a, b SearchResult) int {
-		return bytes.Compare(a.ChildKey.KeyBytes, b.ChildKey.KeyBytes)
-	})
-	return slices.CompactFunc(candidates, func(a, b SearchResult) bool {
-		return bytes.Equal(a.ChildKey.KeyBytes, b.ChildKey.KeyBytes)
-	})
-}
-
 // rerankSearchResults updates the given set of candidates with their exact
 // distances from the query vector. It does this by fetching the original full
 // size vectors from the store, in order to re-rank the top candidates for
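Two observations on the index.go changes above: the explicit results.Sort()
call can be dropped because PopResults, unlike PopUnsortedResults, evidently
returns results already sorted (which the spread calculation and the choice of
partitions to search next rely on), and the pruneDuplicates pass is removed
because duplicate filtering now happens inside the search set itself, as
described in the updated Insert comment.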
