Skip to content

Commit 161a3f7

Browse files
committed
sql/schemachanger: fix index creation failures after pause/resume
Previously, when the index backfiller was tracking completed spans, the final chunk of a span would cause an update to be emitted that incorrectly indicated the entire span was complete. While this was not an issue with a single ingest goroutine, it caused problems with the introduction of multiple ingest goroutines. As a result, if the job was paused and resumed after this incorrect progress update, the backfiller could skip rows, leading to validation errors. To address this, this patch ensures that progress updates for the final chunk of a span correctly report only the work completed in that chunk. Fixes: #153522 Release note (bug fix): Addressed a bug where index creation could fail due to validation errors if the schema change was retried or paused/resumed during the backfill.
1 parent e4db2dd commit 161a3f7

File tree

4 files changed

+74
-62
lines changed

4 files changed

+74
-62
lines changed

pkg/sql/backfill/backfill.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,19 +1010,21 @@ func (ib *IndexBackfiller) init(
10101010
// that needs to be freed once the returned IndexEntry slice is freed. This is
10111011
// returned for the success and failure cases. It is the caller's responsibility
10121012
// to clear the associated bound account when appropriate.
1013+
// A non-nil resumeKey is returned when there is still work to be done in this
1014+
// span (sp.Key < resumeKey < sp.EndKey), which happens if the entire span does
1015+
// not fit within the batch size.
10131016
func (ib *IndexBackfiller) BuildIndexEntriesChunk(
10141017
ctx context.Context,
10151018
txn *kv.Txn,
10161019
tableDesc catalog.TableDescriptor,
10171020
sp roachpb.Span,
10181021
chunkSize int64,
10191022
traceKV bool,
1020-
) ([]rowenc.IndexEntry, roachpb.Key, int64, error) {
1023+
) (entries []rowenc.IndexEntry, resumeKey roachpb.Key, memUsedPerChunk int64, err error) {
10211024
// This ought to be chunkSize but in most tests we are actually building smaller
10221025
// indexes so use a smaller value.
10231026
const initBufferSize = 1000
10241027
const sizeOfIndexEntry = int64(unsafe.Sizeof(rowenc.IndexEntry{}))
1025-
var memUsedPerChunk int64
10261028

10271029
indexEntriesInChunkInitialBufferSize :=
10281030
sizeOfIndexEntry * initBufferSize * int64(len(ib.added))
@@ -1031,7 +1033,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
10311033
"failed to initialize empty buffer to store the index entries of all rows in the chunk")
10321034
}
10331035
memUsedPerChunk += indexEntriesInChunkInitialBufferSize
1034-
entries := make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
1036+
entries = make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
10351037

10361038
var fetcherCols []descpb.ColumnID
10371039
for i, c := range ib.cols {
@@ -1264,7 +1266,6 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
12641266
ib.ShrinkBoundAccount(ctx, shrinkSize)
12651267
memUsedPerChunk -= shrinkSize
12661268

1267-
var resumeKey roachpb.Key
12681269
if fetcher.Key() != nil {
12691270
resumeKey = make(roachpb.Key, len(fetcher.Key()))
12701271
copy(resumeKey, fetcher.Key())

pkg/sql/index_backfiller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
106106

107107
knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
108108
if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
109-
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, progress.CompletedSpans)
109+
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, meta.BulkProcessorProgress.CompletedSpans)
110110
}
111111
return tracker.SetBackfillProgress(ctx, progress)
112112
}

pkg/sql/indexbackfiller_test.go

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3838
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
3939
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
40+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
41+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
4042
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
4143
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
4244
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -582,7 +584,8 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
582584

583585
ctx := context.Background()
584586
backfillProgressCompletedCh := make(chan []roachpb.Span)
585-
const numSpans = 100
587+
const numRows = 100
588+
const numSpans = 20
586589
var isBlockingBackfillProgress atomic.Bool
587590
isBlockingBackfillProgress.Store(true)
588591

@@ -611,22 +614,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
611614
}
612615
return nil
613616
},
617+
AfterStage: func(p scplan.Plan, stageIdx int) error {
618+
if p.Stages[stageIdx].Type() != scop.BackfillType || !isBlockingBackfillProgress.Load() {
619+
return nil
620+
}
621+
isBlockingBackfillProgress.Store(false)
622+
close(backfillProgressCompletedCh)
623+
return nil
624+
},
614625
},
615626
},
616627
})
617628
defer tc.Stopper().Stop(ctx)
618629

619630
_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
620631
require.NoError(t, err)
632+
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.ingest_concurrency=4`)
633+
require.NoError(t, err)
621634
// Ensure that we checkpoint our progress to the backfill job so that
622635
// RESUMEs can get an up-to-date backfill progress.
623636
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '10ms'`)
624637
require.NoError(t, err)
625638
_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
626639
require.NoError(t, err)
627-
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
640+
// Insert enough rows to fill numSpans splits of numRows rows each.
641+
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, (numRows*numSpans)+1)
628642
require.NoError(t, err)
629-
_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
643+
for split := 0; split < numSpans; split++ {
644+
_, err = db.Exec(`ALTER TABLE t SPLIT AT VALUES ($1)`, numRows*numSpans)
645+
}
630646
require.NoError(t, err)
631647
var descID catid.DescID
632648
descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
@@ -670,21 +686,28 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
670686
}
671687

672688
var completedSpans roachpb.SpanGroup
689+
var observedSpans []roachpb.Span
673690
receiveProgressUpdate := func() {
674-
progressUpdate := <-backfillProgressCompletedCh
675-
676-
// Make sure the progress update does not contain overlapping spans.
677-
for i, span1 := range progressUpdate {
678-
for j, span2 := range progressUpdate {
679-
if i <= j {
680-
continue
681-
}
682-
if span1.Overlaps(span2) {
683-
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
691+
updateCount := 2
692+
for isBlockingBackfillProgress.Load() && updateCount > 0 {
693+
progressUpdate := <-backfillProgressCompletedCh
694+
// Make sure the progress update does not contain overlapping spans.
695+
for i, span1 := range progressUpdate {
696+
for j, span2 := range progressUpdate {
697+
if i == j {
698+
continue
699+
}
700+
if span1.Overlaps(span2) {
701+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
702+
}
684703
}
685704
}
705+
hasMoreSpans := completedSpans.Add(progressUpdate...)
706+
if hasMoreSpans {
707+
updateCount -= 1
708+
}
709+
observedSpans = append(observedSpans, progressUpdate...)
686710
}
687-
completedSpans.Add(progressUpdate...)
688711
}
689712

690713
ensureCompletedSpansAreCheckpointed := func() {
@@ -718,45 +741,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
718741
}, 5*time.Second)
719742
}
720743

721-
// Let the backfill step forward a bit before we do our PAUSE/RESUME
722-
// dance.
723-
for i := 0; i < 2; i++ {
744+
for isBlockingBackfillProgress.Load() {
724745
receiveProgressUpdate()
725-
}
726-
727-
ensureCompletedSpansAreCheckpointed()
728-
t.Logf("pausing backfill")
729-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
730-
require.NoError(t, err)
731-
ensureJobState("paused")
732-
733-
t.Logf("resuming backfill")
734-
_, err = db.Exec(`RESUME JOB $1`, jobID)
735-
require.NoError(t, err)
736-
ensureJobState("running")
746+
ensureCompletedSpansAreCheckpointed()
747+
t.Logf("pausing backfill")
748+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
749+
require.NoError(t, err)
750+
ensureJobState("paused")
737751

738-
// Step forward again before re-pausing.
739-
for i := 0; i < 2; i++ {
740-
receiveProgressUpdate()
752+
t.Logf("resuming backfill")
753+
_, err = db.Exec(`RESUME JOB $1`, jobID)
754+
require.NoError(t, err)
755+
ensureJobState("running")
741756
}
742-
743-
ensureCompletedSpansAreCheckpointed()
744-
isBlockingBackfillProgress.Store(false)
745-
746-
t.Logf("pausing backfill")
747-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
748-
require.NoError(t, err)
749-
ensureJobState("paused")
750-
751-
t.Logf("resuming backfill")
752-
_, err = db.Exec(`RESUME JOB $1`, jobID)
753-
require.NoError(t, err)
754-
ensureJobState("running")
755-
756757
// Now we can wait for the job to succeed
757758
ensureJobState("succeeded")
758-
759759
if err = g.Wait(); err != nil {
760760
require.NoError(t, err)
761761
}
762+
// Make sure the spans we are adding do not overlap; an overlap indicates a bug
763+
// in which chunks were computed incorrectly. Each chunk should be an independent
764+
// piece of work.
765+
for i, span1 := range observedSpans {
766+
for j, span2 := range observedSpans {
767+
if i == j {
768+
continue
769+
}
770+
if span1.Overlaps(span2) {
771+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
772+
}
773+
}
774+
}
762775
}

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,8 @@ func (ib *indexBackfiller) constructIndexEntries(
162162

163163
// Identify the Span for which we have constructed index entries. This is
164164
// used for reporting progress and updating the job details.
165-
completedSpan := ib.spec.Spans[i]
165+
completedSpan := roachpb.Span{Key: startKey, EndKey: ib.spec.Spans[i].EndKey}
166166
if todo.Key != nil {
167-
completedSpan.Key = startKey
168167
completedSpan.EndKey = todo.Key
169168
}
170169

@@ -529,21 +528,20 @@ func (ib *indexBackfiller) getProgressReportInterval() time.Duration {
529528
return indexBackfillProgressReportInterval
530529
}
531530

532-
// buildIndexEntryBatch constructs the index entries for a single indexBatch.
531+
// buildIndexEntryBatch constructs the index entries for a single indexBatch for
532+
// the given span. A non-nil resumeKey is returned when there is still work to
533+
// be done in this span (sp.Key < resumeKey < sp.EndKey), which happens if the
534+
// entire span does not fit within the batch size.
533535
func (ib *indexBackfiller) buildIndexEntryBatch(
534536
tctx context.Context, sp roachpb.Span, readAsOf hlc.Timestamp,
535-
) (roachpb.Key, []rowenc.IndexEntry, int64, error) {
537+
) (resumeKey roachpb.Key, entries []rowenc.IndexEntry, memUsedBuildingBatch int64, err error) {
536538
knobs := &ib.flowCtx.Cfg.TestingKnobs
537539
if knobs.RunBeforeBackfillChunk != nil {
538540
if err := knobs.RunBeforeBackfillChunk(sp); err != nil {
539541
return nil, nil, 0, err
540542
}
541543
}
542544

543-
var memUsedBuildingBatch int64
544-
var key roachpb.Key
545-
var entries []rowenc.IndexEntry
546-
547545
br := indexBatchRetry{
548546
nextChunkSize: ib.spec.ChunkSize,
549547
// Memory used while building index entries is released by another goroutine
@@ -582,7 +580,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
582580

583581
// TODO(knz): do KV tracing in DistSQL processors.
584582
var err error
585-
entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
583+
entries, resumeKey, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
586584
ctx, txn.KV(), ib.desc, sp, br.nextChunkSize, false, /* traceKV */
587585
)
588586
return err
@@ -598,7 +596,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
598596
log.VEventf(ctx, 3, "index backfill stats: entries %d, prepare %+v",
599597
len(entries), prepTime)
600598

601-
return key, entries, memUsedBuildingBatch, nil
599+
return resumeKey, entries, memUsedBuildingBatch, nil
602600
}
603601

604602
// Resume is part of the execinfra.Processor interface.

0 commit comments

Comments
 (0)