Skip to content

Commit 4fe0aa8

Browse files
craig[bot]KeithCh
andcommitted
Merge #152598
152598: changefeedccl: fix db-level feed error handling in changefeed processors r=andyyang890,aerfrei a=KeithCh AllTargets can error when called from the changefeed processor due to flakes unrelated to changefeeds. This commit fixes the error handling so that it doesn't result in a panic. Part of: #147371 Epic: CRDB-1421 Release note: none Co-authored-by: Keith Chow <[email protected]>
2 parents a5cca5e + a5c9f6e commit 4fe0aa8

File tree

3 files changed

+38
-26
lines changed

3 files changed

+38
-26
lines changed

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,8 @@ type ChangefeedConfig struct {
4343
// makeChangefeedConfigFromJobDetails creates a ChangefeedConfig struct from any
4444
// version of the ChangefeedDetails protobuf.
4545
func makeChangefeedConfigFromJobDetails(
46-
ctx context.Context, d jobspb.ChangefeedDetails, execCfg *sql.ExecutorConfig,
46+
d jobspb.ChangefeedDetails, targets changefeedbase.Targets,
4747
) (ChangefeedConfig, error) {
48-
targets, err := AllTargets(ctx, d, execCfg)
49-
if err != nil {
50-
return ChangefeedConfig{}, err
51-
}
5248
return ChangefeedConfig{
5349
SinkURI: d.SinkURI,
5450
Opts: changefeedbase.MakeStatementOptions(d.Opts),

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -201,19 +201,20 @@ func newChangeAggregatorProcessor(
201201
) (_ execinfra.Processor, retErr error) {
202202
// Setup monitoring for this node drain.
203203
drainWatcher, drainDone := makeDrainWatcher(flowCtx)
204-
defer func() {
205-
if retErr != nil {
206-
drainDone()
207-
}
208-
}()
209-
210204
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, mon.MakeName("changeagg-mem"))
211205
ca := &changeAggregator{
212206
spec: spec,
213207
memAcc: memMonitor.MakeBoundAccount(),
214208
checkForNodeDrain: drainWatcher.checkForNodeDrain,
215209
}
216210

211+
defer func() {
212+
if retErr != nil {
213+
ca.close()
214+
drainDone()
215+
}
216+
}()
217+
217218
if err := ca.Init(
218219
ctx,
219220
ca,
@@ -351,7 +352,19 @@ func (ca *changeAggregator) Start(ctx context.Context) {
351352
return
352353
}
353354

354-
feed, err := makeChangefeedConfigFromJobDetails(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
355+
execCfg := ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
356+
if ca.knobs.OverrideExecCfg != nil {
357+
execCfg = ca.knobs.OverrideExecCfg(execCfg)
358+
}
359+
ca.targets, err = AllTargets(ctx, ca.spec.Feed, execCfg)
360+
if err != nil {
361+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting targets: %v", err)
362+
ca.MoveToDraining(err)
363+
ca.cancel()
364+
return
365+
}
366+
367+
feed, err := makeChangefeedConfigFromJobDetails(ca.spec.Feed, ca.targets)
355368
if err != nil {
356369
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error making changefeed config: %v", err)
357370
ca.MoveToDraining(err)
@@ -382,13 +395,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
382395
return
383396
}
384397
ca.sliMetricsID = ca.sliMetrics.claimId()
385-
ca.targets, err = AllTargets(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
386-
if err != nil {
387-
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting targets: %v", err)
388-
ca.MoveToDraining(err)
389-
ca.cancel()
390-
return
391-
}
392398

393399
recorder := metricsRecorder(ca.sliMetrics)
394400
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder, ca.targets)
@@ -1231,7 +1237,7 @@ func newChangeFrontierProcessor(
12311237
spec execinfrapb.ChangeFrontierSpec,
12321238
input execinfra.RowSource,
12331239
post *execinfrapb.PostProcessSpec,
1234-
) (execinfra.Processor, error) {
1240+
) (_ execinfra.Processor, retErr error) {
12351241
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, mon.MakeName("changefntr-mem"))
12361242

12371243
cf := &changeFrontier{
@@ -1244,6 +1250,12 @@ func newChangeFrontierProcessor(
12441250
usageWgCancel: func() {},
12451251
}
12461252

1253+
defer func() {
1254+
if retErr != nil {
1255+
cf.close()
1256+
}
1257+
}()
1258+
12471259
if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
12481260
cf.knobs = *cfKnobs
12491261
}
@@ -1306,7 +1318,11 @@ func newChangeFrontierProcessor(
13061318
if err != nil {
13071319
return nil, err
13081320
}
1309-
targets, err := AllTargets(ctx, spec.Feed, flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
1321+
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
1322+
if cf.knobs.OverrideExecCfg != nil {
1323+
execCfg = cf.knobs.OverrideExecCfg(execCfg)
1324+
}
1325+
targets, err := AllTargets(ctx, spec.Feed, execCfg)
13101326
if err != nil {
13111327
return nil, err
13121328
}

pkg/ccl/changefeedccl/event_processing_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,16 +153,16 @@ func TestTopicForEvent(t *testing.T) {
153153
},
154154
} {
155155
t.Run(tc.name, func(t *testing.T) {
156-
cfg, err := makeChangefeedConfigFromJobDetails(context.Background(), tc.details, nil)
156+
// Can pass in nil for execCfg to AllTargets because we're not testing
157+
// database-level targets.
158+
targets, err := AllTargets(context.Background(), tc.details, nil)
159+
require.NoError(t, err)
160+
cfg, err := makeChangefeedConfigFromJobDetails(tc.details, targets)
157161
require.NoError(t, err)
158162
c := kvEventToRowConsumer{
159163
details: cfg,
160164
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
161165
}
162-
// Can pass in nil for execCfg to AllTargets because we're not testing
163-
// database-level targets.
164-
targets, err := AllTargets(context.Background(), tc.details, nil)
165-
require.NoError(t, err)
166166
tn, err := MakeTopicNamer(targets)
167167
require.NoError(t, err)
168168

0 commit comments

Comments
 (0)