Skip to content

Commit 53a36eb

Browse files
committed
builtins: add process_vector_index_fixups builtin
Add a new builtin function that waits until all pending fixups have been processed for a given index. This is used for testing. Previously, we were processing fixups in deterministic tests by calling ProcessFixups just before inserting vectors or searching for them. However, that was causing deadlocks, because fixups were being processed in the same transaction that was updating the index. The vector_index test is now updated to call the new builtin when it needs to force fixup processing. Epic: CRDB-42943 Release note: None
1 parent fe7e2f9 commit 53a36eb

File tree

8 files changed

+88
-14
lines changed

8 files changed

+88
-14
lines changed

pkg/sql/faketreeeval/evalctx.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,13 @@ func (ep *DummyEvalPlanner) RetryCounter() int {
577577
return 0
578578
}
579579

580+
// ProcessVectorIndexFixups is part of the eval.Planner interface.
581+
func (ep *DummyEvalPlanner) ProcessVectorIndexFixups(
582+
ctx context.Context, tableID descpb.ID, indexID descpb.IndexID,
583+
) error {
584+
return nil
585+
}
586+
580587
// DummyPrivilegedAccessor implements the tree.PrivilegedAccessor interface by returning errors.
581588
type DummyPrivilegedAccessor struct{}
582589

pkg/sql/logictest/testdata/logic_test/vector_index

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,12 +539,37 @@ INSERT INTO exec_test (a, b, vec1) VALUES
539539
(2, 1, '[4, 5, 6]'),
540540
(3, 2, '[7, 8, 9]'),
541541
(4, 2, '[10, 11, 12]'),
542-
(5, 2, '[13, 14, 15]'),
542+
(5, 2, '[13, 14, 15]');
543+
544+
statement ok
545+
INSERT INTO exec_test (a, b, vec1) VALUES
543546
(6, NULL, '[16, 17, 18]'),
544547
(7, NULL, '[1, 1, 1]'),
545548
(8, NULL, NULL),
546549
(9, 3, NULL);
547550

551+
# Define
552+
statement ok
553+
CREATE PROCEDURE process_vector_index_fixups_for(desc_name STRING, idx_name STRING)
554+
LANGUAGE SQL
555+
AS $$
556+
SELECT crdb_internal.process_vector_index_fixups(
557+
(SELECT id FROM system.namespace WHERE name = desc_name),
558+
(
559+
SELECT index_id
560+
FROM crdb_internal.table_indexes
561+
WHERE descriptor_name = desc_name AND index_name = idx_name
562+
)
563+
)
564+
$$;
565+
566+
# Process split fixups for the two indexes.
567+
statement ok
568+
CALL process_vector_index_fixups_for('exec_test', 'idx1');
569+
570+
statement ok
571+
CALL process_vector_index_fixups_for('exec_test', 'idx2');
572+
548573
statement error pgcode 22000 pq: expected 3 dimensions, not 1
549574
INSERT INTO exec_test (a, b, vec1) VALUES (10, 1, '[1]');
550575

@@ -765,9 +790,22 @@ INSERT INTO distance_metrics (a, v) VALUES
765790
(1, '[0, 0]'),
766791
(2, '[-2, -2]'),
767792
(3, '[2, 2]'),
768-
(4, '[4, 4]'),
793+
(4, '[4, 4]');
794+
795+
statement ok
796+
INSERT INTO distance_metrics (a, v) VALUES
769797
(5, '[-2, 2]');
770798

799+
# Process split fixups for the three indexes.
800+
statement ok
801+
CALL process_vector_index_fixups_for('distance_metrics', 'idx1');
802+
803+
statement ok
804+
CALL process_vector_index_fixups_for('distance_metrics', 'idx2');
805+
806+
statement ok
807+
CALL process_vector_index_fixups_for('distance_metrics', 'idx3');
808+
771809
# Results using L2 distance.
772810
query ITF rowsort
773811
SELECT a, v, round(v <-> '[2, 2]', 2) dist FROM distance_metrics ORDER BY v <-> '[2, 2]' LIMIT 5;

pkg/sql/planner.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,3 +1077,15 @@ func (p *planner) ExtendHistoryRetention(ctx context.Context, jobID jobspb.JobID
10771077
func (p *planner) RetryCounter() int {
10781078
return p.autoRetryCounter + p.autoRetryStmtCounter
10791079
}
1080+
1081+
// ProcessVectorIndexFixups is part of the eval.Planner interface.
1082+
func (p *planner) ProcessVectorIndexFixups(
1083+
ctx context.Context, tableID descpb.ID, indexID descpb.IndexID,
1084+
) error {
1085+
vi, err := p.execCfg.VecIndexManager.Get(ctx, tableID, indexID)
1086+
if err != nil {
1087+
return err
1088+
}
1089+
vi.ProcessFixups()
1090+
return nil
1091+
}

pkg/sql/sem/builtins/builtins.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9357,6 +9357,30 @@ WHERE object_id = table_descriptor_id
93579357
},
93589358
},
93599359
),
9360+
"crdb_internal.process_vector_index_fixups": makeBuiltin(
9361+
tree.FunctionProperties{
9362+
Category: builtinconstants.CategoryTesting,
9363+
Undocumented: true,
9364+
},
9365+
tree.Overload{
9366+
Types: tree.ParamTypes{
9367+
{Name: "table_id", Typ: types.Int},
9368+
{Name: "index_id", Typ: types.Int},
9369+
},
9370+
ReturnType: tree.FixedReturnType(types.Void),
9371+
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
9372+
tableID := descpb.ID(tree.MustBeDInt(args[0]))
9373+
indexID := descpb.IndexID(tree.MustBeDInt(args[1]))
9374+
err := evalCtx.Planner.ProcessVectorIndexFixups(ctx, tableID, indexID)
9375+
if err != nil {
9376+
return nil, err
9377+
}
9378+
return tree.DVoidDatum, nil
9379+
},
9380+
Info: "Waits until all outstanding fixups for the vector index with the given ID have been processed.",
9381+
Volatility: volatility.Volatile,
9382+
},
9383+
),
93609384
}
93619385

93629386
var lengthImpls = func(incBitOverload bool) builtinDefinition {

pkg/sql/sem/builtins/fixed_oids.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2795,6 +2795,7 @@ var builtinOidsArray = []string{
27952795
2840: `lead(val: ltree, n: int) -> ltree`,
27962796
2841: `lead(val: ltree, n: int, default: ltree) -> ltree`,
27972797
2842: `last_value(val: ltree) -> ltree`,
2798+
2843: `crdb_internal.process_vector_index_fixups(table_id: int, index_id: int) -> void`,
27982799
}
27992800

28002801
var builtinOidsBySignature map[string]oid.Oid

pkg/sql/sem/eval/deps.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,10 @@ type Planner interface {
455455

456456
// RetryCounter is the number of times this statement has been retried.
457457
RetryCounter() int
458+
459+
// ProcessVectorIndexFixups waits until all outstanding fixups for the vector
460+
// index with the given ID have been processed.
461+
ProcessVectorIndexFixups(ctx context.Context, tableID descpb.ID, indexID descpb.IndexID) error
458462
}
459463

460464
// InternalRows is an iterator interface that's exposed by the internal

pkg/sql/vecindex/mutation_searcher.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,6 @@ func (s *MutationSearcher) Init(
5252
s.txn.Init(evalCtx, idx.Store().(*vecstore.Store), txn, getFullVectorsFetchSpec)
5353
s.idxCtx.Init(&s.txn)
5454
s.evalCtx = evalCtx
55-
56-
// If the index is deterministic, then synchronously run the background worker
57-
// to process any pending fixups.
58-
if s.idx.Options().IsDeterministic {
59-
s.idx.ProcessFixups()
60-
}
6155
}
6256

6357
// SearchForInsert triggers a search for the partition in which to insert the

pkg/sql/vecindex/searcher.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,6 @@ func (s *Searcher) Init(
6262
}
6363
s.searchSet.MaxResults, s.searchSet.MaxExtraResults =
6464
cspann.IncreaseRerankResults(baseBeamSize, maxResults, rerankMultiplier)
65-
66-
// If the index is deterministic, then synchronously run the background worker
67-
// to process any pending fixups.
68-
if idx.Options().IsDeterministic {
69-
s.idx.ProcessFixups()
70-
}
7165
}
7266

7367
// Search triggers a search over the index for the given vector, within the

0 commit comments

Comments
 (0)