Skip to content

Commit a5c9f6e

Browse files
committed
changefeedccl: fix db-level feed error handling in changefeed processors
AllTargets can error when called from the changefeed processor due to flakes unrelated to changefeeds. This commit fixes the error handling so that it doesn't result in a panic. Part of: #147371 Epic: CRDB-1421 Release note: None
1 parent 8ab959a commit a5c9f6e

File tree

3 files changed

+38
-26
lines changed

3 files changed

+38
-26
lines changed

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,8 @@ type ChangefeedConfig struct {
4343
// makeChangefeedConfigFromJobDetails creates a ChangefeedConfig struct from any
4444
// version of the ChangefeedDetails protobuf.
4545
func makeChangefeedConfigFromJobDetails(
46-
ctx context.Context, d jobspb.ChangefeedDetails, execCfg *sql.ExecutorConfig,
46+
d jobspb.ChangefeedDetails, targets changefeedbase.Targets,
4747
) (ChangefeedConfig, error) {
48-
targets, err := AllTargets(ctx, d, execCfg)
49-
if err != nil {
50-
return ChangefeedConfig{}, err
51-
}
5248
return ChangefeedConfig{
5349
SinkURI: d.SinkURI,
5450
Opts: changefeedbase.MakeStatementOptions(d.Opts),

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -201,19 +201,20 @@ func newChangeAggregatorProcessor(
201201
) (_ execinfra.Processor, retErr error) {
202202
// Setup monitoring for this node drain.
203203
drainWatcher, drainDone := makeDrainWatcher(flowCtx)
204-
defer func() {
205-
if retErr != nil {
206-
drainDone()
207-
}
208-
}()
209-
210204
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, mon.MakeName("changeagg-mem"))
211205
ca := &changeAggregator{
212206
spec: spec,
213207
memAcc: memMonitor.MakeBoundAccount(),
214208
checkForNodeDrain: drainWatcher.checkForNodeDrain,
215209
}
216210

211+
defer func() {
212+
if retErr != nil {
213+
ca.close()
214+
drainDone()
215+
}
216+
}()
217+
217218
if err := ca.Init(
218219
ctx,
219220
ca,
@@ -351,7 +352,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
351352
return
352353
}
353354

354-
feed, err := makeChangefeedConfigFromJobDetails(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
355+
execCfg := ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
356+
if ca.knobs.OverrideExecCfg != nil {
357+
execCfg = ca.knobs.OverrideExecCfg(execCfg)
358+
}
359+
ca.targets, err = AllTargets(ctx, ca.spec.Feed, execCfg)
360+
if err != nil {
361+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting targets: %v", err)
362+
ca.MoveToDraining(err)
363+
ca.cancel()
364+
return
365+
}
366+
367+
feed, err := makeChangefeedConfigFromJobDetails(ca.spec.Feed, ca.targets)
355368
if err != nil {
356369
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error making changefeed config: %v", err)
357370
ca.MoveToDraining(err)
@@ -382,13 +395,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
382395
return
383396
}
384397
ca.sliMetricsID = ca.sliMetrics.claimId()
385-
ca.targets, err = AllTargets(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
386-
if err != nil {
387-
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting targets: %v", err)
388-
ca.MoveToDraining(err)
389-
ca.cancel()
390-
return
391-
}
392398

393399
recorder := metricsRecorder(ca.sliMetrics)
394400
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder, ca.targets)
@@ -1231,7 +1237,7 @@ func newChangeFrontierProcessor(
12311237
spec execinfrapb.ChangeFrontierSpec,
12321238
input execinfra.RowSource,
12331239
post *execinfrapb.PostProcessSpec,
1234-
) (execinfra.Processor, error) {
1240+
) (_ execinfra.Processor, retErr error) {
12351241
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, mon.MakeName("changefntr-mem"))
12361242

12371243
cf := &changeFrontier{
@@ -1244,6 +1250,12 @@ func newChangeFrontierProcessor(
12441250
usageWgCancel: func() {},
12451251
}
12461252

1253+
defer func() {
1254+
if retErr != nil {
1255+
cf.close()
1256+
}
1257+
}()
1258+
12471259
if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
12481260
cf.knobs = *cfKnobs
12491261
}
@@ -1306,7 +1318,11 @@ func newChangeFrontierProcessor(
13061318
if err != nil {
13071319
return nil, err
13081320
}
1309-
targets, err := AllTargets(ctx, spec.Feed, flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
1321+
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
1322+
if cf.knobs.OverrideExecCfg != nil {
1323+
execCfg = cf.knobs.OverrideExecCfg(execCfg)
1324+
}
1325+
targets, err := AllTargets(ctx, spec.Feed, execCfg)
13101326
if err != nil {
13111327
return nil, err
13121328
}

pkg/ccl/changefeedccl/event_processing_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,16 +153,16 @@ func TestTopicForEvent(t *testing.T) {
153153
},
154154
} {
155155
t.Run(tc.name, func(t *testing.T) {
156-
cfg, err := makeChangefeedConfigFromJobDetails(context.Background(), tc.details, nil)
156+
// Can pass in nil for execCfg to AllTargets because we're not testing
157+
// database-level targets.
158+
targets, err := AllTargets(context.Background(), tc.details, nil)
159+
require.NoError(t, err)
160+
cfg, err := makeChangefeedConfigFromJobDetails(tc.details, targets)
157161
require.NoError(t, err)
158162
c := kvEventToRowConsumer{
159163
details: cfg,
160164
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
161165
}
162-
// Can pass in nil for execCfg to AllTargets because we're not testing
163-
// database-level targets.
164-
targets, err := AllTargets(context.Background(), tc.details, nil)
165-
require.NoError(t, err)
166166
tn, err := MakeTopicNamer(targets)
167167
require.NoError(t, err)
168168

0 commit comments

Comments
 (0)