Skip to content

Commit 1496755

Browse files
committed
vecindex: clone treeKey when creating fixups
Previously, the C-SPANN index assumed that it owned the memory for the vector index prefix value (called the "tree key" in C-SPANN). However, the backfiller assumed the opposite, and reuses the memory between SearchForInsert calls. This commit resolves the conflict by always cloning the tree key when it's enqueued as part of a fixup, so that callers can reuse the memory. Fixes: #145261 Epic: CRDB-42943 Release note: None
1 parent 1095d6c commit 1496755

File tree

6 files changed

+156
-21
lines changed

6 files changed

+156
-21
lines changed

pkg/sql/backfill/backfill_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,3 +181,63 @@ func TestConcurrentOperationsDuringVectorIndexCreation(t *testing.T) {
181181
sqlDB.QueryRow(t, `SELECT id FROM vectors@vec_idx ORDER BY vec <-> '[1, 2, 3]' LIMIT 1`).Scan(&id)
182182
require.Equal(t, 1, id)
183183
}
184+
185+
// Regression for issue #145261: vector index backfill with a prefix column
186+
// crashes the node.
187+
func TestVectorIndexWithPrefixBackfill(t *testing.T) {
188+
defer leaktest.AfterTest(t)()
189+
defer log.Scope(t).Close(t)
190+
191+
ctx := context.Background()
192+
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
193+
defer srv.Stopper().Stop(ctx)
194+
sqlDB := sqlutils.MakeSQLRunner(db)
195+
196+
// Enable vector indexes.
197+
sqlDB.Exec(t, `SET CLUSTER SETTING feature.vector_index.enabled = true`)
198+
199+
// Create a table with a vector column + a prefix column.
200+
sqlDB.Exec(t, `
201+
CREATE TABLE items (
202+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
203+
customer_id INT NOT NULL,
204+
name TEXT,
205+
embedding VECTOR(3)
206+
);
207+
`)
208+
209+
// Generate 10 customer id's, each with 100 vectors.
210+
sqlDB.Exec(t, `
211+
INSERT INTO items (customer_id, name, embedding)
212+
SELECT
213+
(i % 10) + 1 AS customer_id,
214+
'Item ' || i,
215+
ARRAY[random(), random(), random()]::vector
216+
FROM generate_series(1, 1000) AS s(i);
217+
`)
218+
219+
// Create the vector index with a small partition size so that the trees
220+
// have more levels and splits to get there.
221+
sqlDB.Exec(t, `
222+
CREATE VECTOR INDEX ON items (customer_id, embedding)
223+
WITH (min_partition_size=2, max_partition_size=8, build_beam_size=2);
224+
`)
225+
226+
// Ensure that each customer has 100 vectors.
227+
for i := range 10 {
228+
func() {
229+
rows := sqlDB.Query(t, `
230+
SELECT id FROM items
231+
WHERE customer_id = $1
232+
ORDER BY embedding <-> $2
233+
LIMIT 200`, i+1, "[0, 0, 0]")
234+
defer rows.Close()
235+
236+
count := 0
237+
for rows.Next() {
238+
count++
239+
}
240+
require.Equal(t, 100, count)
241+
}()
242+
}
243+
}

pkg/sql/vecindex/cspann/fixup_processor.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"runtime"
12+
"slices"
1213
"sync"
1314
"time"
1415

@@ -290,7 +291,8 @@ func (fp *FixupProcessor) AddDeleteVector(
290291
ctx context.Context, treeKey TreeKey, partitionKey PartitionKey, vectorKey KeyBytes,
291292
) {
292293
fp.addFixup(ctx, fixup{
293-
TreeKey: treeKey,
294+
// Clone the tree key, since we don't own the memory.
295+
TreeKey: slices.Clone(treeKey),
294296
Type: vectorDeleteFixup,
295297
PartitionKey: partitionKey,
296298
VectorKey: vectorKey,
@@ -306,7 +308,8 @@ func (fp *FixupProcessor) AddSplit(
306308
singleStep bool,
307309
) {
308310
fp.addFixup(ctx, fixup{
309-
TreeKey: treeKey,
311+
// Clone the tree key, since we don't own the memory.
312+
TreeKey: slices.Clone(treeKey),
310313
Type: splitFixup,
311314
ParentPartitionKey: parentPartitionKey,
312315
PartitionKey: partitionKey,
@@ -323,7 +326,8 @@ func (fp *FixupProcessor) AddMerge(
323326
singleStep bool,
324327
) {
325328
fp.addFixup(ctx, fixup{
326-
TreeKey: treeKey,
329+
// Clone the tree key, since we don't own the memory.
330+
TreeKey: slices.Clone(treeKey),
327331
Type: mergeFixup,
328332
ParentPartitionKey: parentPartitionKey,
329333
PartitionKey: partitionKey,

pkg/sql/vecindex/cspann/index.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,10 @@ func (vi *Index) Close() {
350350
// before Insert when a vector is updated. Even then, it's not guaranteed that
351351
// Delete will find the old vector. The search set handles this rare case by
352352
// filtering out results with duplicate key bytes.
353+
//
354+
// NOTE: The caller is assumed to own the memory for all parameters and can
355+
// reuse the memory after the call returns.
356+
// TODO(andyk): This is not true of the MemStore.
353357
func (vi *Index) Insert(
354358
ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T, key KeyBytes,
355359
) error {
@@ -393,6 +397,10 @@ func (vi *Index) Insert(
393397
//
394398
// NOTE: Even if the vector is removed, there may still be duplicate dangling
395399
// instances of the vector still remaining in the index.
400+
//
401+
// NOTE: The caller is assumed to own the memory for all parameters and can
402+
// reuse the memory after the call returns.
403+
// TODO(andyk): This is not true of the MemStore.
396404
func (vi *Index) Delete(
397405
ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T, key KeyBytes,
398406
) (deleted bool, err error) {
@@ -423,6 +431,10 @@ func (vi *Index) Delete(
423431
// and returns them in the search set. Set searchSet.MaxResults to limit the
424432
// number of results. This is called within the scope of a transaction so that
425433
// the index does not appear to change during the search.
434+
//
435+
// NOTE: The caller is assumed to own the memory for all parameters and can
436+
// reuse the memory after the call returns.
437+
// TODO(andyk): This is not true of the MemStore.
426438
func (vi *Index) Search(
427439
ctx context.Context,
428440
idxCtx *Context,
@@ -440,6 +452,10 @@ func (vi *Index) Search(
440452
// partition, as well as the centroid of the partition (in the Vector field).
441453
// This is useful for callers that directly insert KV rows rather than using
442454
// this library to do it.
455+
//
456+
// NOTE: The caller is assumed to own the memory for all parameters and can
457+
// reuse the memory after the call returns.
458+
// TODO(andyk): This is not true of the MemStore.
443459
func (vi *Index) SearchForInsert(
444460
ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T,
445461
) (*SearchResult, error) {
@@ -471,6 +487,10 @@ func (vi *Index) SearchForInsert(
471487
// It returns a single search result containing the key of that partition, or
472488
// nil if the vector cannot be found. This is useful for callers that directly
473489
// delete KV rows rather than using this library to do it.
490+
//
491+
// NOTE: The caller is assumed to own the memory for all parameters and can
492+
// reuse the memory after the call returns.
493+
// TODO(andyk): This is not true of the MemStore.
474494
func (vi *Index) SearchForDelete(
475495
ctx context.Context, idxCtx *Context, treeKey TreeKey, vec vector.T, key KeyBytes,
476496
) (*SearchResult, error) {

pkg/sql/vecindex/mutation_searcher.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ func (s *MutationSearcher) Init(idx *cspann.Index, txn *kv.Txn) {
5151
// SearchForInsert triggers a search for the partition in which to insert the
5252
// input vector. The partition's key is returned by PartitionKey() and the
5353
// input vector's quantized and encoded form is returned by EncodedVector().
54+
//
55+
// NOTE: The caller is assumed to own the memory for all parameters and can
56+
// reuse the memory after the call returns.
5457
func (s *MutationSearcher) SearchForInsert(
5558
ctx context.Context, prefix roachpb.Key, vec vector.T,
5659
) error {
@@ -79,6 +82,9 @@ func (s *MutationSearcher) SearchForInsert(
7982
// SearchForDelete triggers a search for the partition which contains the vector
8083
// to be deleted, identified by its primary key. If the input vector is found,
8184
// its partition is returned by PartitionKey().
85+
//
86+
// NOTE: The caller is assumed to own the memory for all parameters and can
87+
// reuse the memory after the call returns.
8288
func (s *MutationSearcher) SearchForDelete(
8389
ctx context.Context, prefix roachpb.Key, vec vector.T, key cspann.KeyBytes,
8490
) error {

pkg/sql/vecindex/searcher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ func (s *Searcher) Init(idx *cspann.Index, txn *kv.Txn, baseBeamSize, maxResults
6161
// Search triggers a search over the index for the given vector, within the
6262
// scope of the given prefix. "maxResults" specifies the maximum number of
6363
// results that will be returned.
64+
//
65+
// NOTE: The caller is assumed to own the memory for all parameters and can
66+
// reuse the memory after the call returns.
6467
func (s *Searcher) Search(ctx context.Context, prefix roachpb.Key, vec vector.T) error {
6568
err := s.idx.Search(ctx, &s.idxCtx, cspann.TreeKey(prefix), vec, &s.searchSet, s.options)
6669
if err != nil {

pkg/sql/vecindex/searcher_test.go

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package vecindex
77

88
import (
99
"context"
10+
"slices"
1011
"testing"
1112

1213
"github.com/cockroachdb/cockroach/pkg/base"
@@ -89,22 +90,40 @@ func TestSearcher(t *testing.T) {
8990
// Insert two vectors into root partition.
9091
var mutator MutationSearcher
9192
mutator.Init(idx, tx)
92-
prefix := keys.MakeFamilyKey(encoding.EncodeVarintAscending([]byte{}, 100), 0 /* famID */)
93+
94+
// Reuse prefix, key bytes, value bytes and vector memory, to ensure it's
95+
// allowed.
96+
var prefix, keyBytes, valueBytes []byte
97+
var original, randomized vector.T
9398

9499
insertVector := func(vec vector.T, key int64, val cspann.ValueBytes) vector.T {
100+
prefix = prefix[:0]
101+
keyBytes = keyBytes[:0]
102+
randomized = randomized[:0]
103+
104+
prefix = encoding.EncodeVarintAscending(prefix, 100)
95105
require.NoError(t, mutator.SearchForInsert(ctx, prefix, vec))
106+
96107
partitionKey := cspann.PartitionKey(*mutator.PartitionKey().(*tree.DInt))
97-
keyBytes := keys.MakeFamilyKey(encoding.EncodeVarintAscending([]byte{}, key), 0 /* famID */)
98-
randomizedVec := make(vector.T, len(vec))
99-
idx.RandomizeVector(vec, randomizedVec)
108+
keyBytes = keys.MakeFamilyKey(encoding.EncodeVarintAscending(keyBytes, key), 0 /* famID */)
109+
110+
randomized = slices.Grow(randomized, len(vec))[:len(vec)]
111+
idx.RandomizeVector(vec, randomized)
100112
err = mutator.txn.AddToPartition(ctx, cspann.TreeKey(prefix), partitionKey, cspann.LeafLevel,
101-
randomizedVec, cspann.ChildKey{KeyBytes: keyBytes}, val)
113+
randomized, cspann.ChildKey{KeyBytes: keyBytes}, val)
102114
require.NoError(t, err)
103-
return randomizedVec
115+
return randomized
104116
}
105117

106-
insertVector(vector.T{1, 2}, 1, cspann.ValueBytes{1, 2})
107-
randomizedVec := insertVector(vector.T{5, 3}, 2, cspann.ValueBytes{3, 4})
118+
// Reuse vector and value memory, to ensure it's allowed.
119+
original = vector.T{1, 2}
120+
valueBytes = []byte{1, 2}
121+
insertVector(original, 1, cspann.ValueBytes(valueBytes))
122+
original[0] = 5
123+
original[1] = 3
124+
valueBytes[0] = 3
125+
valueBytes[1] = 4
126+
randomized = insertVector(original, 2, cspann.ValueBytes(valueBytes))
108127

109128
// Validate that search vector was correctly encoded and quantized.
110129
encodedVec := mutator.EncodedVector()
@@ -113,29 +132,52 @@ func TestSearcher(t *testing.T) {
113132
[]byte(*encodedVec.(*tree.DBytes)), &vecSet)
114133
require.NoError(t, err)
115134
require.Empty(t, remainder)
116-
require.Equal(t, randomizedVec, vecSet.Vectors.At(0))
135+
require.Equal(t, randomized, vecSet.Vectors.At(0))
117136

118-
// Use the Searcher.
137+
// Search for a vector that doesn't exist in the tree (reuse memory).
138+
prefix = prefix[:0]
139+
prefix = encoding.EncodeVarintAscending(prefix, 200)
140+
original[0] = 1
141+
original[1] = 1
119142
var searcher Searcher
120143
searcher.Init(idx, tx, 8 /* baseBeamSize */, 2 /* maxResults */)
121-
require.NoError(t, searcher.Search(ctx, prefix, vector.T{1, 1}))
144+
require.NoError(t, searcher.Search(ctx, prefix, original))
145+
require.Nil(t, searcher.NextResult())
146+
147+
// Search for a vector that does exist (reuse memory).
148+
prefix = prefix[:0]
149+
prefix = encoding.EncodeVarintAscending(prefix, 100)
150+
require.NoError(t, searcher.Search(ctx, prefix, original))
122151
res := searcher.NextResult()
123152
require.InDelta(t, float32(1), res.QuerySquaredDistance, 0.01)
124153
res = searcher.NextResult()
125154
require.InDelta(t, float32(20), res.QuerySquaredDistance, 0.01)
126155
require.Nil(t, searcher.NextResult())
127156

128-
// Search for a vector to delete.
129-
keyBytes := keys.MakeFamilyKey(encoding.EncodeVarintAscending([]byte{}, 1), 0 /* famID */)
130-
require.NoError(t, mutator.SearchForDelete(ctx, prefix, vector.T{1, 2}, keyBytes))
131-
require.Equal(t, tree.NewDInt(tree.DInt(1)), mutator.PartitionKey())
157+
// Search for a vector to delete that doesn't exist (reuse memory).
158+
keyBytes = keyBytes[:0]
159+
original[0] = 1
160+
original[1] = 2
161+
keyBytes = keys.MakeFamilyKey(encoding.EncodeVarintAscending(keyBytes, 123), 0 /* famID */)
162+
require.NoError(t, mutator.SearchForDelete(ctx, prefix, original, keyBytes))
163+
require.Equal(t, tree.DNull, mutator.PartitionKey())
132164
require.Nil(t, mutator.EncodedVector())
133165

134-
// Search for a vector to delete that doesn't exist.
135-
keyBytes = keys.MakeFamilyKey(encoding.EncodeVarintAscending([]byte{}, 123), 0 /* famID */)
136-
require.NoError(t, mutator.SearchForDelete(ctx, prefix, vector.T{1, 2}, keyBytes))
166+
// Search for a vector to delete that doesn't exist in the tree (reuse memory).
167+
prefix = prefix[:0]
168+
prefix = encoding.EncodeVarintAscending(prefix, 200)
169+
require.NoError(t, mutator.SearchForDelete(ctx, prefix, original, keyBytes))
137170
require.Equal(t, tree.DNull, mutator.PartitionKey())
138171
require.Nil(t, mutator.EncodedVector())
139172

173+
// Search for a vector to delete (reuse memory).
174+
prefix = prefix[:0]
175+
prefix = encoding.EncodeVarintAscending(prefix, 100)
176+
keyBytes = keyBytes[:0]
177+
keyBytes = keys.MakeFamilyKey(encoding.EncodeVarintAscending(keyBytes, 1), 0 /* famID */)
178+
require.NoError(t, mutator.SearchForDelete(ctx, prefix, original, keyBytes))
179+
require.Equal(t, tree.NewDInt(tree.DInt(1)), mutator.PartitionKey())
180+
require.Nil(t, mutator.EncodedVector())
181+
140182
require.NoError(t, tx.Commit(ctx))
141183
}

0 commit comments

Comments
 (0)