Skip to content

Commit 015d87f

Browse files
committed
vecindex: fix race condition when creating root partition
When multiple workers are racing to create the root partition, it's possible to trigger an error when the root partition already exists and is in a state other than Ready (e.g. Splitting). Fix this by creating the root metadata record only if it doesn't already exist. If it does exist, just return the current metadata. Epic: CRDB-42943 Release note: None
1 parent d0c6163 commit 015d87f

File tree

4 files changed

+64
-45
lines changed

4 files changed

+64
-45
lines changed

pkg/sql/vecindex/cspann/partition_metadata.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ type PartitionStateDetails struct {
147147
// be merged into the root partition.
148148
Source PartitionKey
149149
// Timestamp is the time of the last state transition for the partition.
150+
// NOTE: We use NowNoMono to get the current time because the monotonic clock
151+
// reading is not round-tripped through the encoding/decoding functions, since
152+
// it's not useful to store.
150153
Timestamp time.Time
151154
}
152155

pkg/sql/vecindex/vecencoding/encoding.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/util/vector"
1818
)
1919

20-
/* Vector indexes are encoded as follows:
20+
/* Vector indexes are encoded as shown below.
2121
22-
Metadata KV key:
22+
NOTE: Key formats are always suffixed by a Family ID byte of 0. This is
23+
necessary to include so that the key can be parsed by the KV range split code.
24+
That code uses the family ID to check that it doesn't split column families for
25+
the same row across ranges.
26+
27+
Metadata KV Key:
2328
Metadata keys always sort before vector keys, since Family ID 0 always
2429
sorts before Level, which is >= 1.
2530
┌────────────┬──────────────┬────────────┬───────────┐
@@ -29,11 +34,11 @@ Metadata KV Value:
2934
┌─────┬─────┬───────┬───────┬──────┬─────────┬────────┐
3035
│Level│State│Target1│Target2│Source│Timestamp│Centroid│
3136
└─────┴─────┴───────┴───────┴──────┴─────────┴────────┘
32-
Vector KV key (interior, non-leaf partition):
37+
Vector KV Key (interior, non-leaf partition):
3338
┌────────────┬──────────────┬────────────┬─────┬──────────────────┬───────────┐
3439
│Index Prefix│Prefix Columns│PartitionKey│Level│Child PartitionKey│Family ID 0│
3540
└────────────┴──────────────┴────────────┴─────┴──────────────────┴───────────┘
36-
Vector KV key (leaf partition):
41+
Vector KV Key (leaf partition):
3742
┌────────────┬──────────────┬────────────┬─────┬──────────┬───────────┐
3843
│Index Prefix│Prefix Columns│PartitionKey│Level│PrimaryKey│Family ID 0│
3944
└────────────┴──────────────┴────────────┴─────┴──────────┴───────────┘
@@ -147,18 +152,17 @@ func (vik *DecodedVectorKey) Encode(appendTo []byte) []byte {
147152
}
148153

149154
// EncodeVectorValue takes a quantized vector entry and any composite key data
150-
// and returns the byte slice encoding the value side of the vector index entry.
151-
// This value will still need to be further encoded as Bytes in the
152-
// valueside.Value
155+
// and returns the byte slice encoding the value of the vector index entry. This
156+
// value will still need to be further encoded as Bytes in valueside.Value.
153157
func EncodeVectorValue(appendTo []byte, vectorData []byte, compositeData []byte) []byte {
154-
// The value side is encoded as a concatenation of the vector data and the
158+
// The value is encoded as a concatenation of the vector data and the
155159
// composite data.
156160
appendTo = append(appendTo, vectorData...)
157161
return append(appendTo, compositeData...)
158162
}
159163

160164
// EncodedVectorValueLen returns the number of bytes needed to encode the value
161-
// side of a vector index entry.
165+
// of a vector index entry.
162166
func EncodedVectorValueLen(vectorData []byte, compositeData []byte) int {
163167
return len(vectorData) + len(compositeData)
164168
}

pkg/sql/vecindex/vecstore/store_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,6 @@ func TestStore(t *testing.T) {
179179
return
180180
}
181181

182-
// Step so that we can see the new root partition.
183-
require.NoError(t, tx.(*Txn).kv.Step(ctx, true /* allowReadTimestampStep */))
184-
185182
// Run GetPartitionMetadata again, to ensure that it succeeds, as a
186183
// way of simulating multiple vectors being inserted in the same
187184
// SQL statement.

pkg/sql/vecindex/vecstore/store_txn.go

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -304,22 +304,48 @@ func (tx *Txn) DeletePartition(
304304
func (tx *Txn) GetPartitionMetadata(
305305
ctx context.Context, treeKey cspann.TreeKey, partitionKey cspann.PartitionKey, forUpdate bool,
306306
) (cspann.PartitionMetadata, error) {
307-
// TODO(mw5h): Add to an existing batch instead of starting a new one.
308-
b := tx.kv.NewBatch()
309-
310307
metadataKey := vecencoding.EncodeMetadataKey(tx.store.prefix, treeKey, partitionKey)
311-
if forUpdate {
312-
// By acquiring a shared lock on metadata key, we prevent splits/merges of
313-
// this partition from conflicting with the add operation.
314-
b.GetForShare(metadataKey, tx.lockDurability)
315-
} else {
316-
b.Get(metadataKey)
317-
}
318308

319-
// Run the batch and get the partition metadata from results.
320-
if err := tx.kv.Run(ctx, b); err != nil {
321-
return cspann.PartitionMetadata{},
322-
errors.Wrapf(err, "getting partition metadata for %d", partitionKey)
309+
// By acquiring a shared lock on metadata key, we prevent splits/merges of
310+
// this partition from conflicting with the add operation.
311+
b, err := func() (b *kv.Batch, err error) {
312+
// TODO(mw5h): Add to an existing batch instead of starting a new one.
313+
b = tx.kv.NewBatch()
314+
315+
if tx.kv.Sender().GetSteppingMode(ctx) == kv.SteppingEnabled {
316+
// When there are multiple inserts within the same SQL statement, the
317+
// first insert will trigger creation of the metadata record. However,
318+
// subsequent inserts will not be able to "see" this record, since they
319+
// will read at a lower sequence number than the metadata record was
320+
// written. Handle this issue by temporarily stepping the read sequence
321+
// number so the latest metadata can be read.
322+
prevSeqNum := tx.kv.GetReadSeqNum()
323+
if err = tx.kv.Step(ctx, false /* allowReadTimestampStep */); err != nil {
324+
return nil, err
325+
}
326+
defer func() {
327+
// Restore the original sequence number; report a failure to do so even when the batch itself succeeded.
328+
if readErr := tx.kv.SetReadSeqNum(prevSeqNum); readErr != nil {
329+
err = errors.CombineErrors(err, readErr)
330+
}
331+
}()
332+
}
333+
334+
if forUpdate {
335+
b.GetForShare(metadataKey, tx.lockDurability)
336+
} else {
337+
b.Get(metadataKey)
338+
}
339+
340+
// Run the batch.
341+
if err := tx.kv.Run(ctx, b); err != nil {
342+
return nil, errors.Wrapf(err, "getting partition metadata for %d", partitionKey)
343+
}
344+
345+
return b, nil
346+
}()
347+
if err != nil {
348+
return cspann.PartitionMetadata{}, err
323349
}
324350

325351
// If we're preparing to update the root partition, then lazily create its
@@ -591,10 +617,10 @@ func (tx *Txn) GetFullVectors(
591617
}
592618

593619
// createRootPartition uses the KV CPut operation to create metadata for the
594-
// root partition, and then returns that metadata.
595-
//
596-
// NOTE: CPut always "sees" the latest write of the metadata record, even if the
597-
// timestamp of that write is higher than this transaction's.
620+
// root partition, and then returns that metadata. If another transaction races
621+
// and creates the root partition at a higher timestamp, createRootPartition
622+
// returns a WriteTooOld error, which will trigger a refresh of this transaction
623+
// at the higher timestamp.
598624
func (tx *Txn) createRootPartition(
599625
ctx context.Context, metadataKey roachpb.Key,
600626
) (cspann.PartitionMetadata, error) {
@@ -606,22 +632,11 @@ func (tx *Txn) createRootPartition(
606632
}
607633
encoded := vecencoding.EncodeMetadataValue(metadata)
608634

609-
// Use CPutAllowingIfNotExists in order to handle the case where the same
610-
// transaction inserts multiple vectors (e.g. multiple VALUES rows). In that
611-
// case, the first row will trigger creation of the metadata record. However,
612-
// subsequent inserts will not be able to "see" this record, since they will
613-
// read at a lower sequence number than the metadata record was written.
614-
// However, CPutAllowingIfNotExists will read at the higher sequence number
615-
// and see that the record was already created.
616-
//
617-
// On the other hand, if a different transaction wrote the record, it will
618-
// have a higher timestamp, and that will trigger a WriteTooOld error.
619-
// Transactions which lose that race need to be refreshed.
620-
var roachval roachpb.Value
621-
roachval.SetBytes(encoded)
622-
b.CPutAllowingIfNotExists(metadataKey, &roachval, roachval.TagAndDataBytes())
635+
// Use CPut to detect the case where another transaction is racing to create
636+
// the root partition. CPut always "sees" the latest version of the metadata
637+
// record.
638+
b.CPut(metadataKey, encoded, nil /* expValue */)
623639
if err := tx.kv.Run(ctx, b); err != nil {
624-
// Lost the race to a different transaction.
625640
return cspann.PartitionMetadata{}, errors.Wrapf(err, "creating root partition metadata")
626641
}
627642
return metadata, nil

0 commit comments

Comments
 (0)