Skip to content

Commit 972e20f

Browse files
committed
vecstore: use span.Builder in implementation of GetFullVectors
Previously, the vector store stored the table and index descriptors for the vector index it was associated with. Since the store itself did not have a lease on this descriptor and would outlive the transaction that caused the store to be created, we could end up using a stale table descriptor for vector operations. This patch moves table information into the vector transaction in the form of an index fetch spec created by a leaseholder on the table. The vecindex transaction is expected to have a lifetime shorter than the lease held by the caller, so the descriptors that generate this fetch spec will not go stale. Addtionally, we generate the needed family IDs for a span.Splitter(), to avoid reading families that we don't require. This is also included with the fetch spec. This patch also: * Introduces a proto buffer to hold the index fetch spec and family IDs. * Introduces a function to initialize this proto buffer. * Introduces a testing API into the vecstore so that tests can provide fake family descriptors to vecstore. Fixes: #146046 Release note (bug fix): A bug where using column families on tables with vector indexes would cause the index to fail to return results has been fixed.
1 parent 276a7a7 commit 972e20f

26 files changed

+699
-156
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2379,6 +2379,7 @@ GO_TARGETS = [
23792379
"//pkg/sql/vecindex/vecencoding:vecencoding_test",
23802380
"//pkg/sql/vecindex/vecpb:vecpb",
23812381
"//pkg/sql/vecindex/vecpb:vecpb_test",
2382+
"//pkg/sql/vecindex/vecstore/vecstorepb:vecstorepb",
23822383
"//pkg/sql/vecindex/vecstore:vecstore",
23832384
"//pkg/sql/vecindex/vecstore:vecstore_test",
23842385
"//pkg/sql/vecindex:vecindex",

pkg/gen/protobuf.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ PROTOBUF_SRCS = [
7979
"//pkg/sql/vecindex/cspann/quantize:quantize_go_proto",
8080
"//pkg/sql/vecindex/cspann:cspann_go_proto",
8181
"//pkg/sql/vecindex/vecpb:vecpb_go_proto",
82+
"//pkg/sql/vecindex/vecstore/vecstorepb:vecstorepb_go_proto",
8283
"//pkg/storage/enginepb:enginepb_go_proto",
8384
"//pkg/testutils/grpcutils:grpcutils_go_proto",
8485
"//pkg/ts/catalog:catalog_go_proto",

pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ go_library(
544544
"//pkg/sql/types",
545545
"//pkg/sql/vecindex",
546546
"//pkg/sql/vecindex/vecpb",
547+
"//pkg/sql/vecindex/vecstore",
547548
"//pkg/sql/vtable",
548549
"//pkg/storage",
549550
"//pkg/storage/enginepb",

pkg/sql/backfill/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ go_library(
4545
"//pkg/sql/vecindex",
4646
"//pkg/sql/vecindex/cspann",
4747
"//pkg/sql/vecindex/vecencoding",
48+
"//pkg/sql/vecindex/vecstore",
49+
"//pkg/sql/vecindex/vecstore/vecstorepb",
4850
"//pkg/util/admission/admissionpb",
4951
"//pkg/util/ctxgroup",
5052
"//pkg/util/hlc",
@@ -81,6 +83,7 @@ go_test(
8183
"//pkg/sql/catalog/tabledesc",
8284
"//pkg/sql/execinfra",
8385
"//pkg/sql/sem/catid",
86+
"//pkg/sql/sem/eval",
8487
"//pkg/testutils/serverutils",
8588
"//pkg/testutils/sqlutils",
8689
"//pkg/testutils/testcluster",

pkg/sql/backfill/backfill.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313
"unsafe"
1414

15-
"github.com/cockroachdb/cockroach/pkg/keys"
1615
"github.com/cockroachdb/cockroach/pkg/kv"
1716
"github.com/cockroachdb/cockroach/pkg/roachpb"
1817
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -39,6 +38,8 @@ import (
3938
"github.com/cockroachdb/cockroach/pkg/sql/vecindex"
4039
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann"
4140
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecencoding"
41+
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
42+
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore/vecstorepb"
4243
"github.com/cockroachdb/cockroach/pkg/util/log"
4344
"github.com/cockroachdb/cockroach/pkg/util/mon"
4445
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -477,8 +478,12 @@ type VectorIndexHelper struct {
477478
numPrefixCols int
478479
// vecIndex is the vector index retrieved from the vector index manager.
479480
vecIndex *cspann.Index
481+
// fullVecFetchSpec is the fetch spec for the full vector.
482+
fullVecFetchSpec vecstorepb.GetFullVectorsFetchSpec
480483
// indexPrefix are the prefix bytes for this index (/Tenant/Table/Index).
481484
indexPrefix []byte
485+
// evalCtx is the evaluation context used for vector operations.
486+
evalCtx *eval.Context
482487
}
483488

484489
// ReEncodeVector takes a rowenc.indexEntry, extracts the key values, unquantized
@@ -509,7 +514,8 @@ func (vih *VectorIndexHelper) ReEncodeVector(
509514

510515
// Locate a new partition for the key and re-encode the vector.
511516
var searcher vecindex.MutationSearcher
512-
searcher.Init(vih.vecIndex, txn)
517+
searcher.Init(vih.evalCtx, vih.vecIndex, txn, &vih.fullVecFetchSpec)
518+
513519
if err := searcher.SearchForInsert(ctx, roachpb.Key(key.Prefix), vec); err != nil {
514520
return &rowenc.IndexEntry{}, err
515521
}
@@ -598,7 +604,7 @@ func (ib *IndexBackfiller) InitForLocalUse(
598604
) error {
599605

600606
// Initialize ib.added.
601-
if err := ib.initIndexes(ctx, evalCtx.Codec, desc, nil /* allowList */, 0 /*sourceIndex*/, nil); err != nil {
607+
if err := ib.initIndexes(ctx, evalCtx, desc, nil /* allowList */, 0 /*sourceIndex*/, nil); err != nil {
602608
return err
603609
}
604610

@@ -744,7 +750,14 @@ func (ib *IndexBackfiller) InitForDistributedUse(
744750
evalCtx := flowCtx.NewEvalCtx()
745751

746752
// Initialize ib.added.
747-
if err := ib.initIndexes(ctx, evalCtx.Codec, desc, allowList, sourceIndexID, flowCtx.Cfg.VecIndexManager.(*vecindex.Manager)); err != nil {
753+
if err := ib.initIndexes(
754+
ctx,
755+
evalCtx,
756+
desc,
757+
allowList,
758+
sourceIndexID,
759+
flowCtx.Cfg.VecIndexManager.(*vecindex.Manager),
760+
); err != nil {
748761
return err
749762
}
750763

@@ -838,7 +851,7 @@ func (ib *IndexBackfiller) initCols(desc catalog.TableDescriptor) (err error) {
838851
// If `allowList` is nil, we add all adding index mutations.
839852
func (ib *IndexBackfiller) initIndexes(
840853
ctx context.Context,
841-
codec keys.SQLCodec,
854+
evalCtx *eval.Context,
842855
desc catalog.TableDescriptor,
843856
allowList []catid.IndexID,
844857
sourceIndexID catid.IndexID,
@@ -867,7 +880,7 @@ func (ib *IndexBackfiller) initIndexes(
867880
(allowListAsSet.Empty() || allowListAsSet.Contains(m.AsIndex().GetID())) {
868881
idx := m.AsIndex()
869882
ib.added = append(ib.added, idx)
870-
keyPrefix := rowenc.MakeIndexKeyPrefix(codec, desc.GetID(), idx.GetID())
883+
keyPrefix := rowenc.MakeIndexKeyPrefix(evalCtx.Codec, desc.GetID(), idx.GetID())
871884
ib.keyPrefixes = append(ib.keyPrefixes, keyPrefix)
872885
}
873886
}
@@ -896,13 +909,24 @@ func (ib *IndexBackfiller) initIndexes(
896909
return err
897910
}
898911

899-
ib.VectorIndexes[idx.GetID()] = VectorIndexHelper{
912+
helper := VectorIndexHelper{
900913
vectorOrd: vectorCol.Ordinal(),
901914
centroid: make(vector.T, idx.GetVecConfig().Dims),
902915
numPrefixCols: idx.NumKeyColumns() - 1,
903916
vecIndex: vecIndex,
904-
indexPrefix: rowenc.MakeIndexKeyPrefix(codec, desc.GetID(), idx.GetID()),
917+
indexPrefix: rowenc.MakeIndexKeyPrefix(evalCtx.Codec, desc.GetID(), idx.GetID()),
918+
evalCtx: ib.evalCtx,
919+
}
920+
if err := vecstore.InitGetFullVectorsFetchSpec(
921+
&helper.fullVecFetchSpec,
922+
evalCtx,
923+
desc,
924+
idx,
925+
ib.sourceIndex,
926+
); err != nil {
927+
return err
905928
}
929+
ib.VectorIndexes[idx.GetID()] = helper
906930
}
907931

908932
return nil

pkg/sql/backfill/index_backfiller_cols_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1616
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
1717
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
18+
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
1819
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1920
"github.com/stretchr/testify/require"
2021
)
@@ -418,7 +419,7 @@ func TestInitIndexesAllowList(t *testing.T) {
418419
ib := &IndexBackfiller{}
419420
err := ib.initIndexes(
420421
context.Background(),
421-
keys.SystemSQLCodec,
422+
&eval.Context{Codec: keys.SystemSQLCodec},
422423
desc,
423424
nil, /* allowList */
424425
0, /* sourceIndexID */
@@ -434,7 +435,7 @@ func TestInitIndexesAllowList(t *testing.T) {
434435
ib := &IndexBackfiller{}
435436
err := ib.initIndexes(
436437
context.Background(),
437-
keys.SystemSQLCodec,
438+
&eval.Context{Codec: keys.SystemSQLCodec},
438439
desc,
439440
[]catid.IndexID{3}, /* allowList */
440441
0, /* sourceIndexID */

pkg/sql/distsql_physical_planner.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
5353
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
5454
"github.com/cockroachdb/cockroach/pkg/sql/types"
55+
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
5556
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
5657
"github.com/cockroachdb/cockroach/pkg/util/encoding"
5758
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -4602,6 +4603,16 @@ func (dsp *DistSQLPlanner) planVectorSearch(
46024603
return err
46034604
}
46044605

4606+
if err := vecstore.InitGetFullVectorsFetchSpec(
4607+
&spec.GetFullVectorsFetchSpec,
4608+
planCtx.EvalContext(),
4609+
planInfo.table,
4610+
planInfo.index,
4611+
planInfo.table.GetPrimaryIndex(),
4612+
); err != nil {
4613+
return err
4614+
}
4615+
46054616
// Execute the vector search on the gateway node.
46064617
corePlacement := []physicalplan.ProcessorCorePlacement{{
46074618
SQLInstanceID: dsp.gatewaySQLInstanceID,
@@ -4675,6 +4686,16 @@ func (dsp *DistSQLPlanner) planVectorMutationSearch(
46754686
return err
46764687
}
46774688

4689+
if err := vecstore.InitGetFullVectorsFetchSpec(
4690+
&spec.GetFullVectorsFetchSpec,
4691+
planCtx.EvalContext(),
4692+
planInfo.table,
4693+
planInfo.index,
4694+
planInfo.table.GetPrimaryIndex(),
4695+
); err != nil {
4696+
return err
4697+
}
4698+
46784699
// The vector mutation search can be conducted for each row independently, so
46794700
// it's fine to instantiate one instance for each stream.
46804701
pSpec := execinfrapb.ProcessorCoreUnion{VectorMutationSearch: spec}

pkg/sql/execinfrapb/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ proto_library(
100100
"//pkg/sql/inverted:inverted_proto",
101101
"//pkg/sql/sessiondatapb:sessiondatapb_proto",
102102
"//pkg/sql/types:types_proto",
103+
"//pkg/sql/vecindex/vecstore/vecstorepb:vecstorepb_proto",
103104
"//pkg/util/hlc:hlc_proto",
104105
"//pkg/util/optional:optional_proto",
105106
"//pkg/util/tracing/tracingpb:tracingpb_proto",
@@ -133,6 +134,7 @@ go_proto_library(
133134
"//pkg/sql/inverted",
134135
"//pkg/sql/sessiondatapb",
135136
"//pkg/sql/types",
137+
"//pkg/sql/vecindex/vecstore/vecstorepb",
136138
"//pkg/util/hlc",
137139
"//pkg/util/optional",
138140
"//pkg/util/tracing/tracingpb",

pkg/sql/execinfrapb/processors_sql.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import "sql/types/types.proto";
2424
import "sql/execinfrapb/data.proto";
2525
import "sql/execinfrapb/processors_base.proto";
2626
import "sql/inverted/span_expression.proto";
27+
import "sql/vecindex/vecstore/vecstorepb/fullvecfetchspec.proto";
2728
import "util/hlc/timestamp.proto";
2829

2930
// ValuesCoreSpec is the core of a processor that has no inputs and generates
@@ -1099,6 +1100,8 @@ message VectorSearchSpec {
10991100

11001101
// TargetNeighborCount is the number of nearest neighbors to search for.
11011102
optional uint64 target_neighbor_count = 4 [(gogoproto.nullable) = false];
1103+
1104+
optional vecindex.vecstore.vecstorepb.GetFullVectorsFetchSpec get_full_vectors_fetch_spec = 5 [(gogoproto.nullable) = false];
11021105
}
11031106

11041107
// VectorMutationSearchSpec is the specification for a vector-mutation-search
@@ -1128,4 +1131,6 @@ message VectorMutationSearchSpec {
11281131
// IsIndexPut is true if the search is being conducted for an index put,
11291132
// instead of an index del.
11301133
optional bool is_index_put = 7 [(gogoproto.nullable) = false];
1134+
1135+
optional vecindex.vecstore.vecstorepb.GetFullVectorsFetchSpec get_full_vectors_fetch_spec = 8 [(gogoproto.nullable) = false];
11311136
}

0 commit comments

Comments
 (0)