Skip to content

Commit ae05c7c

Browse files
committed
vecindex: lazily create root partition metadata
When preparing to update a K-means tree, we check its root partition metadata. If that does not yet exist, we will create it using a CPut operation and then acquire a SHARED lock that prevents a background split operation until the transaction ends. Epic: CRDB-42943 Release note: None
1 parent 88a72c1 commit ae05c7c

File tree

3 files changed

+113
-4
lines changed

3 files changed

+113
-4
lines changed

pkg/sql/vecindex/vecstore/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ go_test(
4545
deps = [
4646
"//pkg/base",
4747
"//pkg/keys",
48+
"//pkg/kv",
4849
"//pkg/kv/kvpb",
4950
"//pkg/security/securityassets",
5051
"//pkg/security/securitytest",

pkg/sql/vecindex/vecstore/store_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ package vecstore
88
import (
99
"context"
1010
"fmt"
11+
"sync"
12+
"sync/atomic"
1113
"testing"
1214

1315
"github.com/cockroachdb/cockroach/pkg/base"
1416
"github.com/cockroachdb/cockroach/pkg/keys"
17+
"github.com/cockroachdb/cockroach/pkg/kv"
1518
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1619
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1720
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
@@ -146,6 +149,73 @@ func TestStore(t *testing.T) {
146149
// Re-run the tests with a prefixed index.
147150
usePrefix = true
148151
suite.Run(t, commontest.NewStoreTestSuite(ctx, makeStore))
152+
153+
// Ensure that races to create partition metadata either do not error, or
154+
// they error with WriteTooOldError.
155+
t.Run("race to create partition metadata", func(t *testing.T) {
156+
store := makeStore(quantize.NewUnQuantizer(2)).(*testStore)
157+
158+
var done atomic.Int64
159+
getMetadata := func(treeKey cspann.TreeKey) {
160+
var err error
161+
tx, err := store.BeginTransaction(ctx)
162+
require.NoError(t, err)
163+
defer func() {
164+
if err != nil {
165+
err = store.AbortTransaction(ctx, tx)
166+
}
167+
}()
168+
169+
// Enable stepping in the txn, which is what SQL does.
170+
tx.(*Txn).kv.ConfigureStepping(ctx, kv.SteppingEnabled)
171+
172+
_, err = tx.GetPartitionMetadata(ctx, treeKey, cspann.RootKey, true /* forUpdate */)
173+
if err != nil {
174+
require.ErrorContains(t, err, "WriteTooOldError")
175+
done.Store(1)
176+
return
177+
}
178+
179+
// Run GetPartitionMetadata again, to ensure that it succeeds, as a
180+
// way of simulating multiple vectors being inserted in the same
181+
// SQL statement.
182+
_, err = tx.GetPartitionMetadata(ctx, treeKey, cspann.RootKey, true /* forUpdate */)
183+
require.NoError(t, err)
184+
185+
err = store.CommitTransaction(ctx, tx)
186+
require.NoError(t, err)
187+
}
188+
189+
for i := range 100 {
190+
var wait sync.WaitGroup
191+
wait.Add(2)
192+
treeKey := store.MakeTreeKey(t, i)
193+
go func() {
194+
defer func() {
195+
wait.Done()
196+
}()
197+
getMetadata(treeKey)
198+
}()
199+
go func() {
200+
defer func() {
201+
wait.Done()
202+
}()
203+
getMetadata(treeKey)
204+
}()
205+
wait.Wait()
206+
207+
// Fail on foreground goroutine if a background goroutine failed.
208+
if t.Failed() {
209+
t.FailNow()
210+
}
211+
212+
// End the test once we find at least one WriteTooOldError.
213+
if done.Load() == 1 {
214+
break
215+
}
216+
}
217+
})
218+
149219
}
150220

151221
func TestQuantizeAndEncode(t *testing.T) {

pkg/sql/vecindex/vecstore/store_txn.go

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,11 +322,13 @@ func (tx *Txn) GetPartitionMetadata(
322322
errors.Wrapf(err, "getting partition metadata for %d", partitionKey)
323323
}
324324

325-
metadata, err := tx.extractMetadataFromKVResult(treeKey, partitionKey, &b.Results[0])
326-
if err != nil {
327-
return cspann.PartitionMetadata{}, err
325+
// If we're preparing to update the root partition, then lazily create its
326+
// metadata if it does not yet exist.
327+
if forUpdate && partitionKey == cspann.RootKey && b.Results[0].Rows[0].Value == nil {
328+
return tx.createRootPartition(ctx, metadataKey)
328329
}
329-
return metadata, nil
330+
331+
return tx.extractMetadataFromKVResult(treeKey, partitionKey, &b.Results[0])
330332
}
331333

332334
// AddToPartition implements the cspann.Txn interface.
@@ -575,6 +577,42 @@ func (tx *Txn) GetFullVectors(
575577
return err
576578
}
577579

580+
// createRootPartition uses the KV CPut operation to create metadata for the
581+
// root partition, and then returns that metadata.
582+
//
583+
// NOTE: CPut always "sees" the latest write of the metadata record, even if the
584+
// timestamp of that write is higher than this transaction's.
585+
func (tx *Txn) createRootPartition(
586+
ctx context.Context, metadataKey roachpb.Key,
587+
) (cspann.PartitionMetadata, error) {
588+
b := tx.kv.NewBatch()
589+
metadata := cspann.PartitionMetadata{Level: cspann.LeafLevel, Centroid: tx.store.emptyVec}
590+
encoded, err := EncodePartitionMetadata(metadata.Level, metadata.Centroid)
591+
if err != nil {
592+
return cspann.PartitionMetadata{}, err
593+
}
594+
595+
// Use CPutAllowingIfNotExists in order to handle the case where the same
596+
// transaction inserts multiple vectors (e.g. multiple VALUES rows). In that
597+
// case, the first row will trigger creation of the metadata record. However,
598+
// subsequent inserts will not be able to "see" this record, since they will
599+
// read at a lower sequence number than the metadata record was written.
600+
// However, CPutAllowingIfNotExists will read at the higher sequence number
601+
// and see that the record was already created.
602+
//
603+
// On the other hand, if a different transaction wrote the record, it will
604+
// have a higher timestamp, and that will trigger a WriteTooOld error.
605+
// Transactions which lose that race need to be refreshed.
606+
var roachval roachpb.Value
607+
roachval.SetBytes(encoded)
608+
b.CPutAllowingIfNotExists(metadataKey, &roachval, roachval.TagAndDataBytes())
609+
if err := tx.kv.Run(ctx, b); err != nil {
610+
// Lost the race to a different transaction.
611+
return cspann.PartitionMetadata{}, errors.Wrapf(err, "creating root partition metadata")
612+
}
613+
return metadata, nil
614+
}
615+
578616
// QuantizeAndEncode quantizes the given vector (which has already been
579617
// randomized by the caller) with respect to the given centroid. It returns the
580618
// encoded form of that quantized vector.

0 commit comments

Comments
 (0)