Skip to content

Commit a119dce

Browse files
authored
Merge pull request #153596 from fqazi/blathers/backport-release-25.2-153583
release-25.2: sql/schemachanger: fix index creation failures after pause/resume
2 parents 73d680e + 904baf4 commit a119dce

File tree

4 files changed

+74
-62
lines changed

4 files changed

+74
-62
lines changed

pkg/sql/backfill/backfill.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -948,19 +948,21 @@ func (ib *IndexBackfiller) init(
948948
// that needs to be freed once the returned IndexEntry slice is freed. This is
949949
// returned for the successful and failure cases. It is the callers responsibility
950950
// to clear the associated bound account when appropriate.
951+
// A non-nil resumeKey is returned when there is still work to be done in this
952+
// span (sp.Key < resumeKey < sp.EndKey), which happens if the entire span does
953+
// not fit within the batch size.
951954
func (ib *IndexBackfiller) BuildIndexEntriesChunk(
952955
ctx context.Context,
953956
txn *kv.Txn,
954957
tableDesc catalog.TableDescriptor,
955958
sp roachpb.Span,
956959
chunkSize int64,
957960
traceKV bool,
958-
) ([]rowenc.IndexEntry, roachpb.Key, int64, error) {
961+
) (entries []rowenc.IndexEntry, resumeKey roachpb.Key, memUsedPerChunk int64, err error) {
959962
// This ought to be chunkSize but in most tests we are actually building smaller
960963
// indexes so use a smaller value.
961964
const initBufferSize = 1000
962965
const sizeOfIndexEntry = int64(unsafe.Sizeof(rowenc.IndexEntry{}))
963-
var memUsedPerChunk int64
964966

965967
indexEntriesInChunkInitialBufferSize :=
966968
sizeOfIndexEntry * initBufferSize * int64(len(ib.added))
@@ -969,7 +971,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
969971
"failed to initialize empty buffer to store the index entries of all rows in the chunk")
970972
}
971973
memUsedPerChunk += indexEntriesInChunkInitialBufferSize
972-
entries := make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
974+
entries = make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
973975

974976
var fetcherCols []descpb.ColumnID
975977
for i, c := range ib.cols {
@@ -1202,7 +1204,6 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
12021204
ib.ShrinkBoundAccount(ctx, shrinkSize)
12031205
memUsedPerChunk -= shrinkSize
12041206

1205-
var resumeKey roachpb.Key
12061207
if fetcher.Key() != nil {
12071208
resumeKey = make(roachpb.Key, len(fetcher.Key()))
12081209
copy(resumeKey, fetcher.Key())

pkg/sql/index_backfiller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
106106

107107
knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
108108
if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
109-
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, progress.CompletedSpans)
109+
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, meta.BulkProcessorProgress.CompletedSpans)
110110
}
111111
return tracker.SetBackfillProgress(ctx, progress)
112112
}

pkg/sql/indexbackfiller_test.go

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3838
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
3939
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
40+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
41+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
4042
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
4143
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
4244
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -581,7 +583,8 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
581583

582584
ctx := context.Background()
583585
backfillProgressCompletedCh := make(chan []roachpb.Span)
584-
const numSpans = 100
586+
const numRows = 100
587+
const numSpans = 20
585588
var isBlockingBackfillProgress atomic.Bool
586589
isBlockingBackfillProgress.Store(true)
587590

@@ -610,22 +613,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
610613
}
611614
return nil
612615
},
616+
AfterStage: func(p scplan.Plan, stageIdx int) error {
617+
if p.Stages[stageIdx].Type() != scop.BackfillType || !isBlockingBackfillProgress.Load() {
618+
return nil
619+
}
620+
isBlockingBackfillProgress.Store(false)
621+
close(backfillProgressCompletedCh)
622+
return nil
623+
},
613624
},
614625
},
615626
})
616627
defer tc.Stopper().Stop(ctx)
617628

618629
_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
619630
require.NoError(t, err)
631+
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.ingest_concurrency=4`)
632+
require.NoError(t, err)
620633
// Ensure that we checkpoint our progress to the backfill job so that
621634
// RESUMEs can get an up-to-date backfill progress.
622635
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '10ms'`)
623636
require.NoError(t, err)
624637
_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
625638
require.NoError(t, err)
626-
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
639+
// Have a 100 splits each containing a 100 rows.
640+
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, (numRows*numSpans)+1)
627641
require.NoError(t, err)
628-
_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
642+
for split := 0; split < numSpans; split++ {
643+
_, err = db.Exec(`ALTER TABLE t SPLIT AT VALUES ($1)`, numRows*numSpans)
644+
}
629645
require.NoError(t, err)
630646
var descID catid.DescID
631647
descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
@@ -669,21 +685,28 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
669685
}
670686

671687
var completedSpans roachpb.SpanGroup
688+
var observedSpans []roachpb.Span
672689
receiveProgressUpdate := func() {
673-
progressUpdate := <-backfillProgressCompletedCh
674-
675-
// Make sure the progress update does not contain overlapping spans.
676-
for i, span1 := range progressUpdate {
677-
for j, span2 := range progressUpdate {
678-
if i <= j {
679-
continue
680-
}
681-
if span1.Overlaps(span2) {
682-
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
690+
updateCount := 2
691+
for isBlockingBackfillProgress.Load() && updateCount > 0 {
692+
progressUpdate := <-backfillProgressCompletedCh
693+
// Make sure the progress update does not contain overlapping spans.
694+
for i, span1 := range progressUpdate {
695+
for j, span2 := range progressUpdate {
696+
if i == j {
697+
continue
698+
}
699+
if span1.Overlaps(span2) {
700+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
701+
}
683702
}
684703
}
704+
hasMoreSpans := completedSpans.Add(progressUpdate...)
705+
if hasMoreSpans {
706+
updateCount -= 1
707+
}
708+
observedSpans = append(observedSpans, progressUpdate...)
685709
}
686-
completedSpans.Add(progressUpdate...)
687710
}
688711

689712
ensureCompletedSpansAreCheckpointed := func() {
@@ -717,45 +740,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
717740
}, 5*time.Second)
718741
}
719742

720-
// Let the backfill step forward a bit before we do our PAUSE/RESUME
721-
// dance.
722-
for i := 0; i < 2; i++ {
743+
for isBlockingBackfillProgress.Load() {
723744
receiveProgressUpdate()
724-
}
725-
726-
ensureCompletedSpansAreCheckpointed()
727-
t.Logf("pausing backfill")
728-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
729-
require.NoError(t, err)
730-
ensureJobState("paused")
731-
732-
t.Logf("resuming backfill")
733-
_, err = db.Exec(`RESUME JOB $1`, jobID)
734-
require.NoError(t, err)
735-
ensureJobState("running")
745+
ensureCompletedSpansAreCheckpointed()
746+
t.Logf("pausing backfill")
747+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
748+
require.NoError(t, err)
749+
ensureJobState("paused")
736750

737-
// Step forward again before re-pausing.
738-
for i := 0; i < 2; i++ {
739-
receiveProgressUpdate()
751+
t.Logf("resuming backfill")
752+
_, err = db.Exec(`RESUME JOB $1`, jobID)
753+
require.NoError(t, err)
754+
ensureJobState("running")
740755
}
741-
742-
ensureCompletedSpansAreCheckpointed()
743-
isBlockingBackfillProgress.Store(false)
744-
745-
t.Logf("pausing backfill")
746-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
747-
require.NoError(t, err)
748-
ensureJobState("paused")
749-
750-
t.Logf("resuming backfill")
751-
_, err = db.Exec(`RESUME JOB $1`, jobID)
752-
require.NoError(t, err)
753-
ensureJobState("running")
754-
755756
// Now we can wait for the job to succeed
756757
ensureJobState("succeeded")
757-
758758
if err = g.Wait(); err != nil {
759759
require.NoError(t, err)
760760
}
761+
// Make sure the spans we are adding do not overlap otherwise, this indicates
762+
// a bug. Where we computed chunks incorrectly. Each chunk should be an independent
763+
// piece of work.
764+
for i, span1 := range observedSpans {
765+
for j, span2 := range observedSpans {
766+
if i == j {
767+
continue
768+
}
769+
if span1.Overlaps(span2) {
770+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
771+
}
772+
}
773+
}
761774
}

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,8 @@ func (ib *indexBackfiller) constructIndexEntries(
152152

153153
// Identify the Span for which we have constructed index entries. This is
154154
// used for reporting progress and updating the job details.
155-
completedSpan := ib.spec.Spans[i]
155+
completedSpan := roachpb.Span{Key: startKey, EndKey: ib.spec.Spans[i].EndKey}
156156
if todo.Key != nil {
157-
completedSpan.Key = startKey
158157
completedSpan.EndKey = todo.Key
159158
}
160159

@@ -510,21 +509,20 @@ func (ib *indexBackfiller) getProgressReportInterval() time.Duration {
510509
return indexBackfillProgressReportInterval
511510
}
512511

513-
// buildIndexEntryBatch constructs the index entries for a single indexBatch.
512+
// buildIndexEntryBatch constructs the index entries for a single indexBatch for
513+
// the given span. A non-nil resumeKey is returned when there is still work to
514+
// be done in this span (sp.Key < resumeKey < sp.EndKey), which happens if the
515+
// entire span does not fit within the batch size.
514516
func (ib *indexBackfiller) buildIndexEntryBatch(
515517
tctx context.Context, sp roachpb.Span, readAsOf hlc.Timestamp,
516-
) (roachpb.Key, []rowenc.IndexEntry, int64, error) {
518+
) (resumeKey roachpb.Key, entries []rowenc.IndexEntry, memUsedBuildingBatch int64, err error) {
517519
knobs := &ib.flowCtx.Cfg.TestingKnobs
518520
if knobs.RunBeforeBackfillChunk != nil {
519521
if err := knobs.RunBeforeBackfillChunk(sp); err != nil {
520522
return nil, nil, 0, err
521523
}
522524
}
523525

524-
var memUsedBuildingBatch int64
525-
var key roachpb.Key
526-
var entries []rowenc.IndexEntry
527-
528526
br := indexBatchRetry{
529527
nextChunkSize: ib.spec.ChunkSize,
530528
// Memory used while building index entries is released by another goroutine
@@ -563,7 +561,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
563561

564562
// TODO(knz): do KV tracing in DistSQL processors.
565563
var err error
566-
entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
564+
entries, resumeKey, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
567565
ctx, txn.KV(), ib.desc, sp, br.nextChunkSize, false, /* traceKV */
568566
)
569567
return err
@@ -579,7 +577,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
579577
log.VEventf(ctx, 3, "index backfill stats: entries %d, prepare %+v",
580578
len(entries), prepTime)
581579

582-
return key, entries, memUsedBuildingBatch, nil
580+
return resumeKey, entries, memUsedBuildingBatch, nil
583581
}
584582

585583
// Resume is part of the execinfra.Processor interface.

0 commit comments

Comments
 (0)