Skip to content

Commit 904baf4

Browse files
committed
sql/schemachanger: fix index creation failures after pause/resume
Previously, when the index backfiller was tracking completed spans, the final chunk of a span would cause an update to be emitted that incorrectly indicated the entire span was complete. While this was not an issue with a single ingest goroutine, it caused problems with the introduction of multiple ingest goroutines. As a result, if the job was paused and resumed after this incorrect progress update, the backfiller could skip rows, leading to validation errors. To address this, this patch ensures that progress updates for the final chunk of a span correctly report only the work completed in that chunk. Fixes: #153522 Release note (bug fix): Addressed a bug where index creation could fail due to validation errors if the schema change was retried or paused/resumed during the backfill.
1 parent 6ad4c58 commit 904baf4

File tree

4 files changed

+74
-62
lines changed

4 files changed

+74
-62
lines changed

pkg/sql/backfill/backfill.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -948,19 +948,21 @@ func (ib *IndexBackfiller) init(
948948
// that needs to be freed once the returned IndexEntry slice is freed. This is
949949
// returned for the successful and failure cases. It is the callers responsibility
950950
// to clear the associated bound account when appropriate.
951+
// A non-nil resumeKey is returned when there is still work to be done in this
952+
// span (sp.Key < resumeKey < sp.EndKey), which happens if the entire span does
953+
// not fit within the batch size.
951954
func (ib *IndexBackfiller) BuildIndexEntriesChunk(
952955
ctx context.Context,
953956
txn *kv.Txn,
954957
tableDesc catalog.TableDescriptor,
955958
sp roachpb.Span,
956959
chunkSize int64,
957960
traceKV bool,
958-
) ([]rowenc.IndexEntry, roachpb.Key, int64, error) {
961+
) (entries []rowenc.IndexEntry, resumeKey roachpb.Key, memUsedPerChunk int64, err error) {
959962
// This ought to be chunkSize but in most tests we are actually building smaller
960963
// indexes so use a smaller value.
961964
const initBufferSize = 1000
962965
const sizeOfIndexEntry = int64(unsafe.Sizeof(rowenc.IndexEntry{}))
963-
var memUsedPerChunk int64
964966

965967
indexEntriesInChunkInitialBufferSize :=
966968
sizeOfIndexEntry * initBufferSize * int64(len(ib.added))
@@ -969,7 +971,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
969971
"failed to initialize empty buffer to store the index entries of all rows in the chunk")
970972
}
971973
memUsedPerChunk += indexEntriesInChunkInitialBufferSize
972-
entries := make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
974+
entries = make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
973975

974976
var fetcherCols []descpb.ColumnID
975977
for i, c := range ib.cols {
@@ -1202,7 +1204,6 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
12021204
ib.ShrinkBoundAccount(ctx, shrinkSize)
12031205
memUsedPerChunk -= shrinkSize
12041206

1205-
var resumeKey roachpb.Key
12061207
if fetcher.Key() != nil {
12071208
resumeKey = make(roachpb.Key, len(fetcher.Key()))
12081209
copy(resumeKey, fetcher.Key())

pkg/sql/index_backfiller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
106106

107107
knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
108108
if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
109-
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, progress.CompletedSpans)
109+
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, meta.BulkProcessorProgress.CompletedSpans)
110110
}
111111
return tracker.SetBackfillProgress(ctx, progress)
112112
}

pkg/sql/indexbackfiller_test.go

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3838
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
3939
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
40+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
41+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
4042
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
4143
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
4244
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -581,7 +583,8 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
581583

582584
ctx := context.Background()
583585
backfillProgressCompletedCh := make(chan []roachpb.Span)
584-
const numSpans = 100
586+
const numRows = 100
587+
const numSpans = 20
585588
var isBlockingBackfillProgress atomic.Bool
586589
isBlockingBackfillProgress.Store(true)
587590

@@ -610,22 +613,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
610613
}
611614
return nil
612615
},
616+
AfterStage: func(p scplan.Plan, stageIdx int) error {
617+
if p.Stages[stageIdx].Type() != scop.BackfillType || !isBlockingBackfillProgress.Load() {
618+
return nil
619+
}
620+
isBlockingBackfillProgress.Store(false)
621+
close(backfillProgressCompletedCh)
622+
return nil
623+
},
613624
},
614625
},
615626
})
616627
defer tc.Stopper().Stop(ctx)
617628

618629
_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
619630
require.NoError(t, err)
631+
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.ingest_concurrency=4`)
632+
require.NoError(t, err)
620633
// Ensure that we checkpoint our progress to the backfill job so that
621634
// RESUMEs can get an up-to-date backfill progress.
622635
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '10ms'`)
623636
require.NoError(t, err)
624637
_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
625638
require.NoError(t, err)
626-
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
639+
// Have a 100 splits each containing a 100 rows.
640+
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, (numRows*numSpans)+1)
627641
require.NoError(t, err)
628-
_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
642+
for split := 0; split < numSpans; split++ {
643+
_, err = db.Exec(`ALTER TABLE t SPLIT AT VALUES ($1)`, numRows*numSpans)
644+
}
629645
require.NoError(t, err)
630646
var descID catid.DescID
631647
descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
@@ -669,21 +685,28 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
669685
}
670686

671687
var completedSpans roachpb.SpanGroup
688+
var observedSpans []roachpb.Span
672689
receiveProgressUpdate := func() {
673-
progressUpdate := <-backfillProgressCompletedCh
674-
675-
// Make sure the progress update does not contain overlapping spans.
676-
for i, span1 := range progressUpdate {
677-
for j, span2 := range progressUpdate {
678-
if i <= j {
679-
continue
680-
}
681-
if span1.Overlaps(span2) {
682-
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
690+
updateCount := 2
691+
for isBlockingBackfillProgress.Load() && updateCount > 0 {
692+
progressUpdate := <-backfillProgressCompletedCh
693+
// Make sure the progress update does not contain overlapping spans.
694+
for i, span1 := range progressUpdate {
695+
for j, span2 := range progressUpdate {
696+
if i == j {
697+
continue
698+
}
699+
if span1.Overlaps(span2) {
700+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
701+
}
683702
}
684703
}
704+
hasMoreSpans := completedSpans.Add(progressUpdate...)
705+
if hasMoreSpans {
706+
updateCount -= 1
707+
}
708+
observedSpans = append(observedSpans, progressUpdate...)
685709
}
686-
completedSpans.Add(progressUpdate...)
687710
}
688711

689712
ensureCompletedSpansAreCheckpointed := func() {
@@ -717,45 +740,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
717740
}, 5*time.Second)
718741
}
719742

720-
// Let the backfill step forward a bit before we do our PAUSE/RESUME
721-
// dance.
722-
for i := 0; i < 2; i++ {
743+
for isBlockingBackfillProgress.Load() {
723744
receiveProgressUpdate()
724-
}
725-
726-
ensureCompletedSpansAreCheckpointed()
727-
t.Logf("pausing backfill")
728-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
729-
require.NoError(t, err)
730-
ensureJobState("paused")
731-
732-
t.Logf("resuming backfill")
733-
_, err = db.Exec(`RESUME JOB $1`, jobID)
734-
require.NoError(t, err)
735-
ensureJobState("running")
745+
ensureCompletedSpansAreCheckpointed()
746+
t.Logf("pausing backfill")
747+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
748+
require.NoError(t, err)
749+
ensureJobState("paused")
736750

737-
// Step forward again before re-pausing.
738-
for i := 0; i < 2; i++ {
739-
receiveProgressUpdate()
751+
t.Logf("resuming backfill")
752+
_, err = db.Exec(`RESUME JOB $1`, jobID)
753+
require.NoError(t, err)
754+
ensureJobState("running")
740755
}
741-
742-
ensureCompletedSpansAreCheckpointed()
743-
isBlockingBackfillProgress.Store(false)
744-
745-
t.Logf("pausing backfill")
746-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
747-
require.NoError(t, err)
748-
ensureJobState("paused")
749-
750-
t.Logf("resuming backfill")
751-
_, err = db.Exec(`RESUME JOB $1`, jobID)
752-
require.NoError(t, err)
753-
ensureJobState("running")
754-
755756
// Now we can wait for the job to succeed
756757
ensureJobState("succeeded")
757-
758758
if err = g.Wait(); err != nil {
759759
require.NoError(t, err)
760760
}
761+
// Make sure the spans we are adding do not overlap otherwise, this indicates
762+
// a bug. Where we computed chunks incorrectly. Each chunk should be an independent
763+
// piece of work.
764+
for i, span1 := range observedSpans {
765+
for j, span2 := range observedSpans {
766+
if i == j {
767+
continue
768+
}
769+
if span1.Overlaps(span2) {
770+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
771+
}
772+
}
773+
}
761774
}

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,8 @@ func (ib *indexBackfiller) constructIndexEntries(
152152

153153
// Identify the Span for which we have constructed index entries. This is
154154
// used for reporting progress and updating the job details.
155-
completedSpan := ib.spec.Spans[i]
155+
completedSpan := roachpb.Span{Key: startKey, EndKey: ib.spec.Spans[i].EndKey}
156156
if todo.Key != nil {
157-
completedSpan.Key = startKey
158157
completedSpan.EndKey = todo.Key
159158
}
160159

@@ -510,21 +509,20 @@ func (ib *indexBackfiller) getProgressReportInterval() time.Duration {
510509
return indexBackfillProgressReportInterval
511510
}
512511

513-
// buildIndexEntryBatch constructs the index entries for a single indexBatch.
512+
// buildIndexEntryBatch constructs the index entries for a single indexBatch for
513+
// the given span. A non-nil resumeKey is returned when there is still work to
514+
// be done in this span (sp.Key < resumeKey < sp.EndKey), which happens if the
515+
// entire span does not fit within the batch size.
514516
func (ib *indexBackfiller) buildIndexEntryBatch(
515517
tctx context.Context, sp roachpb.Span, readAsOf hlc.Timestamp,
516-
) (roachpb.Key, []rowenc.IndexEntry, int64, error) {
518+
) (resumeKey roachpb.Key, entries []rowenc.IndexEntry, memUsedBuildingBatch int64, err error) {
517519
knobs := &ib.flowCtx.Cfg.TestingKnobs
518520
if knobs.RunBeforeBackfillChunk != nil {
519521
if err := knobs.RunBeforeBackfillChunk(sp); err != nil {
520522
return nil, nil, 0, err
521523
}
522524
}
523525

524-
var memUsedBuildingBatch int64
525-
var key roachpb.Key
526-
var entries []rowenc.IndexEntry
527-
528526
br := indexBatchRetry{
529527
nextChunkSize: ib.spec.ChunkSize,
530528
// Memory used while building index entries is released by another goroutine
@@ -563,7 +561,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
563561

564562
// TODO(knz): do KV tracing in DistSQL processors.
565563
var err error
566-
entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
564+
entries, resumeKey, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
567565
ctx, txn.KV(), ib.desc, sp, br.nextChunkSize, false, /* traceKV */
568566
)
569567
return err
@@ -579,7 +577,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
579577
log.VEventf(ctx, 3, "index backfill stats: entries %d, prepare %+v",
580578
len(entries), prepTime)
581579

582-
return key, entries, memUsedBuildingBatch, nil
580+
return resumeKey, entries, memUsedBuildingBatch, nil
583581
}
584582

585583
// Resume is part of the execinfra.Processor interface.

0 commit comments

Comments
 (0)