Skip to content

Commit f486be2

Browse files
committed
cspann: change TryGetPartitionMetadata to handle batches
Previously, TryGetPartitionMetadata fetched metadata for a single partition. This commit changes that to instead fetch metadata for a batch of multiple partitions in one call. This will be used by a future commit. Epic: CRDB-42943 Release note: None
1 parent 314510c commit f486be2

File tree

7 files changed

+107
-58
lines changed

7 files changed

+107
-58
lines changed

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,11 @@ type MakeStoreFunc func(quantizer quantize.Quantizer) TestStore
7676
type StoreTestSuite struct {
7777
suite.Suite
7878

79-
ctx context.Context
80-
makeStore MakeStoreFunc
81-
rootQuantizer quantize.Quantizer
82-
quantizer quantize.Quantizer
79+
ctx context.Context
80+
makeStore MakeStoreFunc
81+
rootQuantizer quantize.Quantizer
82+
quantizer quantize.Quantizer
83+
nextPartitionKey cspann.PartitionKey
8384
}
8485

8586
// NewStoreTestSuite constructs a new suite of tests that run against
@@ -88,10 +89,12 @@ type StoreTestSuite struct {
8889
// tests.
8990
func NewStoreTestSuite(ctx context.Context, makeStore MakeStoreFunc) *StoreTestSuite {
9091
return &StoreTestSuite{
91-
ctx: ctx,
92-
makeStore: makeStore,
93-
rootQuantizer: quantize.NewUnQuantizer(2, vecpb.L2SquaredDistance),
94-
quantizer: quantize.NewRaBitQuantizer(2, 42, vecpb.L2SquaredDistance)}
92+
ctx: ctx,
93+
makeStore: makeStore,
94+
rootQuantizer: quantize.NewUnQuantizer(2, vecpb.L2SquaredDistance),
95+
quantizer: quantize.NewRaBitQuantizer(2, 42, vecpb.L2SquaredDistance),
96+
nextPartitionKey: cspann.RootKey + 1,
97+
}
9598
}
9699

97100
func (suite *StoreTestSuite) TestRunTransaction() {
@@ -700,31 +703,38 @@ func (suite *StoreTestSuite) TestTryGetPartitionMetadata() {
700703

701704
doTest := func(treeID int) {
702705
treeKey := store.MakeTreeKey(suite.T(), treeID)
703-
partitionKey := cspann.PartitionKey(10)
704-
705-
// Partition does not yet exist.
706-
_, err := store.TryGetPartitionMetadata(suite.ctx, treeKey, partitionKey)
707-
suite.ErrorIs(err, cspann.ErrPartitionNotFound)
708706

709-
// Create partition with some vectors in it.
710-
partitionKey, partition := suite.createTestPartition(store, treeKey)
707+
// Create two partition with vectors in them.
708+
partitionKey1, partition1 := suite.createTestPartition(store, treeKey)
709+
partitionKey2, partition2 := suite.createTestPartition(store, treeKey)
711710

712-
// Fetch back only the metadata and validate it.
713-
partitionMetadata, err := store.TryGetPartitionMetadata(suite.ctx, treeKey, partitionKey)
711+
// Fetch metadata for the partitions, along with one that doesn't exist.
712+
toGet := []cspann.PartitionMetadataToGet{
713+
{Key: partitionKey1},
714+
{Key: cspann.PartitionKey(9999)},
715+
{Key: partitionKey2},
716+
}
717+
err := store.TryGetPartitionMetadata(suite.ctx, treeKey, toGet)
714718
suite.NoError(err)
715-
suite.True(partitionMetadata.Equal(partition.Metadata()))
719+
720+
// Validate that partition 9999 does not exist.
721+
suite.Equal(cspann.PartitionMetadata{}, toGet[1].Metadata)
722+
723+
// Validate metadata for other partitions.
724+
suite.True(partition1.Metadata().Equal(&toGet[0].Metadata))
725+
suite.True(partition2.Metadata().Equal(&toGet[2].Metadata))
716726

717727
// Update the metadata and verify we get the updated values.
718-
expected := *partition.Metadata()
728+
expected := toGet[0].Metadata
719729
metadata := expected
720730
metadata.StateDetails.MakeUpdating(30)
721731
suite.NoError(store.TryUpdatePartitionMetadata(
722-
suite.ctx, treeKey, partitionKey, metadata, expected))
732+
suite.ctx, treeKey, partitionKey1, metadata, expected))
723733

724734
// Fetch updated metadata and validate.
725-
partitionMetadata, err = store.TryGetPartitionMetadata(suite.ctx, treeKey, partitionKey)
735+
err = store.TryGetPartitionMetadata(suite.ctx, treeKey, toGet[:1])
726736
suite.NoError(err)
727-
suite.True(partitionMetadata.Equal(&metadata))
737+
suite.True(toGet[0].Metadata.Equal(&metadata))
728738
}
729739

730740
suite.Run("default tree", func() {
@@ -1043,7 +1053,8 @@ func (suite *StoreTestSuite) TestTryClearPartition() {
10431053
func (suite *StoreTestSuite) createTestPartition(
10441054
store TestStore, treeKey cspann.TreeKey,
10451055
) (cspann.PartitionKey, *cspann.Partition) {
1046-
partitionKey := cspann.PartitionKey(10)
1056+
partitionKey := suite.nextPartitionKey
1057+
suite.nextPartitionKey++
10471058
metadata := cspann.MakeReadyPartitionMetadata(cspann.SecondLevel, vector.T{4, 3})
10481059
suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, partitionKey, metadata))
10491060
vectors := vector.MakeSet(2)

pkg/sql/vecindex/cspann/fixup_split.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ func (fw *fixupWorker) splitPartition(
209209
return errors.Wrapf(errFixupAborted, "reloading partition, metadata timestamp changed")
210210
}
211211
} else if metadata.StateDetails.State != MissingState {
212+
// Fetch metadata for already created left and right partitions.
212213
leftMetadata, err = fw.getPartitionMetadata(ctx, leftPartitionKey)
213214
if err != nil {
214215
return err
@@ -363,16 +364,15 @@ func (fw *fixupWorker) getPartition(
363364
// getPartitionMetadata returns the up-to-date metadata of the partition with
364365
// the given key.
365366
func (fw *fixupWorker) getPartitionMetadata(
366-
ctx context.Context, partitionKey PartitionKey,
367+
ctx context.Context, partitionKey PartitionKey,
367368
) (PartitionMetadata, error) {
368-
metadata, err := fw.index.store.TryGetPartitionMetadata(ctx, fw.treeKey, partitionKey)
369+
fw.tempMetadataToGet = ensureSliceLen(fw.tempMetadataToGet, 1)
370+
fw.tempMetadataToGet[0].Key = partitionKey
371+
err := fw.index.store.TryGetPartitionMetadata(ctx, fw.treeKey, fw.tempMetadataToGet)
369372
if err != nil {
370-
metadata, err = suppressRaceErrors(err)
371-
if err != nil {
372-
return PartitionMetadata{}, errors.Wrapf(err, "getting metadata for partition %d", partitionKey)
373-
}
373+
return PartitionMetadata{}, errors.Wrapf(err, "getting metadata for partition %d", partitionKey)
374374
}
375-
return metadata, nil
375+
return fw.tempMetadataToGet[0].Metadata, nil
376376
}
377377

378378
// updateMetadata updates the given partition's metadata record, on the
@@ -586,8 +586,7 @@ func (fw *fixupWorker) createSplitSubPartition(
586586
// Load parent metadata to verify that it's in a state that allows inserts.
587587
parentMetadata, err := fw.getPartitionMetadata(ctx, parentPartitionKey)
588588
if err != nil {
589-
return PartitionMetadata{}, errors.Wrapf(err,
590-
"getting parent partition %d metadata", parentPartitionKey)
589+
return PartitionMetadata{}, err
591590
}
592591

593592
parentLevel := sourceMetadata.Level + 1

pkg/sql/vecindex/cspann/fixup_worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ type fixupWorker struct {
9595
tempVectorsWithKeys []VectorWithKey
9696
tempChildKey [1]ChildKey
9797
tempValueBytes [1]ValueBytes
98+
tempMetadataToGet []PartitionMetadataToGet
9899
}
99100

100101
// ewFixupWorker returns a new worker for the given processor.

pkg/sql/vecindex/cspann/memstore/memstore.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -377,17 +377,26 @@ func (s *Store) TryGetPartition(
377377

378378
// TryGetPartitionMetadata implements the Store interface.
379379
func (s *Store) TryGetPartitionMetadata(
380-
ctx context.Context, treeKey cspann.TreeKey, partitionKey cspann.PartitionKey,
381-
) (cspann.PartitionMetadata, error) {
382-
memPart := s.lockPartition(treeKey, partitionKey, uniqueOwner, false /* isExclusive */)
383-
if memPart == nil {
384-
// Partition does not exist.
385-
return cspann.PartitionMetadata{}, cspann.ErrPartitionNotFound
380+
ctx context.Context, treeKey cspann.TreeKey, toGet []cspann.PartitionMetadataToGet,
381+
) error {
382+
for i := range toGet {
383+
item := &toGet[i]
384+
385+
func() {
386+
memPart := s.lockPartition(treeKey, item.Key, uniqueOwner, false /* isExclusive */)
387+
if memPart == nil {
388+
// Partition does not exist, so map it to Missing.
389+
item.Metadata = cspann.PartitionMetadata{}
390+
return
391+
}
392+
defer memPart.lock.ReleaseShared()
393+
394+
// Return a copy of the metadata.
395+
item.Metadata = *memPart.lock.partition.Metadata()
396+
}()
386397
}
387-
defer memPart.lock.ReleaseShared()
388398

389-
// Return a copy of the metadata.
390-
return *memPart.lock.partition.Metadata(), nil
399+
return nil
391400
}
392401

393402
// TryUpdatePartitionMetadata implements the Store interface.

pkg/sql/vecindex/cspann/memstore/memstore_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,11 @@ func TestInMemoryStoreUpdateStats(t *testing.T) {
197197
require.Equal(t, []cspann.CVStats{}, stats.CVStats)
198198

199199
// Increase root partition level and check stats.
200-
metadata, err := store.TryGetPartitionMetadata(ctx, treeKey, cspann.RootKey)
200+
toGet := []cspann.PartitionMetadataToGet{{Key: cspann.RootKey}}
201+
err = store.TryGetPartitionMetadata(ctx, treeKey, toGet)
201202
require.NoError(t, err)
202203

203-
expected := metadata
204+
expected := toGet[0].Metadata
204205
metadata.Level = 3
205206
err = store.TryUpdatePartitionMetadata(ctx, treeKey, cspann.RootKey, metadata, expected)
206207
require.NoError(t, err)

pkg/sql/vecindex/cspann/store.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ type PartitionToSearch struct {
4444
Count int
4545
}
4646

47+
// PartitionMetadataToGet contains information about partition metadata to be
48+
// fetched by the TryGetPartitionMetadata method.
49+
type PartitionMetadataToGet struct {
50+
// Key specifies which partition's metadata to fetch and is set by the caller.
51+
Key PartitionKey
52+
// Metadata is the metadata for the partition and is set by the Store.
53+
Metadata PartitionMetadata
54+
}
55+
4756
// Store encapsulates the component that's actually storing the vectors, whether
4857
// that's in a CRDB cluster for production or in memory for testing and
4958
// benchmarking. Callers can use Store to start and commit transactions against
@@ -92,13 +101,14 @@ type Store interface {
92101
ctx context.Context, treeKey TreeKey, partitionKey PartitionKey,
93102
) (*Partition, error)
94103

95-
// TryGetPartitionMetadata returns just the metadata of the requested
96-
// partition, if it exists, else it returns ErrPartitionNotFound. This is
97-
// more efficient than loading the entire partition when only metadata is
98-
// needed.
104+
// TryGetPartitionMetadata returns the metadata of the requested partitions.
105+
// If a partition does not exist, its state is set to Missing.
106+
//
107+
// NOTE: The caller owns the "toGet" memory. The Store implementation should
108+
// not try to use it after returning.
99109
TryGetPartitionMetadata(
100-
ctx context.Context, treeKey TreeKey, partitionKey PartitionKey,
101-
) (PartitionMetadata, error)
110+
ctx context.Context, treeKey TreeKey, toGet []PartitionMetadataToGet,
111+
) error
102112

103113
// TryUpdatePartitionMetadata updates the partition's metadata only if it's
104114
// equal to the expected value, else it returns a ConditionFailedError. If

pkg/sql/vecindex/vecstore/store.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,18 +287,36 @@ func (s *Store) TryGetPartition(
287287
}
288288

289289
// TryGetPartitionMetadata is part of the cspann.Store interface. It returns the
290-
// metadata of an existing partition.
290+
// metadata for a batch of partitions.
291291
func (s *Store) TryGetPartitionMetadata(
292-
ctx context.Context, treeKey cspann.TreeKey, partitionKey cspann.PartitionKey,
293-
) (cspann.PartitionMetadata, error) {
292+
ctx context.Context, treeKey cspann.TreeKey, toGet []cspann.PartitionMetadataToGet,
293+
) error {
294+
// Construct a batch with one Get request per partition.
294295
b := s.kv.NewBatch()
295-
metadataKey := vecencoding.EncodeMetadataKey(s.prefix, treeKey, partitionKey)
296-
b.Get(metadataKey)
297-
if err := s.kv.Run(ctx, b); err != nil {
298-
return cspann.PartitionMetadata{},
299-
errors.Wrapf(err, "getting partition metadata for %d", partitionKey)
296+
for i := range toGet {
297+
metadataKey := vecencoding.EncodeMetadataKey(s.prefix, treeKey, toGet[i].Key)
298+
b.Get(metadataKey)
299+
}
300+
301+
// Run the batch and return results.
302+
var err error
303+
if err = s.kv.Run(ctx, b); err != nil {
304+
return errors.Wrapf(err, "getting partition metadata for %d partitions", len(toGet))
300305
}
301-
return s.getMetadataFromKVResult(partitionKey, &b.Results[0])
306+
307+
for i := range toGet {
308+
item := &toGet[i]
309+
item.Metadata, err = s.getMetadataFromKVResult(item.Key, &b.Results[i])
310+
311+
// If partition is missing, just return Missing metadata.
312+
if err != nil {
313+
if !errors.Is(err, cspann.ErrPartitionNotFound) {
314+
return err
315+
}
316+
}
317+
}
318+
319+
return nil
302320
}
303321

304322
// TryUpdatePartitionMetadata is part of the cspann.Store interface. It updates

0 commit comments

Comments
 (0)