Skip to content

Commit 058a7f5

Browse files
committed
sql: preserve completed spans upon pushing backfill progress
This patch ensures that spans completed before a backfill progress update are preserved when the progress is pushed.

Epic: none
Fixes: #140358

Release note (bug fix): Fixed a bug where index backfill progress made before a PAUSE/RESUME would not get tracked.
1 parent cc910e9 commit 058a7f5

File tree

8 files changed

+222
-9
lines changed

8 files changed

+222
-9
lines changed

pkg/roachpb/span_group.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (g *SpanGroup) Contains(k Key) bool {
6464
})
6565
}
6666

67-
// Encloses returns whether the provided Span is fully conained within the group
67+
// Encloses returns whether the provided Span is fully contained within the group
6868
// of Spans in the SpanGroup
6969
func (g *SpanGroup) Encloses(spans ...Span) bool {
7070
if g.rg == nil {

pkg/sql/create_function_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,14 +326,13 @@ COMMIT;
326326
tDB.Exec(t, `DELETE FROM t WHERE a = 2`)
327327

328328
// Make sure function cannot be used before job completes.
329-
testingKnob.RunBeforeBackfill = func() error {
329+
testingKnob.RunBeforeBackfill = func(_ []scexec.BackfillProgress) error {
330330
_, err = sqlDB.Exec(`SELECT f()`)
331331
require.Error(t, err, "")
332332
require.Contains(t, err.Error(), `function "f" is being added`)
333333
return nil
334334
}
335335

336-
//tDB.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints='newschemachanger.before.exec'`)
337336
_, err = sqlDB.Exec(`
338337
BEGIN;
339338
SET LOCAL autocommit_before_ddl = false;

pkg/sql/execinfra/server_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ type TestingKnobs struct {
242242
// function returns an error, or if the table has already been dropped.
243243
RunAfterBackfillChunk func()
244244

245+
// RunBeforeIndexBackfillProgressUpdate is called before updating the
246+
// progress for a single index backfill.
247+
RunBeforeIndexBackfillProgressUpdate func(completed []roachpb.Span)
248+
245249
// SerializeIndexBackfillCreationAndIngestion ensures that every index batch
246250
// created during an index backfill is also ingested before moving on to the
247251
// next batch or returning.

pkg/sql/index_backfiller.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,12 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
8383
if meta.BulkProcessorProgress == nil {
8484
return nil
8585
}
86-
progress.CompletedSpans = addCompleted(
87-
meta.BulkProcessorProgress.CompletedSpans...)
86+
progress.CompletedSpans = append(progress.CompletedSpans, addCompleted(
87+
meta.BulkProcessorProgress.CompletedSpans...)...)
88+
knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
89+
if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
90+
knobs.RunBeforeIndexBackfillProgressUpdate(progress.CompletedSpans)
91+
}
8892
return tracker.SetBackfillProgress(ctx, progress)
8993
}
9094
var spansToDo []roachpb.Span
@@ -98,6 +102,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
98102
if len(spansToDo) == 0 { // already done
99103
return nil
100104
}
105+
101106
now := ib.execCfg.DB.Clock().Now()
102107
// Pick now as the read timestamp for the backfill. It's safe to use this
103108
// timestamp to read even if we've partially backfilled at an earlier

pkg/sql/indexbackfiller_test.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package sql_test
88
import (
99
"context"
1010
gosql "database/sql"
11+
"fmt"
1112
"sync"
1213
"sync/atomic"
1314
"testing"
15+
"time"
1416

1517
"github.com/cockroachdb/cockroach/pkg/base"
1618
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -29,15 +31,22 @@ import (
2931
"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
3032
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
3133
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
34+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
3235
"github.com/cockroachdb/cockroach/pkg/sql/isql"
3336
"github.com/cockroachdb/cockroach/pkg/sql/row"
3437
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3538
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
39+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
40+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
3641
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
3742
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3843
"github.com/cockroachdb/cockroach/pkg/sql/types"
44+
"github.com/cockroachdb/cockroach/pkg/testutils"
3945
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
46+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
4047
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
48+
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
49+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
4150
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
4251
"github.com/cockroachdb/cockroach/pkg/util/log"
4352
"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -553,3 +562,198 @@ INSERT INTO foo VALUES (1), (10), (100);
553562
t.Run(test.name, func(t *testing.T) { run(t, test) })
554563
}
555564
}
565+
566+
// TestIndexBackfillerResumePreservesProgress tests that spans completed during
567+
// a backfill are properly preserved during PAUSE/RESUMEs. In particular,
568+
// we test the following backfill sequence (where b[n] denotes the same
569+
// backfill, just at different points of progression):
570+
//
571+
// b[1] -> PAUSE -> RESUME -> b[2] -> PAUSE -> RESUME -> b[3]
572+
//
573+
// Before #140358, b[2] only checkpointed the spans it has completed since the
574+
// backfill was resumed -- leading to b[3] redoing work already completed since
575+
// b[1].
576+
func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
577+
defer leaktest.AfterTest(t)()
578+
defer log.Scope(t).Close(t)
579+
skip.UnderStress(t, "timing-sensitive test")
580+
581+
ctx := context.Background()
582+
backfillProgressUpdateCh := make(chan struct{})
583+
backfillProgressCompletedCh := make(chan []roachpb.Span)
584+
checkpointedSpansCh := make(chan []roachpb.Span)
585+
const numSpans = 100
586+
var isBlockingBackfillProgress atomic.Bool
587+
isBlockingBackfillProgress.Store(true)
588+
var isBlockingCheckpoint atomic.Bool
589+
590+
clusterArgs := base.TestClusterArgs{
591+
ServerArgs: base.TestServerArgs{
592+
Knobs: base.TestingKnobs{
593+
DistSQL: &execinfra.TestingKnobs{
594+
// We want to push progress every batch_size rows to control
595+
// the backfill incrementally.
596+
BulkAdderFlushesEveryBatch: true,
597+
RunBeforeIndexBackfillProgressUpdate: func(completed []roachpb.Span) {
598+
if isBlockingBackfillProgress.Load() {
599+
<-backfillProgressUpdateCh
600+
backfillProgressCompletedCh <- completed
601+
}
602+
},
603+
},
604+
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
605+
RunBeforeBackfill: func(progresses []scexec.BackfillProgress) error {
606+
if isBlockingCheckpoint.Load() && progresses != nil && progresses[0].CompletedSpans != nil {
607+
checkpointedSpansCh <- progresses[0].CompletedSpans
608+
}
609+
return nil
610+
},
611+
},
612+
},
613+
},
614+
}
615+
// Start a single-node test cluster with the testing knobs configured above.
616+
tc := testcluster.NewTestCluster(t, 1, clusterArgs)
617+
tc.Start(t)
618+
defer tc.Stopper().Stop(ctx)
619+
db := tc.Conns[0]
620+
621+
_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
622+
require.NoError(t, err)
623+
// Ensure that we checkpoint our progress to the backfill job so that
624+
// RESUMEs can get an up-to-date backfill progress.
625+
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '0s'`)
626+
require.NoError(t, err)
627+
_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
628+
require.NoError(t, err)
629+
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
630+
require.NoError(t, err)
631+
_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
632+
require.NoError(t, err)
633+
require.NoError(t, tc.WaitForFullReplication())
634+
var descID catid.DescID
635+
descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
636+
err = descIDRow.Scan(&descID)
637+
require.NoError(t, err)
638+
639+
var jobID int
640+
g := ctxgroup.WithContext(ctx)
641+
g.GoCtx(func(ctx context.Context) error {
642+
_, err := db.Exec(`ALTER TABLE t ADD COLUMN j INT NOT NULL DEFAULT 42`)
643+
if err != nil && err.Error() != fmt.Sprintf("pq: job %d was paused before it completed", jobID) {
644+
return err
645+
}
646+
return nil
647+
})
648+
649+
g.GoCtx(func(ctx context.Context) error {
650+
testutils.SucceedsSoon(t, func() error {
651+
jobIDRow := db.QueryRow(`
652+
SELECT job_id FROM [SHOW JOBS]
653+
WHERE job_type = 'NEW SCHEMA CHANGE' AND description ILIKE '%ADD COLUMN%'`,
654+
)
655+
if err := jobIDRow.Scan(&jobID); err != nil {
656+
return err
657+
}
658+
return nil
659+
})
660+
661+
ensureJobState := func(targetState string) {
662+
testutils.SucceedsSoon(t, func() error {
663+
var jobState string
664+
statusRow := db.QueryRow(`SELECT status FROM [SHOW JOB $1]`, jobID)
665+
if err := statusRow.Scan(&jobState); err != nil {
666+
return err
667+
}
668+
if jobState != targetState {
669+
return errors.Errorf("expected job to be %s, but found status: %s",
670+
targetState, jobState)
671+
}
672+
return nil
673+
})
674+
}
675+
676+
// Let the backfill step forward a bit before we do our PAUSE/RESUME
677+
// dance.
678+
var spansCompletedBeforePause []roachpb.Span
679+
for i := 0; i < 2; i++ {
680+
backfillProgressUpdateCh <- struct{}{}
681+
spansCompletedBeforePause = <-backfillProgressCompletedCh
682+
}
683+
684+
_, err := db.Exec(`PAUSE JOB $1`, jobID)
685+
if err != nil {
686+
return err
687+
}
688+
ensureJobState("paused")
689+
690+
_, err = db.Exec(`RESUME JOB $1`, jobID)
691+
if err != nil {
692+
return err
693+
}
694+
ensureJobState("running")
695+
696+
for i := 0; i < 2; i++ {
697+
backfillProgressUpdateCh <- struct{}{}
698+
<-backfillProgressCompletedCh
699+
}
700+
isBlockingCheckpoint.Store(true)
701+
702+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
703+
if err != nil {
704+
return err
705+
}
706+
ensureJobState("paused")
707+
708+
_, err = db.Exec(`RESUME JOB $1`, jobID)
709+
if err != nil {
710+
return err
711+
}
712+
ensureJobState("running")
713+
714+
var wg sync.WaitGroup
715+
wg.Add(1)
716+
go func() {
717+
defer wg.Done()
718+
timer := time.NewTimer(1 * time.Minute)
719+
defer timer.Stop()
720+
select {
721+
case checkpointed := <-checkpointedSpansCh:
722+
isBlockingCheckpoint.Store(false)
723+
var sg roachpb.SpanGroup
724+
sg.Add(checkpointed...)
725+
// Ensure that the spans we completed before any PAUSE are
726+
// fully contained in our checkpointed completed spans group.
727+
require.True(t, sg.Encloses(spansCompletedBeforePause...))
728+
case <-timer.C:
729+
require.Fail(t, "timed out waiting for checkpoint")
730+
}
731+
}()
732+
733+
wg.Add(1)
734+
go func() {
735+
defer wg.Done()
736+
737+
isBlockingBackfillProgress.Store(false)
738+
timer := time.NewTimer(1 * time.Minute)
739+
defer timer.Stop()
740+
select {
741+
case backfillProgressUpdateCh <- struct{}{}:
742+
<-backfillProgressCompletedCh
743+
case <-timer.C:
744+
require.Fail(t, "timed out waiting for backfill progress")
745+
}
746+
}()
747+
748+
// Wait for both operations to complete
749+
wg.Wait()
750+
751+
// Now we can wait for the job to succeed
752+
ensureJobState("succeeded")
753+
return nil
754+
})
755+
756+
if err = g.Wait(); err != nil {
757+
require.NoError(t, err)
758+
}
759+
}

pkg/sql/schema_changer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2540,7 +2540,7 @@ type SchemaChangerTestingKnobs struct {
25402540
// fixing the index backfill scan timestamp.
25412541
RunBeforeIndexBackfill func()
25422542

2543-
// RunBeforeIndexBackfill is called after the index backfill
2543+
// RunAfterIndexBackfill is called after the index backfill
25442544
// process is complete (including the temporary index merge)
25452545
// but before the final validation of the indexes.
25462546
RunAfterIndexBackfill func()

pkg/sql/schemachanger/scexec/exec_backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ func runBackfiller(
317317
) error {
318318
if deps.GetTestingKnobs() != nil &&
319319
deps.GetTestingKnobs().RunBeforeBackfill != nil {
320-
err := deps.GetTestingKnobs().RunBeforeBackfill()
320+
err := deps.GetTestingKnobs().RunBeforeBackfill(backfillProgresses)
321321
if err != nil {
322322
return err
323323
}

pkg/sql/schemachanger/scexec/testing_knobs.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ type TestingKnobs struct {
3434
// error during stage execution.
3535
OnPostCommitError func(p scplan.Plan, stageIdx int, err error) error
3636

37-
// RunBeforeBackfill is called just before starting the backfill.
38-
RunBeforeBackfill func() error
37+
// RunBeforeBackfill is called just before starting the backfill, with the
38+
// BackfillProgress that we will be backfilling with.
39+
RunBeforeBackfill func(progresses []BackfillProgress) error
3940

4041
// RunBeforeMakingPostCommitPlan is called just before making the post commit
4142
// plan.

0 commit comments

Comments
 (0)