
Commit 3f6a4f4

craig[bot] committed (co-authored by jbowens, fqazi, and stevendanna)
153538: roachtest: deflake disk-full roachtest r=RaduBerinde a=jbowens

This is a speculative fix for a disk-full roachtest failure. This roachtest induces an out-of-disk scenario on a node and ensures that removing the automatic emergency ballast allows recovery. In #153445 it appears disk space is exhausted on the node, but the node never notices because the write workload is insufficient to trigger any write I/O to new pages.

This commit adjusts the disk-full roachtest's workload to run a little longer (10m vs 4m) and to write larger values (512 bytes each) to ensure we continue to write a nontrivial volume of data after we intentionally exhaust available disk space.

Epic: none
Fixes: #153445.
Release note: none

153583: sql/schemachanger: fix index creation failures after pause/resume r=fqazi a=fqazi

Previously, when the index backfiller was tracking completed spans, the final chunk of a span would cause an update to be emitted that incorrectly indicated the entire span was complete. While this was not an issue with a single ingest goroutine, it caused problems with the introduction of multiple ingest goroutines. As a result, if the job was paused and resumed after this incorrect progress update, the backfiller could skip rows, leading to validation errors.

To address this, this patch ensures that progress updates for the final chunk of a span correctly report only the work completed in that chunk.

Fixes: #153522

Release note (bug fix): Addressed a bug where index creation could fail due to validation errors if the schema change was retried or paused/resumed during the backfill.

153589: kvnemesis: correctly handle errors from DeleteRangeUsingTombstoneOperation r=miraradeva a=stevendanna

Previously, we were initializing the result in the DeleteRangeUsingTombstoneOperation case using the err variable that had been set to nil. Now we correctly use the Err from the result.

While here, I've also rearranged the code so that the cases are a bit more consistent.

Epic: none
Release note: None

Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
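To make the schema-changer fix concrete: when a backfill chunk stops partway through a span, only the prefix up to the resume key has actually been built. Below is a minimal, self-contained sketch of how the completed portion of a span should be derived; the helper name completedPortion is hypothetical, and the real change lives in pkg/sql/rowexec/indexbackfiller.go.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// completedPortion reports only [startKey, resumeKey) as done when a chunk
// ends before the span does, and [startKey, sp.EndKey) only when resumeKey is
// nil (the span is exhausted). Claiming the whole span while a resume key is
// still outstanding is what allowed a paused/resumed job to skip rows.
func completedPortion(sp roachpb.Span, startKey, resumeKey roachpb.Key) roachpb.Span {
	completed := roachpb.Span{Key: startKey, EndKey: sp.EndKey}
	if resumeKey != nil {
		completed.EndKey = resumeKey
	}
	return completed
}

func main() {
	sp := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
	fmt.Println(completedPortion(sp, roachpb.Key("a"), roachpb.Key("m"))) // only a through m is complete
	fmt.Println(completedPortion(sp, roachpb.Key("m"), nil))              // span finished: m through z
}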
4 parents 1572ff3 + ea1a0a2 + 161a3f7 + 8fbbb45 commit 3f6a4f4

6 files changed: +94 -81 lines

pkg/cmd/roachtest/tests/disk_full.go

Lines changed: 2 additions & 1 deletion
@@ -54,7 +54,8 @@ func registerDiskFull(r registry.Registry) {
 	m.Go(func(ctx context.Context) error {
 		cmd := fmt.Sprintf(
 			"./cockroach workload run kv --tolerate-errors --init --read-percent=0"+
-				" --concurrency=10 --duration=4m {pgurl:2-%d}",
+				" --min-block-bytes=512 --max-block-bytes=512"+
+				" --concurrency=10 --duration=10m {pgurl:2-%d}",
 			len(c.CRDBNodes()))
 		c.Run(ctx, option.WithNodes(c.WorkloadNode()), cmd)
 		return nil
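For reference, the adjusted workload invocation expands roughly as follows. A small standalone sketch: the node count of 4 is hypothetical (in the test it comes from len(c.CRDBNodes())), and the {pgurl:...} placeholder is substituted by roachtest at run time.

package main

import "fmt"

func main() {
	// Hypothetical cluster size used only for illustration.
	numCRDBNodes := 4
	cmd := fmt.Sprintf(
		"./cockroach workload run kv --tolerate-errors --init --read-percent=0"+
			" --min-block-bytes=512 --max-block-bytes=512"+
			" --concurrency=10 --duration=10m {pgurl:2-%d}",
		numCRDBNodes)
	fmt.Println(cmd)
	// ./cockroach workload run kv --tolerate-errors --init --read-percent=0 --min-block-bytes=512 --max-block-bytes=512 --concurrency=10 --duration=10m {pgurl:2-4}
}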

pkg/kv/kvnemesis/applier.go

Lines changed: 18 additions & 18 deletions
@@ -640,18 +640,17 @@ func applyBatchOp(
 		}
 		subO.Result.ResumeSpan = res.ResumeSpan
 	case *PutOperation:
-		err := b.Results[resultIdx].Err
-		subO.Result = resultInit(ctx, err)
-		subO.Result.ResumeSpan = b.Results[resultIdx].ResumeSpan
+		res := b.Results[resultIdx]
+		subO.Result = resultInit(ctx, res.Err)
+		subO.Result.ResumeSpan = res.ResumeSpan
 	case *ScanOperation:
 		res := b.Results[resultIdx]
-		kvs, err := res.Rows, res.Err
-		if err != nil {
-			subO.Result = resultInit(ctx, err)
+		if res.Err != nil {
+			subO.Result = resultInit(ctx, res.Err)
 		} else {
 			subO.Result.Type = ResultType_Values
-			subO.Result.Values = make([]KeyValue, len(kvs))
-			for j, kv := range kvs {
+			subO.Result.Values = make([]KeyValue, len(res.Rows))
+			for j, kv := range res.Rows {
 				subO.Result.Values[j] = KeyValue{
 					Key:   []byte(kv.Key),
 					Value: kv.Value.RawBytes,
@@ -660,26 +659,27 @@ func applyBatchOp(
 		}
 		subO.Result.ResumeSpan = res.ResumeSpan
 	case *DeleteOperation:
-		err := b.Results[resultIdx].Err
-		subO.Result = resultInit(ctx, err)
-		subO.Result.ResumeSpan = b.Results[resultIdx].ResumeSpan
+		res := b.Results[resultIdx]
+		subO.Result = resultInit(ctx, res.Err)
+		subO.Result.ResumeSpan = res.ResumeSpan
 	case *DeleteRangeOperation:
 		res := b.Results[resultIdx]
-		keys, err := res.Keys, res.Err
-		if err != nil {
-			subO.Result = resultInit(ctx, err)
+		if res.Err != nil {
+			subO.Result = resultInit(ctx, res.Err)
 		} else {
 			subO.Result.Type = ResultType_Keys
-			subO.Result.Keys = make([][]byte, len(keys))
-			for j, key := range keys {
+			subO.Result.Keys = make([][]byte, len(res.Keys))
+			for j, key := range res.Keys {
 				subO.Result.Keys[j] = key
 			}
 		}
 		subO.Result.ResumeSpan = res.ResumeSpan
 	case *DeleteRangeUsingTombstoneOperation:
-		subO.Result = resultInit(ctx, err)
-		subO.Result.ResumeSpan = b.Results[resultIdx].ResumeSpan
+		res := b.Results[resultIdx]
+		subO.Result = resultInit(ctx, res.Err)
+		subO.Result.ResumeSpan = res.ResumeSpan
 	case *MutateBatchHeaderOperation:
+		// NB: MutateBatchHeaderOperation cannot fail.
 		subO.Result = resultInit(ctx, nil)
 	case *AddSSTableOperation:
 		panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
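The fix in the DeleteRangeUsingTombstoneOperation case is a classic stale-variable bug: resultInit was passed an outer err that was already nil rather than the error stored on the operation's own batch result. A minimal standalone sketch of the pattern, using a hypothetical stand-in type rather than the real kv client:

package main

import (
	"errors"
	"fmt"
)

// result stands in for a per-request batch result, which carries its own error.
type result struct{ Err error }

func main() {
	var err error // outer error from an earlier call; nil on the success path
	results := []result{{Err: errors.New("range tombstone write failed")}}

	// Buggy pattern: consults the stale outer err, so the failure is silently dropped.
	fmt.Println("buggy:", err)

	// Fixed pattern: read the error stored on this operation's own result.
	res := results[0]
	fmt.Println("fixed:", res.Err)
}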

pkg/sql/backfill/backfill.go

Lines changed: 5 additions & 4 deletions
@@ -1010,19 +1010,21 @@ func (ib *IndexBackfiller) init(
 // that needs to be freed once the returned IndexEntry slice is freed. This is
 // returned for the successful and failure cases. It is the callers responsibility
 // to clear the associated bound account when appropriate.
+// A non-nil resumeKey is returned when there is still work to be done in this
+// span (sp.Key < resumeKey < sp.EndKey), which happens if the entire span does
+// not fit within the batch size.
 func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	ctx context.Context,
 	txn *kv.Txn,
 	tableDesc catalog.TableDescriptor,
 	sp roachpb.Span,
 	chunkSize int64,
 	traceKV bool,
-) ([]rowenc.IndexEntry, roachpb.Key, int64, error) {
+) (entries []rowenc.IndexEntry, resumeKey roachpb.Key, memUsedPerChunk int64, err error) {
 	// This ought to be chunkSize but in most tests we are actually building smaller
 	// indexes so use a smaller value.
 	const initBufferSize = 1000
 	const sizeOfIndexEntry = int64(unsafe.Sizeof(rowenc.IndexEntry{}))
-	var memUsedPerChunk int64

 	indexEntriesInChunkInitialBufferSize :=
 		sizeOfIndexEntry * initBufferSize * int64(len(ib.added))
@@ -1031,7 +1033,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 			"failed to initialize empty buffer to store the index entries of all rows in the chunk")
 	}
 	memUsedPerChunk += indexEntriesInChunkInitialBufferSize
-	entries := make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))
+	entries = make([]rowenc.IndexEntry, 0, initBufferSize*int64(len(ib.added)))

 	var fetcherCols []descpb.ColumnID
 	for i, c := range ib.cols {
@@ -1264,7 +1266,6 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	ib.ShrinkBoundAccount(ctx, shrinkSize)
 	memUsedPerChunk -= shrinkSize

-	var resumeKey roachpb.Key
 	if fetcher.Key() != nil {
 		resumeKey = make(roachpb.Key, len(fetcher.Key()))
 		copy(resumeKey, fetcher.Key())
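Per the new doc comment, a non-nil resumeKey signals that the span did not fit in one chunk and the caller is expected to loop, narrowing the span to start at the resume key each time. A simplified, self-contained sketch of that driving loop; buildChunk and backfillSpan are hypothetical stand-ins for BuildIndexEntriesChunk and its caller in rowexec.

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// buildChunk stands in for IndexBackfiller.BuildIndexEntriesChunk: it processes
// at most one chunk of sp and returns a non-nil resumeKey when rows remain.
type buildChunk func(ctx context.Context, sp roachpb.Span) (resumeKey roachpb.Key, err error)

// backfillSpan drives buildChunk until the span is exhausted. The progress
// reported for each iteration covers only [cur.Key, resumeKey), never all of
// sp, which is the invariant the fix above restores.
func backfillSpan(ctx context.Context, sp roachpb.Span, build buildChunk) error {
	cur := sp
	for {
		resumeKey, err := build(ctx, cur)
		if err != nil {
			return err
		}
		done := roachpb.Span{Key: cur.Key, EndKey: cur.EndKey}
		if resumeKey != nil {
			done.EndKey = resumeKey
		}
		fmt.Printf("completed %s\n", done)
		if resumeKey == nil {
			return nil // whole span finished
		}
		cur.Key = resumeKey // continue from where the last chunk stopped
	}
}

func main() {
	sp := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
	// Fake builder that pretends each chunk covers exactly one key.
	fake := func(_ context.Context, cur roachpb.Span) (roachpb.Key, error) {
		next := cur.Key.PrefixEnd()
		if bytes.Compare(next, cur.EndKey) >= 0 {
			return nil, nil
		}
		return next, nil
	}
	_ = backfillSpan(context.Background(), sp, fake)
}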

pkg/sql/index_backfiller.go

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(

 	knobs := &ib.execCfg.DistSQLSrv.TestingKnobs
 	if knobs.RunBeforeIndexBackfillProgressUpdate != nil {
-		knobs.RunBeforeIndexBackfillProgressUpdate(ctx, progress.CompletedSpans)
+		knobs.RunBeforeIndexBackfillProgressUpdate(ctx, meta.BulkProcessorProgress.CompletedSpans)
 	}
 	return tracker.SetBackfillProgress(ctx, progress)
 }

pkg/sql/indexbackfiller_test.go

Lines changed: 60 additions & 47 deletions
@@ -37,6 +37,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -582,7 +584,8 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {

 	ctx := context.Background()
 	backfillProgressCompletedCh := make(chan []roachpb.Span)
-	const numSpans = 100
+	const numRows = 100
+	const numSpans = 20
 	var isBlockingBackfillProgress atomic.Bool
 	isBlockingBackfillProgress.Store(true)

@@ -611,22 +614,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
 					}
 					return nil
 				},
+				AfterStage: func(p scplan.Plan, stageIdx int) error {
+					if p.Stages[stageIdx].Type() != scop.BackfillType || !isBlockingBackfillProgress.Load() {
+						return nil
+					}
+					isBlockingBackfillProgress.Store(false)
+					close(backfillProgressCompletedCh)
+					return nil
+				},
 			},
 		},
 	})
 	defer tc.Stopper().Stop(ctx)

 	_, err := db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = 10`)
 	require.NoError(t, err)
+	_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.ingest_concurrency=4`)
+	require.NoError(t, err)
 	// Ensure that we checkpoint our progress to the backfill job so that
 	// RESUMEs can get an up-to-date backfill progress.
 	_, err = db.Exec(`SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '10ms'`)
 	require.NoError(t, err)
 	_, err = db.Exec(`CREATE TABLE t(i INT PRIMARY KEY)`)
 	require.NoError(t, err)
-	_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, numSpans)
+	// Have a 100 splits each containing a 100 rows.
+	_, err = db.Exec(`INSERT INTO t SELECT generate_series(1, $1)`, (numRows*numSpans)+1)
 	require.NoError(t, err)
-	_, err = db.Exec(`ALTER TABLE t SPLIT AT TABLE generate_series(1, $1)`, numSpans)
+	for split := 0; split < numSpans; split++ {
+		_, err = db.Exec(`ALTER TABLE t SPLIT AT VALUES ($1)`, numRows*numSpans)
+	}
 	require.NoError(t, err)
 	var descID catid.DescID
 	descIDRow := db.QueryRow(`SELECT 't'::regclass::oid`)
@@ -670,21 +686,28 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
 	}

 	var completedSpans roachpb.SpanGroup
+	var observedSpans []roachpb.Span
 	receiveProgressUpdate := func() {
-		progressUpdate := <-backfillProgressCompletedCh
-
-		// Make sure the progress update does not contain overlapping spans.
-		for i, span1 := range progressUpdate {
-			for j, span2 := range progressUpdate {
-				if i <= j {
-					continue
-				}
-				if span1.Overlaps(span2) {
-					t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
+		updateCount := 2
+		for isBlockingBackfillProgress.Load() && updateCount > 0 {
+			progressUpdate := <-backfillProgressCompletedCh
+			// Make sure the progress update does not contain overlapping spans.
+			for i, span1 := range progressUpdate {
+				for j, span2 := range progressUpdate {
+					if i == j {
+						continue
+					}
+					if span1.Overlaps(span2) {
+						t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
+					}
 				}
 			}
+			hasMoreSpans := completedSpans.Add(progressUpdate...)
+			if hasMoreSpans {
+				updateCount -= 1
+			}
+			observedSpans = append(observedSpans, progressUpdate...)
 		}
-		completedSpans.Add(progressUpdate...)
 	}

 	ensureCompletedSpansAreCheckpointed := func() {
@@ -718,45 +741,35 @@ func TestIndexBackfillerResumePreservesProgress(t *testing.T) {
 		}, 5*time.Second)
 	}

-	// Let the backfill step forward a bit before we do our PAUSE/RESUME
-	// dance.
-	for i := 0; i < 2; i++ {
+	for isBlockingBackfillProgress.Load() {
 		receiveProgressUpdate()
-	}
-
-	ensureCompletedSpansAreCheckpointed()
-	t.Logf("pausing backfill")
-	_, err = db.Exec(`PAUSE JOB $1`, jobID)
-	require.NoError(t, err)
-	ensureJobState("paused")
-
-	t.Logf("resuming backfill")
-	_, err = db.Exec(`RESUME JOB $1`, jobID)
-	require.NoError(t, err)
-	ensureJobState("running")
+		ensureCompletedSpansAreCheckpointed()
+		t.Logf("pausing backfill")
+		_, err = db.Exec(`PAUSE JOB $1`, jobID)
+		require.NoError(t, err)
+		ensureJobState("paused")

-	// Step forward again before re-pausing.
-	for i := 0; i < 2; i++ {
-		receiveProgressUpdate()
+		t.Logf("resuming backfill")
+		_, err = db.Exec(`RESUME JOB $1`, jobID)
+		require.NoError(t, err)
+		ensureJobState("running")
 	}
-
-	ensureCompletedSpansAreCheckpointed()
-	isBlockingBackfillProgress.Store(false)
-
-	t.Logf("pausing backfill")
-	_, err = db.Exec(`PAUSE JOB $1`, jobID)
-	require.NoError(t, err)
-	ensureJobState("paused")
-
-	t.Logf("resuming backfill")
-	_, err = db.Exec(`RESUME JOB $1`, jobID)
-	require.NoError(t, err)
-	ensureJobState("running")
-
 	// Now we can wait for the job to succeed
 	ensureJobState("succeeded")
-
 	if err = g.Wait(); err != nil {
 		require.NoError(t, err)
 	}
+	// Make sure the spans we are adding do not overlap otherwise, this indicates
+	// a bug. Where we computed chunks incorrectly. Each chunk should be an independent
+	// piece of work.
+	for i, span1 := range observedSpans {
+		for j, span2 := range observedSpans {
+			if i == j {
+				continue
+			}
+			if span1.Overlaps(span2) {
+				t.Fatalf("progress update contains overlapping spans: %s and %s", span1, span2)
+			}
+		}
+	}
 }

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 8 additions & 10 deletions
@@ -162,9 +162,8 @@ func (ib *indexBackfiller) constructIndexEntries(

 	// Identify the Span for which we have constructed index entries. This is
 	// used for reporting progress and updating the job details.
-	completedSpan := ib.spec.Spans[i]
+	completedSpan := roachpb.Span{Key: startKey, EndKey: ib.spec.Spans[i].EndKey}
 	if todo.Key != nil {
-		completedSpan.Key = startKey
 		completedSpan.EndKey = todo.Key
 	}

@@ -529,21 +528,20 @@ func (ib *indexBackfiller) getProgressReportInterval() time.Duration {
 	return indexBackfillProgressReportInterval
 }

-// buildIndexEntryBatch constructs the index entries for a single indexBatch.
+// buildIndexEntryBatch constructs the index entries for a single indexBatch for
+// the given span. A non-nil resumeKey is returned when there is still work to
+// be done in this span (sp.Key < resumeKey < sp.EndKey), which happens if the
+// entire span does not fit within the batch size.
 func (ib *indexBackfiller) buildIndexEntryBatch(
 	tctx context.Context, sp roachpb.Span, readAsOf hlc.Timestamp,
-) (roachpb.Key, []rowenc.IndexEntry, int64, error) {
+) (resumeKey roachpb.Key, entries []rowenc.IndexEntry, memUsedBuildingBatch int64, err error) {
 	knobs := &ib.flowCtx.Cfg.TestingKnobs
 	if knobs.RunBeforeBackfillChunk != nil {
 		if err := knobs.RunBeforeBackfillChunk(sp); err != nil {
 			return nil, nil, 0, err
 		}
 	}

-	var memUsedBuildingBatch int64
-	var key roachpb.Key
-	var entries []rowenc.IndexEntry
-
 	br := indexBatchRetry{
 		nextChunkSize: ib.spec.ChunkSize,
 		// Memory used while building index entries is released by another goroutine
@@ -582,7 +580,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(

 		// TODO(knz): do KV tracing in DistSQL processors.
 		var err error
-		entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
+		entries, resumeKey, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
 			ctx, txn.KV(), ib.desc, sp, br.nextChunkSize, false, /* traceKV */
 		)
 		return err
@@ -598,7 +596,7 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
 	log.VEventf(ctx, 3, "index backfill stats: entries %d, prepare %+v",
 		len(entries), prepTime)

-	return key, entries, memUsedBuildingBatch, nil
+	return resumeKey, entries, memUsedBuildingBatch, nil
 }

 // Resume is part of the execinfra.Processor interface.
