Skip to content

Commit ae06933

Browse files
craig[bot] and andy-kimball
committed
Merge #144360
144360: cspann: convert search logic to use pull iteration r=mw5h a=andy-kimball This PR fixes multiple issues that were causing the `TestIndexConcurrency` test to fail. That test was no longer working after the change to create a new non-transactional background split fixup. Fixes: #143941 #### cspann: convert search logic to use pull iteration Using non-transactional splits breaks certain assumptions that the C-SPANN library relies upon: * The K-means tree must be fully balanced - interior partitions can now be empty. * Partition child keys are never duplicated - splitting can duplicates child partition keys, which can persist in the tree. * Partition child keys always reference existing partitions - partition child keys can now reference missing partitions. * Inserts will always trivially find an insertion partition - now, it's possible to get "blocked" trying to find a path to a target partition that supports inserts. The blockage could be in the form of an empty interior partition, a dangling child partition key, or a target partition that does not allow inserts. To address these issues, this commit converts the existing search logic to use "pull" iteration. Code that needs to search the tree can iteratively get the next batch of results, and the next after that, and so on, without knowing up front exactly how many results are needed. Each batch is sorted by distance, with duplicates removed. However, batches are not strictly ordered in relation to one another and duplicates can exist across batches (though each subsequent batch does tend to have greater distances). Pull iteration largely solves the issues noted above. For example, the insert operation can pull one result at a time. If that partition does not support inserts, it can pull the next. If a batch is empty due to hitting a "dead end" in tree traversal, it can just pull the next batch. 
Epic: CRDB-42943 Release note: None #### cspann: update C-SPANN index code to tolerate missing partitions Previously, when index operations like search or insert came across a missing partition, they failed with an error, as this should never be possible. However, with non-transactional splits, it's now acceptable for partitions to be missing. This commit changes the index code to tolerate this condition. It also updates transactional Store methods to support the new semantics. Epic: CRDB-42943 Release note: None #### vecindex: remove Begin/Commit/Abort Transaction methods Remove the BeginTransaction, CommitTransaction, and AbortTransaction methods from the cspann.Store interface. These methods are superseded by the RunTransaction method. Fix up all test usages of any of these methods to use a new commontest.RunTransaction helper function. Epic: CRDB-42943 Release note: None Co-authored-by: Andrew Kimball <[email protected]>
2 parents 3d3db75 + 38f032a commit ae06933

30 files changed

+1862
-1443
lines changed

pkg/cmd/vecbench/mem_provider.go

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -128,34 +128,27 @@ func (m *MemProvider) InsertVectors(
128128
func (m *MemProvider) Search(
129129
ctx context.Context, vec vector.T, maxResults int, beamSize int, stats *cspann.SearchStats,
130130
) (keys []cspann.KeyBytes, err error) {
131-
var txn cspann.Txn
132-
txn, err = m.store.BeginTransaction(ctx)
133-
defer func() {
134-
if err == nil {
135-
err = m.store.CommitTransaction(ctx, txn)
136-
}
131+
err = m.store.RunTransaction(ctx, func(txn cspann.Txn) error {
132+
// Search the store.
133+
var idxCtx cspann.Context
134+
idxCtx.Init(txn)
135+
searchSet := cspann.SearchSet{MaxResults: maxResults}
136+
searchOptions := cspann.SearchOptions{BaseBeamSize: beamSize}
137+
err = m.index.Search(ctx, &idxCtx, nil /* treeKey */, vec, &searchSet, searchOptions)
137138
if err != nil {
138-
err = m.store.AbortTransaction(ctx, txn)
139+
return err
139140
}
140-
}()
141-
142-
// Search the store.
143-
var idxCtx cspann.Context
144-
idxCtx.Init(txn)
145-
searchSet := cspann.SearchSet{MaxResults: maxResults}
146-
searchOptions := cspann.SearchOptions{BaseBeamSize: beamSize}
147-
err = m.index.Search(ctx, &idxCtx, nil /* treeKey */, vec, &searchSet, searchOptions)
148-
if err != nil {
149-
return nil, err
150-
}
151-
*stats = searchSet.Stats
141+
*stats = searchSet.Stats
152142

153-
// Get result keys.
154-
results := searchSet.PopResults()
155-
keys = make([]cspann.KeyBytes, len(results))
156-
for i, res := range results {
157-
keys[i] = []byte(res.ChildKey.KeyBytes)
158-
}
143+
// Get result keys.
144+
results := searchSet.PopResults()
145+
keys = make([]cspann.KeyBytes, len(results))
146+
for i, res := range results {
147+
keys[i] = []byte(res.ChildKey.KeyBytes)
148+
}
149+
150+
return nil
151+
})
159152

160153
return keys, err
161154
}

pkg/sql/vecindex/cspann/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go_library(
2323
"partition.go",
2424
"partition_metadata.go",
2525
"search_set.go",
26+
"searcher.go",
2627
"split_data.go",
2728
"store.go",
2829
"store_errors.go",

pkg/sql/vecindex/cspann/childkey_dedup.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ import (
1515
// hashKeyFunc is a function type for hashing KeyBytes.
1616
type hashKeyFunc func(KeyBytes) uint64
1717

18-
// childKeyDeDup provides de-duplication for ChildKey values. It supports both
18+
// ChildKeyDeDup provides de-duplication for ChildKey values. It supports both
1919
// PartitionKey and KeyBytes child keys efficiently without making unnecessary
2020
// allocations.
21-
type childKeyDeDup struct {
21+
type ChildKeyDeDup struct {
2222
// initialCapacity is used to initialize the size of the data structures used
2323
// by the de-duplicator.
2424
initialCapacity int
@@ -39,17 +39,22 @@ type childKeyDeDup struct {
3939
}
4040

4141
// Init initializes the de-duplicator.
42-
func (dd *childKeyDeDup) Init(capacity int) {
42+
func (dd *ChildKeyDeDup) Init(capacity int) {
4343
dd.initialCapacity = capacity
4444
dd.seed = maphash.MakeSeed()
4545
dd.hashKeyBytes = dd.defaultHashKeyBytes
4646
dd.Clear()
4747
}
4848

49+
// Count returns the number of keys in the de-duplicator.
50+
func (dd *ChildKeyDeDup) Count() int {
51+
return len(dd.partitionKeys) + len(dd.keyBytesMap)
52+
}
53+
4954
// TryAdd attempts to add a child key to the deduplication set. It returns true
5055
// if the key was added (wasn't a duplicate), or false if the key already exists
5156
// (is a duplicate).
52-
func (dd *childKeyDeDup) TryAdd(childKey ChildKey) bool {
57+
func (dd *ChildKeyDeDup) TryAdd(childKey ChildKey) bool {
5358
// Handle PartitionKey case - simple map lookup.
5459
if childKey.PartitionKey != 0 {
5560
// Lazily initialize the partitionKeys map.
@@ -102,19 +107,19 @@ func (dd *childKeyDeDup) TryAdd(childKey ChildKey) bool {
102107
}
103108

104109
// Clear removes all entries from the deduplication set.
105-
func (dd *childKeyDeDup) Clear() {
110+
func (dd *ChildKeyDeDup) Clear() {
106111
// Reset all the data structures.
107112
clear(dd.partitionKeys)
108113
clear(dd.keyBytesMap)
109114
}
110115

111116
// defaultHashKeyBytes is the default implementation of hashKeyBytes.
112-
func (dd *childKeyDeDup) defaultHashKeyBytes(key KeyBytes) uint64 {
117+
func (dd *ChildKeyDeDup) defaultHashKeyBytes(key KeyBytes) uint64 {
113118
return maphash.Bytes(dd.seed, key)
114119
}
115120

116121
// rehash creates a new hash from an existing hash to resolve collisions.
117-
func (dd *childKeyDeDup) rehash(hash uint64) uint64 {
122+
func (dd *ChildKeyDeDup) rehash(hash uint64) uint64 {
118123
// These constants are large 64-bit primes.
119124
hash ^= 0xc3a5c85c97cb3127
120125
hash ^= hash >> 33

pkg/sql/vecindex/cspann/childkey_dedup_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,48 @@ func TestChildKeyDeDupAddPartitionKey(t *testing.T) {
1919
defer leaktest.AfterTest(t)()
2020
defer log.Scope(t).Close(t)
2121

22-
var dd childKeyDeDup
22+
var dd ChildKeyDeDup
2323
dd.Init(10)
2424

2525
// Add a new PartitionKey.
2626
added := dd.TryAdd(ChildKey{PartitionKey: 123})
2727
require.True(t, added)
28+
require.Equal(t, 1, dd.Count())
2829

2930
// Try to add the same key again (should be a duplicate).
3031
added = dd.TryAdd(ChildKey{PartitionKey: 123})
3132
require.False(t, added)
33+
require.Equal(t, 1, dd.Count())
3234

3335
// Add a different PartitionKey.
3436
added = dd.TryAdd(ChildKey{PartitionKey: 456})
3537
require.True(t, added)
38+
require.Equal(t, 2, dd.Count())
3639
}
3740

3841
func TestChildKeyDeDupAddKeyBytes(t *testing.T) {
3942
defer leaktest.AfterTest(t)()
4043
defer log.Scope(t).Close(t)
4144

42-
var dd childKeyDeDup
45+
var dd ChildKeyDeDup
4346
dd.Init(10)
4447

4548
// Add a new KeyBytes.
4649
key1 := []byte("key1")
4750
added := dd.TryAdd(ChildKey{KeyBytes: key1})
4851
require.True(t, added)
52+
require.Equal(t, 1, dd.Count())
4953

5054
// Try to add the same key again (should be a duplicate).
5155
added = dd.TryAdd(ChildKey{KeyBytes: key1})
5256
require.False(t, added)
57+
require.Equal(t, 1, dd.Count())
5358

5459
// Add a different KeyBytes.
5560
key2 := []byte("key2")
5661
added = dd.TryAdd(ChildKey{KeyBytes: key2})
5762
require.True(t, added)
63+
require.Equal(t, 2, dd.Count())
5864

5965
// Verify that both keys are properly stored by checking for duplicates.
6066
added = dd.TryAdd(ChildKey{KeyBytes: key1})
@@ -67,17 +73,19 @@ func TestChildKeyDeDupClear(t *testing.T) {
6773
defer leaktest.AfterTest(t)()
6874
defer log.Scope(t).Close(t)
6975

70-
var dd childKeyDeDup
76+
var dd ChildKeyDeDup
7177
dd.Init(10)
7278

7379
// Add a mix of keys.
7480
require.True(t, dd.TryAdd(ChildKey{PartitionKey: 123}))
7581
require.True(t, dd.TryAdd(ChildKey{KeyBytes: []byte("key1")}))
82+
require.Equal(t, 2, dd.Count())
7683

7784
// Clear the deduplicator.
7885
dd.Clear()
7986

8087
// Verify keys were cleared.
88+
require.Equal(t, 0, dd.Count())
8189
require.True(t, dd.TryAdd(ChildKey{PartitionKey: 123}))
8290
require.True(t, dd.TryAdd(ChildKey{KeyBytes: []byte("key1")}))
8391
}
@@ -96,7 +104,7 @@ func TestChildKeyDeDupRehashing(t *testing.T) {
96104
return maphash.Bytes(seed, key)
97105
}
98106

99-
var dd childKeyDeDup
107+
var dd ChildKeyDeDup
100108
dd.Init(100)
101109
dd.hashKeyBytes = customHashFunc
102110

0 commit comments

Comments
 (0)