Skip to content

Commit b216877

Browse files
craig[bot]rafiss
andcommitted
Merge #147511
147511: sql: fix memory leak in index backfill progress tracking r=rafiss a=rafiss Previously, when resuming an index backfill job, the completed spans from the job's checkpoint would be repeatedly added to the progress tracker on each update, causing unbounded memory growth. For large tables with many ranges, this could exhaust available memory and cause the backfill to fail. This change fixes the memory leak by initializing the completed spans group once with the checkpointed progress at job startup, then only adding new spans from subsequent batches. A test-only assertion is added to detect overlapping spans in progress updates. The test is also updated to verify that checkpointed spans properly enclose all completed work and to remove flaky timing dependencies. These test changes also address the test issues that were present since the test was first added in 058a7f5. The testing changes include: - Use a context in the testing hook so that when the job is paused, the testing hook does not keep blocking. - Fixed a race condition by getting the progress from the persisted progress, rather than a different hook. - Avoid using `require` / `t.Fatal` from inside a goroutine, which violates the expectations of testing.T. fixes #140358 fixes #147209 Release note (bug fix): Fixed a memory leak in index backfill jobs where completed spans were duplicated in memory on each progress update after resuming from a checkpoint. This could cause out-of-memory errors when backfilling indexes on large tables with many ranges. Co-authored-by: Rafi Shamim <[email protected]>
2 parents d1b279d + 48a6512 commit b216877

File tree

3 files changed

+144
-126
lines changed

3 files changed

+144
-126
lines changed

pkg/sql/execinfra/server_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ type TestingKnobs struct {
244244

245245
// RunBeforeIndexBackfillProgressUpdate is called before updating the
246246
// progress for a single index backfill.
247-
RunBeforeIndexBackfillProgressUpdate func(completed []roachpb.Span)
247+
RunBeforeIndexBackfillProgressUpdate func(ctx context.Context, completed []roachpb.Span)
248248

249249
// SerializeIndexBackfillCreationAndIngestion ensures that every index batch
250250
// created during an index backfill is also ingested before moving on to the

pkg/sql/index_backfiller.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
2222
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2323
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
24+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
2425
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2526
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
27+
"github.com/cockroachdb/errors"
2628
)
2729

2830
// IndexBackfillPlanner holds dependencies for an index backfiller
@@ -77,17 +79,34 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
7779
completed.g.Add(c...)
7880
return completed.g.Slice()
7981
}
82+
// Add spans that were already completed before the job resumed.
83+
addCompleted(progress.CompletedSpans...)
8084
updateFunc := func(
8185
ctx context.Context, meta *execinfrapb.ProducerMetadata,
8286
) error {
8387
if meta.BulkProcessorProgress == nil {
8488
return nil
8589
}
86-
progress.CompletedSpans = append(progress.CompletedSpans, addCompleted(
87-
meta.BulkProcessorProgress.CompletedSpans...)...)
90+
progress.CompletedSpans = addCompleted(meta.BulkProcessorProgress.CompletedSpans...)
91+
// Make sure the progress update does not contain overlapping spans.
92+
// This is a sanity check that only runs in test configurations, since it
93+
// is an expensive n^2 check.
94+
if buildutil.CrdbTestBuild {
95+
for i, span1 := range progress.CompletedSpans {
96+
for j, span2 := range progress.CompletedSpans {
97+
if i <= j {
98+
continue
99+
}
100+
if span1.Overlaps(span2) {
101+
return errors.Newf("progress update contains overlapping spans: %s and %s", span1, span2)
102+
}
103+
}
104+
}
105+
}
106+
88107
knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
89108
if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
90-
knobs.RunBeforeIndexBackfillProgressUpdate(progress.CompletedSpans)
109+
knobs.RunBeforeIndexBackfillProgressUpdate(ctx, progress.CompletedSpans)
91110
}
92111
return tracker.SetBackfillProgress(ctx, progress)
93112
}

pkg/sql/indexbackfiller_test.go

Lines changed: 121 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,12 @@ import (
4343
"github.com/cockroachdb/cockroach/pkg/sql/types"
4444
"github.com/cockroachdb/cockroach/pkg/testutils"
4545
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
46-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
4746
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
48-
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
4947
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
5048
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
5149
"github.com/cockroachdb/cockroach/pkg/util/log"
5250
"github.com/cockroachdb/cockroach/pkg/util/mon"
51+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
5352
"github.com/cockroachdb/errors"
5453
"github.com/stretchr/testify/require"
5554
"google.golang.org/protobuf/proto"
@@ -576,61 +575,55 @@ INSERT INTO foo VALUES (1), (10), (100);
576575
func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
577576
defer leaktest.AfterTest(t)()
578577
defer log.Scope(t).Close(t)
579-
skip.UnderStress(t, "timing-sensitive test")
580578

581579
ctx := context.Background()
582-
backfillProgressUpdateCh := make(chan struct{})
583580
backfillProgressCompletedCh := make(chan []roachpb.Span)
584-
checkpointedSpansCh := make(chan []roachpb.Span)
585581
const numSpans = 100
586582
var isBlockingBackfillProgress atomic.Bool
587583
isBlockingBackfillProgress.Store(true)
588-
var isBlockingCheckpoint atomic.Bool
589584

590-
clusterArgs := base.TestClusterArgs{
591-
ServerArgs: base.TestServerArgs{
592-
Knobs: base.TestingKnobs{
593-
DistSQL: &execinfra.TestingKnobs{
594-
// We want to push progress every batch_size rows to control
595-
// the backfill incrementally.
596-
BulkAdderFlushesEveryBatch: true,
597-
RunBeforeIndexBackfillProgressUpdate: func(completed []roachpb.Span) {
598-
if isBlockingBackfillProgress.Load() {
599-
<-backfillProgressUpdateCh
600-
backfillProgressCompletedCh <- completed
585+
// Start the server with testing knob.
586+
tc, db, _ := serverutils.StartServer(t, base.TestServerArgs{
587+
Knobs: base.TestingKnobs{
588+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
589+
DistSQL: &execinfra.TestingKnobs{
590+
// We want to push progress every batch_size rows to control
591+
// the backfill incrementally.
592+
BulkAdderFlushesEveryBatch: true,
593+
RunBeforeIndexBackfillProgressUpdate: func(ctx context.Context, completed []roachpb.Span) {
594+
if isBlockingBackfillProgress.Load() {
595+
select {
596+
case <-ctx.Done():
597+
case backfillProgressCompletedCh <- completed:
598+
t.Logf("before index backfill progress update, completed spans: %v", completed)
601599
}
602-
},
600+
}
603601
},
604-
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
605-
RunBeforeBackfill: func(progresses []scexec.BackfillProgress) error {
606-
if isBlockingCheckpoint.Load() && progresses != nil && progresses[0].CompletedSpans != nil {
607-
checkpointedSpansCh <- progresses[0].CompletedSpans
608-
}
609-
return nil
610-
},
602+
},
603+
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
604+
RunBeforeBackfill: func(progresses []scexec.BackfillProgress) error {
605+
if progresses != nil {
606+
t.Logf("before resuming backfill, checkpointed spans: %v", progresses[0].CompletedSpans)
607+
}
608+
return nil
611609
},
612610
},
613611
},
614-
}
615-
// Start the server with a testing knob
616-
tc := testcluster.NewTestCluster(t, 1, clusterArgs)
617-
tc.Start(t)
612+
})
618613
defer tc.Stopper().Stop(ctx)
619-
db := tc.Conns[0]
620614

621615
_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
622616
require.NoError(t, err)
623617
// Ensure that we checkpoint our progress to the backfill job so that
624618
// RESUMEs can get an up-to-date backfill progress.
625-
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '0s'`)
619+
_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '10ms'`)
626620
require.NoError(t, err)
627621
_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
628622
require.NoError(t, err)
629623
_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
630624
require.NoError(t, err)
631625
_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
632626
require.NoError(t, err)
633-
require.NoError(t, tc.WaitForFullReplication())
634627
var descID catid.DescID
635628
descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
636629
err = descIDRow.Scan(&descID)
@@ -646,112 +639,118 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
646639
return nil
647640
})
648641

649-
g.GoCtx(func(ctx context.Context) error {
650-
testutils.SucceedsSoon(t, func() error {
651-
jobIDRow := db.QueryRow(`
642+
testutils.SucceedsWithin(t, func() error {
643+
jobIDRow := db.QueryRow(`
652644
SELECT job_id FROM [SHOW JOBS]
653-
WHERE job_type = 'NEW SCHEMA CHANGE' AND description ILIKE '%ADD COLUMN%'`,
654-
)
655-
if err := jobIDRow.Scan(&jobID); err != nil {
645+
WHERE job_type = 'NEW SCHEMA CHANGE' AND description ILIKE '%ADD COLUMN j%'`,
646+
)
647+
if err := jobIDRow.Scan(&jobID); err != nil {
648+
return err
649+
}
650+
return nil
651+
}, 5*time.Second)
652+
653+
ensureJobState := func(targetState string) {
654+
testutils.SucceedsWithin(t, func() error {
655+
var jobState string
656+
statusRow := db.QueryRow(`SELECT status FROM [SHOW JOB $1]`, jobID)
657+
if err := statusRow.Scan(&jobState); err != nil {
656658
return err
657659
}
660+
if jobState != targetState {
661+
return errors.Errorf("expected job to be %s, but found status: %s",
662+
targetState, jobState)
663+
}
658664
return nil
659-
})
665+
}, 5*time.Second)
666+
}
660667

661-
ensureJobState := func(targetState string) {
662-
testutils.SucceedsSoon(t, func() error {
663-
var jobState string
664-
statusRow := db.QueryRow(`SELECT status FROM [SHOW JOB $1]`, jobID)
665-
if err := statusRow.Scan(&jobState); err != nil {
666-
return err
668+
var completedSpans roachpb.SpanGroup
669+
receiveProgressUpdate := func() {
670+
progressUpdate := <-backfillProgressCompletedCh
671+
672+
// Make sure the progress update does not contain overlapping spans.
673+
for i, span1 := range progressUpdate {
674+
for j, span2 := range progressUpdate {
675+
if i <= j {
676+
continue
667677
}
668-
if jobState != targetState {
669-
return errors.Errorf("expected job to be %s, but found status: %s",
670-
targetState, jobState)
678+
if span1.Overlaps(span2) {
679+
t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
671680
}
672-
return nil
673-
})
681+
}
674682
}
683+
completedSpans.Add(progressUpdate...)
684+
}
675685

676-
// Let the backfill step forward a bit before we do our PAUSE/RESUME
677-
// dance.
678-
var spansCompletedBeforePause []roachpb.Span
679-
for i := 0; i < 2; i++ {
680-
backfillProgressUpdateCh <- struct{}{}
681-
spansCompletedBeforePause = <-backfillProgressCompletedCh
682-
}
686+
ensureCompletedSpansAreCheckpointed := func() {
687+
testutils.SucceedsWithin(t, func() error {
688+
stmt := `SELECT payload FROM crdb_internal.system_jobs WHERE id = $1`
689+
var payloadBytes []byte
690+
if err := db.QueryRowContext(ctx, stmt, jobID).Scan(&payloadBytes); err != nil {
691+
return err
692+
}
683693

684-
_, err := db.Exec(`PAUSE JOB $1`, jobID)
685-
if err != nil {
686-
return err
687-
}
688-
ensureJobState("paused")
694+
payload := &jobspb.Payload{}
695+
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
696+
return err
697+
}
689698

690-
_, err = db.Exec(`RESUME JOB $1`, jobID)
691-
if err != nil {
692-
return err
693-
}
694-
ensureJobState("running")
699+
schemaChangeProgress := *(payload.Details.(*jobspb.Payload_NewSchemaChange).NewSchemaChange)
700+
var checkpointedSpans []roachpb.Span
701+
if len(schemaChangeProgress.BackfillProgress) > 0 {
702+
checkpointedSpans = schemaChangeProgress.BackfillProgress[0].CompletedSpans
703+
}
704+
var sg roachpb.SpanGroup
705+
sg.Add(checkpointedSpans...)
706+
// Ensure that the spans we already completed are fully contained in our
707+
// checkpointed completed spans group.
708+
if !sg.Encloses(completedSpans.Slice()...) {
709+
return errors.Errorf("checkpointed spans %v do not enclose completed spans %v",
710+
checkpointedSpans, completedSpans.Slice())
711+
}
695712

696-
for i := 0; i < 2; i++ {
697-
backfillProgressUpdateCh <- struct{}{}
698-
<-backfillProgressCompletedCh
699-
}
700-
isBlockingCheckpoint.Store(true)
713+
return nil
714+
}, 5*time.Second)
715+
}
701716

702-
_, err = db.Exec(`PAUSE JOB $1`, jobID)
703-
if err != nil {
704-
return err
705-
}
706-
ensureJobState("paused")
717+
// Let the backfill step forward a bit before we do our PAUSE/RESUME
718+
// dance.
719+
for i := 0; i < 2; i++ {
720+
receiveProgressUpdate()
721+
}
707722

708-
_, err = db.Exec(`RESUME JOB $1`, jobID)
709-
if err != nil {
710-
return err
711-
}
712-
ensureJobState("running")
713-
714-
var wg sync.WaitGroup
715-
wg.Add(1)
716-
go func() {
717-
defer wg.Done()
718-
timer := time.NewTimer(1 * time.Minute)
719-
defer timer.Stop()
720-
select {
721-
case checkpointed := <-checkpointedSpansCh:
722-
isBlockingCheckpoint.Store(false)
723-
var sg roachpb.SpanGroup
724-
sg.Add(checkpointed...)
725-
// Ensure that the spans we completed before any PAUSE is
726-
// fully contained in our checkpointed completed spans group.
727-
require.True(t, sg.Encloses(spansCompletedBeforePause...))
728-
case <-timer.C:
729-
require.Fail(t, "timed out waiting for checkpoint")
730-
}
731-
}()
732-
733-
wg.Add(1)
734-
go func() {
735-
defer wg.Done()
736-
737-
isBlockingBackfillProgress.Store(false)
738-
timer := time.NewTimer(1 * time.Minute)
739-
defer timer.Stop()
740-
select {
741-
case backfillProgressUpdateCh <- struct{}{}:
742-
<-backfillProgressCompletedCh
743-
case <-timer.C:
744-
require.Fail(t, "timed out waiting for backfill progress")
745-
}
746-
}()
723+
ensureCompletedSpansAreCheckpointed()
724+
t.Logf("pausing backfill")
725+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
726+
require.NoError(t, err)
727+
ensureJobState("paused")
747728

748-
// Wait for both operations to complete
749-
wg.Wait()
729+
t.Logf("resuming backfill")
730+
_, err = db.Exec(`RESUME JOB $1`, jobID)
731+
require.NoError(t, err)
732+
ensureJobState("running")
750733

751-
// Now we can wait for the job to succeed
752-
ensureJobState("succeeded")
753-
return nil
754-
})
734+
// Step forward again before re-pausing.
735+
for i := 0; i < 2; i++ {
736+
receiveProgressUpdate()
737+
}
738+
739+
ensureCompletedSpansAreCheckpointed()
740+
isBlockingBackfillProgress.Store(false)
741+
742+
t.Logf("pausing backfill")
743+
_, err = db.Exec(`PAUSE JOB $1`, jobID)
744+
require.NoError(t, err)
745+
ensureJobState("paused")
746+
747+
t.Logf("resuming backfill")
748+
_, err = db.Exec(`RESUME JOB $1`, jobID)
749+
require.NoError(t, err)
750+
ensureJobState("running")
751+
752+
// Now we can wait for the job to succeed
753+
ensureJobState("succeeded")
755754

756755
if err = g.Wait(); err != nil {
757756
require.NoError(t, err)

0 commit comments

Comments
 (0)