Skip to content

Commit 3fe68dc

Browse files
authored
Merge pull request #151430 from yuzefovich/backport25.3-150165
release-25.3: sql: ensure that index backfill monitor is closed in error case
2 parents b794812 + 7cacd46 commit 3fe68dc

File tree

6 files changed

+56
-40
lines changed

6 files changed

+56
-40
lines changed

pkg/sql/backfill.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,10 +1043,7 @@ func (sc *SchemaChanger) distIndexBackfill(
10431043
)
10441044
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
10451045
chunkSize := sc.getChunkSize(indexBatchSize)
1046-
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
1047-
if err != nil {
1048-
return err
1049-
}
1046+
spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
10501047
p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
10511048
return err
10521049
}); err != nil {
@@ -1351,10 +1348,7 @@ func (sc *SchemaChanger) distColumnBackfill(
13511348
planCtx := sc.distSQLPlanner.NewPlanningCtx(
13521349
ctx, &evalCtx, nil /* planner */, txn.KV(), FullDistribution,
13531350
)
1354-
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
1355-
if err != nil {
1356-
return err
1357-
}
1351+
spec := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
13581352
plan, err := sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
13591353
if err != nil {
13601354
return err
@@ -2917,11 +2911,9 @@ func indexBackfillInTxn(
29172911
tableDesc catalog.TableDescriptor,
29182912
traceKV bool,
29192913
) error {
2920-
var indexBackfillerMon *mon.BytesMonitor
2921-
if evalCtx.Planner.Mon() != nil {
2922-
indexBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Planner.Mon(),
2923-
mon.MakeName("local-index-backfill-mon"))
2924-
}
2914+
indexBackfillerMon := execinfra.NewMonitor(
2915+
ctx, evalCtx.Planner.Mon(), mon.MakeName("local-index-backfill-mon"),
2916+
)
29252917

29262918
var backfiller backfill.IndexBackfiller
29272919
if err := backfiller.InitForLocalUse(

pkg/sql/backfill/backfill.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -595,13 +595,25 @@ func (ib *IndexBackfiller) ContainsInvertedIndex() bool {
595595
// InitForLocalUse initializes an IndexBackfiller for use during local execution
596596
// within a transaction. In this case, the entire backfill process is occurring
597597
// on the gateway as part of the user's transaction.
598+
//
599+
// Non-nil memory monitor must be provided. If an error is returned, it'll be
600+
// stopped automatically; otherwise, the backfiller takes ownership of the
601+
// monitor.
598602
func (ib *IndexBackfiller) InitForLocalUse(
599603
ctx context.Context,
600604
evalCtx *eval.Context,
601605
semaCtx *tree.SemaContext,
602606
desc catalog.TableDescriptor,
603607
mon *mon.BytesMonitor,
604-
) error {
608+
) (retErr error) {
609+
if mon == nil {
610+
return errors.AssertionFailedf("memory monitor must be provided")
611+
}
612+
defer func() {
613+
if retErr != nil {
614+
mon.Stop(ctx)
615+
}
616+
}()
605617

606618
// Initialize ib.added.
607619
if err := ib.initIndexes(ctx, evalCtx, desc, nil /* allowList */, 0 /*sourceIndex*/, nil); err != nil {
@@ -626,7 +638,8 @@ func (ib *IndexBackfiller) InitForLocalUse(
626638
ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
627639
})
628640

629-
return ib.init(evalCtx, predicates, colExprs, mon)
641+
ib.init(evalCtx, predicates, colExprs, mon)
642+
return nil
630643
}
631644

632645
// constructExprs is a helper to construct the index and column expressions
@@ -737,14 +750,27 @@ func constructExprs(
737750
// backfill operation manages its own transactions. This separation is necessary
738751
// due to the different procedure for accessing user defined type metadata as
739752
// part of a distributed flow.
753+
//
754+
// Non-nil memory monitor must be provided. If an error is returned, it'll be
755+
// stopped automatically; otherwise, the backfiller takes ownership of the
756+
// monitor.
740757
func (ib *IndexBackfiller) InitForDistributedUse(
741758
ctx context.Context,
742759
flowCtx *execinfra.FlowCtx,
743760
desc catalog.TableDescriptor,
744761
allowList []catid.IndexID,
745762
sourceIndexID catid.IndexID,
746763
mon *mon.BytesMonitor,
747-
) error {
764+
) (retErr error) {
765+
if mon == nil {
766+
return errors.AssertionFailedf("memory monitor must be provided")
767+
}
768+
defer func() {
769+
if retErr != nil {
770+
mon.Stop(ctx)
771+
}
772+
}()
773+
748774
// We'll be modifying the eval.Context in BuildIndexEntriesChunk, so we need
749775
// to make a copy.
750776
evalCtx := flowCtx.NewEvalCtx()
@@ -801,7 +827,8 @@ func (ib *IndexBackfiller) InitForDistributedUse(
801827
ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
802828
})
803829

804-
return ib.init(evalCtx, predicates, colExprs, mon)
830+
ib.init(evalCtx, predicates, colExprs, mon)
831+
return nil
805832
}
806833

807834
// Close releases the resources used by the IndexBackfiller. It can be called
@@ -933,12 +960,15 @@ func (ib *IndexBackfiller) initIndexes(
933960
}
934961

935962
// init completes the initialization of an IndexBackfiller.
963+
//
964+
// The IndexBackfiller takes ownership of the monitor which must be non-nil.
965+
// It'll be closed when the backfiller is closed.
936966
func (ib *IndexBackfiller) init(
937967
evalCtx *eval.Context,
938968
predicateExprs map[descpb.IndexID]tree.TypedExpr,
939969
colExprs map[descpb.ColumnID]tree.TypedExpr,
940970
mon *mon.BytesMonitor,
941-
) error {
971+
) {
942972
ib.evalCtx = evalCtx
943973
ib.predicates = predicateExprs
944974
ib.colExprs = colExprs
@@ -959,12 +989,8 @@ func (ib *IndexBackfiller) init(
959989
}
960990

961991
// Create a bound account associated with the index backfiller monitor.
962-
if mon == nil {
963-
return errors.AssertionFailedf("no memory monitor linked to IndexBackfiller during init")
964-
}
965992
ib.mon = mon
966993
ib.muBoundAccount.boundAccount = mon.MakeBoundAccount()
967-
return nil
968994
}
969995

970996
// BuildIndexEntriesChunk reads a chunk of rows from a table using the span sp

pkg/sql/distsql_plan_backfill.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ func initColumnBackfillerSpec(
2828
chunkSize int64,
2929
updateChunkSizeThresholdBytes uint64,
3030
readAsOf hlc.Timestamp,
31-
) (execinfrapb.BackfillerSpec, error) {
31+
) execinfrapb.BackfillerSpec {
3232
return execinfrapb.BackfillerSpec{
3333
Table: *tbl.TableDesc(),
3434
Duration: duration,
3535
ChunkSize: chunkSize,
3636
UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
3737
ReadAsOf: readAsOf,
3838
Type: execinfrapb.BackfillerSpec_Column,
39-
}, nil
39+
}
4040
}
4141

4242
func initIndexBackfillerSpec(
@@ -46,7 +46,7 @@ func initIndexBackfillerSpec(
4646
chunkSize int64,
4747
indexesToBackfill []descpb.IndexID,
4848
sourceIndexID descpb.IndexID,
49-
) (execinfrapb.BackfillerSpec, error) {
49+
) execinfrapb.BackfillerSpec {
5050
return execinfrapb.BackfillerSpec{
5151
Table: desc,
5252
WriteAsOf: writeAsOf,
@@ -55,21 +55,21 @@ func initIndexBackfillerSpec(
5555
ChunkSize: chunkSize,
5656
IndexesToBackfill: indexesToBackfill,
5757
SourceIndexID: sourceIndexID,
58-
}, nil
58+
}
5959
}
6060

6161
func initIndexBackfillMergerSpec(
6262
desc descpb.TableDescriptor,
6363
addedIndexes []descpb.IndexID,
6464
temporaryIndexes []descpb.IndexID,
6565
mergeTimestamp hlc.Timestamp,
66-
) (execinfrapb.IndexBackfillMergerSpec, error) {
66+
) execinfrapb.IndexBackfillMergerSpec {
6767
return execinfrapb.IndexBackfillMergerSpec{
6868
Table: desc,
6969
AddedIndexes: addedIndexes,
7070
TemporaryIndexes: temporaryIndexes,
7171
MergeTimestamp: mergeTimestamp,
72-
}, nil
72+
}
7373
}
7474

7575
var initialSplitsPerProcessor = settings.RegisterIntSetting(

pkg/sql/index_backfiller.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,11 @@ func (ib *IndexBackfillPlanner) plan(
207207
// batch size. Also plumb in a testing knob.
208208
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
209209
const writeAtRequestTimestamp = true
210-
spec, err := initIndexBackfillerSpec(
210+
spec := initIndexBackfillerSpec(
211211
*td.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize,
212212
indexesToBackfill, sourceIndexID,
213213
)
214-
if err != nil {
215-
return err
216-
}
214+
var err error
217215
p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, sourceSpans)
218216
return err
219217
}); err != nil {

pkg/sql/mvcc_backfiller.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,8 @@ func (im *IndexBackfillerMergePlanner) plan(
137137
ctx, &extEvalCtx, nil /* planner */, txn.KV(), FullDistribution,
138138
)
139139

140-
spec, err := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp)
141-
if err != nil {
142-
return err
143-
}
140+
spec := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp)
141+
var err error
144142
p, err = im.execCfg.DistSQLPlanner.createIndexBackfillerMergePhysicalPlan(ctx, planCtx, spec, todoSpanList)
145143
return err
146144
}); err != nil {

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ func newIndexBackfiller(
8484
processorID int32,
8585
spec execinfrapb.BackfillerSpec,
8686
) (*indexBackfiller, error) {
87-
indexBackfillerMon := execinfra.NewMonitor(ctx, flowCtx.Cfg.BackfillerMonitor,
88-
mon.MakeName("index-backfill-mon"))
87+
indexBackfillerMon := execinfra.NewMonitor(
88+
ctx, flowCtx.Cfg.BackfillerMonitor, mon.MakeName("index-backfill-mon"),
89+
)
8990
ib := &indexBackfiller{
9091
desc: flowCtx.TableDescriptor(ctx, &spec.Table),
9192
spec: spec,
@@ -94,8 +95,9 @@ func newIndexBackfiller(
9495
filter: backfill.IndexMutationFilter,
9596
}
9697

97-
if err := ib.IndexBackfiller.InitForDistributedUse(ctx, flowCtx, ib.desc,
98-
ib.spec.IndexesToBackfill, ib.spec.SourceIndexID, indexBackfillerMon); err != nil {
98+
if err := ib.IndexBackfiller.InitForDistributedUse(
99+
ctx, flowCtx, ib.desc, ib.spec.IndexesToBackfill, ib.spec.SourceIndexID, indexBackfillerMon,
100+
); err != nil {
99101
return nil, err
100102
}
101103

0 commit comments

Comments
 (0)