Skip to content

Commit d25d3cd

Browse files
craig[bot]mw5h
andcommitted
Merge #144588
144588: backfill: cleanup vector index backfill r=mw5h a=mw5h Previously a patch to implement vector index backfill was committed and backported to 25.2 without consulting SQL Foundations. It turns out that they have some thoughts on the patch. This patch addresses their comments. The changes here mostly address readability of the code, but there was at least one issue where progress reporting would not work correctly. Epic: CRDB-42943 Release note: None Co-authored-by: Matt White <[email protected]>
2 parents 7a28a7b + 21fdc6a commit d25d3cd

File tree

2 files changed

+11
-10
lines changed

2 files changed

+11
-10
lines changed

pkg/sql/backfill/backfill.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ type VectorIndexHelper struct {
470470
vectorOrd int
471471
// centroid is an all zeros centroid of the appropriate dimension for the vector
472472
// column. It's used to encode the vector in the reader. The writer will
473-
// re-encode the vector with the centroid for the parittionselected.
473+
// re-encode the vector with the centroid for the partition selected.
474474
centroid vector.T
475475
// number of non-vector index key columns
476476
numPrefixCols int
@@ -516,7 +516,7 @@ func (vih *VectorIndexHelper) ReEncodeVector(
516516
key.Level = cspann.LeafLevel
517517
quantizedVector, ok := tree.AsDBytes(searcher.EncodedVector())
518518
if !ok {
519-
panic("expected encoded vector to be of type DBytes")
519+
return &rowenc.IndexEntry{}, errors.AssertionFailedf("expected encoded vector to be of type DBytes")
520520
}
521521

522522
outputEntry.Key = append(outputEntry.Key[:0], vih.indexPrefix...)

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (ib *indexBackfiller) constructIndexEntries(
181181
return nil
182182
}
183183

184-
func (ib *indexBackfiller) maybeReencodeVectorIndexEntry(
184+
func (ib *indexBackfiller) maybeReencodeAndWriteVectorIndexEntry(
185185
ctx context.Context, tmpEntry *rowenc.IndexEntry, indexEntry *rowenc.IndexEntry,
186186
) (bool, error) {
187187
indexID, _, err := rowenc.DecodeIndexKeyPrefix(ib.flowCtx.EvalCtx.Codec, ib.desc.GetID(), indexEntry.Key)
@@ -271,12 +271,13 @@ func (ib *indexBackfiller) ingestIndexEntries(
271271
// When the bulk adder flushes, the spans which were previously marked as
272272
// "added" can now be considered "completed", and be sent back to the
273273
// coordinator node as part of the next progress report.
274-
adder.SetOnFlush(func(_ kvpb.BulkOpSummary) {
274+
flushAddedSpans := func(_ kvpb.BulkOpSummary) {
275275
mu.Lock()
276276
defer mu.Unlock()
277277
mu.completedSpans = append(mu.completedSpans, mu.addedSpans...)
278278
mu.addedSpans = nil
279-
})
279+
}
280+
adder.SetOnFlush(flushAddedSpans)
280281

281282
pushProgress := func() {
282283
mu.Lock()
@@ -320,9 +321,9 @@ func (ib *indexBackfiller) ingestIndexEntries(
320321
// TODO(mw5h): batch up multiple index entries into a single batch.
321322
// As is, we insert a single vector per batch, which is very slow.
322323
if len(ib.VectorIndexes) > 0 {
323-
isVectorIndex, err := ib.maybeReencodeVectorIndexEntry(ctx, &vectorInputEntry, &indexEntry)
324+
isVectorIndex, err := ib.maybeReencodeAndWriteVectorIndexEntry(ctx, &vectorInputEntry, &indexEntry)
324325
if err != nil {
325-
return err
326+
return ib.wrapDupError(ctx, err)
326327
} else if isVectorIndex {
327328
continue
328329
}
@@ -372,10 +373,10 @@ func (ib *indexBackfiller) ingestIndexEntries(
372373

373374
// If there are only vector indexes, we push the completed spans manually so that
374375
// progress reporting works.
376+
// TODO(mw5h): this is a hack to get progress reporting to work for vector only
377+
// backfills. We should remove this once we have a more permanent solution.
375378
if ib.VectorOnly {
376-
mu.Lock()
377-
mu.completedSpans = append(mu.completedSpans, mu.addedSpans...)
378-
mu.Unlock()
379+
flushAddedSpans(kvpb.BulkOpSummary{})
379380
}
380381
if err := adder.Flush(ctx); err != nil {
381382
return ib.wrapDupError(ctx, err)

0 commit comments

Comments
 (0)