Skip to content

Commit 1d556c8

Browse files
committed
changefeedccl: fix timestamp for desc fetches during planning
When a changefeed is planned and executed, there are several places where target table descriptors are fetched. With a db-level changefeed, the set of tables can change during changefeed startup. Now, the timestamp is set to the schema timestamp for retrieving table descriptors. Adding a schema_ts to the protobuf for ChangeAggregatorSpec and ChangeFrontierSpec. Fixes: #154549 Epic: CRDB-1421 Release note: None
1 parent ae0b3d7 commit 1d556c8

13 files changed

+284
-67
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,11 @@ func generateAndValidateNewTargets(
448448
return nil, nil, hlc.Timestamp{}, nil, err
449449
}
450450

451-
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg())
451+
targetTS := prevProgress.GetHighWater()
452+
if targetTS == nil || targetTS.IsEmpty() {
453+
targetTS = &prevDetails.StatementTime
454+
}
455+
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg(), *targetTS)
452456
if err != nil {
453457
return nil, nil, hlc.Timestamp{}, nil, err
454458
}

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ func makeChangefeedConfigFromJobDetails(
6262
// from the statement time name map in old protos
6363
// or the TargetSpecifications in new ones.
6464
func AllTargets(
65-
ctx context.Context, cd jobspb.ChangefeedDetails, execCfg *sql.ExecutorConfig,
65+
ctx context.Context,
66+
cd jobspb.ChangefeedDetails,
67+
execCfg *sql.ExecutorConfig,
68+
timestamp hlc.Timestamp,
6669
) (changefeedbase.Targets, error) {
6770
targets := changefeedbase.Targets{}
6871
var err error
@@ -76,7 +79,7 @@ func AllTargets(
7679
if len(cd.TargetSpecifications) > 1 {
7780
return changefeedbase.Targets{}, errors.AssertionFailedf("database-level changefeed is not supported with multiple targets")
7881
}
79-
targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg)
82+
targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp)
8083
if err != nil {
8184
return changefeedbase.Targets{}, err
8285
}
@@ -110,11 +113,17 @@ func AllTargets(
110113
}
111114

112115
func getTargetsFromDatabaseSpec(
113-
ctx context.Context, ts jobspb.ChangefeedTargetSpecification, execCfg *sql.ExecutorConfig,
116+
ctx context.Context,
117+
ts jobspb.ChangefeedTargetSpecification,
118+
execCfg *sql.ExecutorConfig,
119+
timestamp hlc.Timestamp,
114120
) (targets changefeedbase.Targets, err error) {
115121
err = sql.DescsTxn(ctx, execCfg, func(
116122
ctx context.Context, txn isql.Txn, descs *descs.Collection,
117123
) error {
124+
if err := txn.KV().SetFixedTimestamp(ctx, timestamp); err != nil {
125+
return errors.Wrapf(err, "setting timestamp for table descriptor fetch")
126+
}
118127
databaseDescriptor, err := descs.ByIDWithLeased(txn.KV()).Get().Database(ctx, ts.DescID)
119128
if err != nil {
120129
return err

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -56,35 +56,16 @@ const (
5656
changeFrontierProcName = `changefntr`
5757
)
5858

59-
// distChangefeedFlow plans and runs a distributed changefeed.
60-
//
61-
// One or more ChangeAggregator processors watch table data for changes. These
62-
// transform the changed kvs into changed rows and either emit them to a sink
63-
// (such as kafka) or, if there is no sink, forward them in columns 1,2,3 (where
64-
// they will be eventually returned directly via pgwire). In either case,
65-
// periodically a span will become resolved as of some timestamp, meaning that
66-
// no new rows will ever be emitted at or below that timestamp. These span-level
67-
// resolved timestamps are emitted as a marshaled `jobspb.ResolvedSpan` proto in
68-
// column 0.
69-
//
70-
// The flow will always have exactly one ChangeFrontier processor which all the
71-
// ChangeAggregators feed into. It collects all span-level resolved timestamps
72-
// and aggregates them into a changefeed-level resolved timestamp, which is the
73-
// minimum of the span-level resolved timestamps. This changefeed-level resolved
74-
// timestamp is emitted into the changefeed sink (or returned to the gateway if
75-
// there is no sink) whenever it advances. ChangeFrontier also updates the
76-
// progress of the changefeed's corresponding system job.
77-
func distChangefeedFlow(
59+
// computeDistChangefeedTimestamps computes the initialHighWater and schemaTS
60+
// for a changefeed run, and mutates localState.progress when appropriate
61+
// (e.g., to set the high-water if initial scan should be skipped). It also
62+
// invokes testing knobs that observe the initial high-water.
63+
func computeDistChangefeedTimestamps(
7864
ctx context.Context,
7965
execCtx sql.JobExecContext,
80-
jobID jobspb.JobID,
8166
details jobspb.ChangefeedDetails,
82-
description string,
8367
localState *cachedState,
84-
resultsCh chan<- tree.Datums,
85-
onTracingEvent func(ctx context.Context, meta *execinfrapb.TracingAggregatorEvents),
86-
targets changefeedbase.Targets,
87-
) error {
68+
) (initialHighWater hlc.Timestamp, schemaTS hlc.Timestamp, _ error) {
8869
opts := changefeedbase.MakeStatementOptions(details.Opts)
8970
progress := localState.progress
9071

@@ -100,44 +81,38 @@ func distChangefeedFlow(
10081
// cursor but we have a request to not have an initial scan.
10182
initialScanType, err := opts.GetInitialScanType()
10283
if err != nil {
103-
return err
84+
return hlc.Timestamp{}, hlc.Timestamp{}, err
10485
}
10586
if noHighWater && initialScanType == changefeedbase.NoInitialScan {
10687
// If there is a cursor, the statement time has already been set to it.
10788
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
10889
}
10990
}
11091

111-
var initialHighWater hlc.Timestamp
112-
schemaTS := details.StatementTime
113-
{
114-
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
115-
initialHighWater = *h
116-
// If we have a high-water set, use it to compute the spans, since the
117-
// ones at the statement time may have been garbage collected by now.
118-
schemaTS = initialHighWater
119-
}
120-
121-
// We want to fetch the target spans as of the timestamp following the
122-
// highwater unless the highwater corresponds to a timestamp of an initial
123-
// scan. This logic is irritatingly complex but extremely important. Namely,
124-
// we may be here because the schema changed at the current resolved
125-
// timestamp. However, an initial scan should be performed at exactly the
126-
// timestamp specified; initial scans can be created at the timestamp of a
127-
// schema change and thus should see the side-effect of the schema change.
128-
isRestartAfterCheckpointOrNoInitialScan := progress.GetHighWater() != nil
129-
if isRestartAfterCheckpointOrNoInitialScan {
130-
schemaTS = schemaTS.Next()
131-
}
92+
schemaTS = details.StatementTime
93+
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
94+
initialHighWater = *h
95+
// If we have a high-water set, use it to compute the spans, since the
96+
// ones at the statement time may have been garbage collected by now.
97+
schemaTS = initialHighWater
98+
}
99+
// We want to fetch the target spans as of the timestamp following the
100+
// highwater unless the highwater corresponds to a timestamp of an initial
101+
// scan. This logic is irritatingly complex but extremely important. Namely,
102+
// we may be here because the schema changed at the current resolved
103+
// timestamp. However, an initial scan should be performed at exactly the
104+
// timestamp specified; initial scans can be created at the timestamp of a
105+
// schema change and thus should see the side-effect of the schema change.
106+
if progress.GetHighWater() != nil {
107+
schemaTS = schemaTS.Next()
132108
}
133109

134110
if knobs, ok := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok {
135111
if knobs != nil && knobs.StartDistChangefeedInitialHighwater != nil {
136112
knobs.StartDistChangefeedInitialHighwater(ctx, initialHighWater)
137113
}
138114
}
139-
return startDistChangefeed(
140-
ctx, execCtx, jobID, schemaTS, details, description, initialHighWater, localState, resultsCh, onTracingEvent, targets)
115+
return initialHighWater, schemaTS, nil
141116
}
142117

143118
func fetchTableDescriptors(
@@ -225,7 +200,24 @@ func fetchSpansForTables(
225200
sd, tableDescs[0], initialHighwater, target, sc)
226201
}
227202

228-
// startDistChangefeed starts distributed changefeed execution.
203+
// startDistChangefeed plans and runs a distributed changefeed.
204+
//
205+
// One or more ChangeAggregator processors watch table data for changes. These
206+
// transform the changed kvs into changed rows and either emit them to a sink
207+
// (such as kafka) or, if there is no sink, forward them in columns 1,2,3 (where
208+
// they will be eventually returned directly via pgwire). In either case,
209+
// periodically a span will become resolved as of some timestamp, meaning that
210+
// no new rows will ever be emitted at or below that timestamp. These span-level
211+
// resolved timestamps are emitted as a marshaled `jobspb.ResolvedSpan` proto in
212+
// column 0.
213+
//
214+
// The flow will always have exactly one ChangeFrontier processor which all the
215+
// ChangeAggregators feed into. It collects all span-level resolved timestamps
216+
// and aggregates them into a changefeed-level resolved timestamp, which is the
217+
// minimum of the span-level resolved timestamps. This changefeed-level resolved
218+
// timestamp is emitted into the changefeed sink (or returned to the gateway if
219+
// there is no sink) whenever it advances. ChangeFrontier also updates the
220+
// progress of the changefeed's corresponding system job.
229221
func startDistChangefeed(
230222
ctx context.Context,
231223
execCtx sql.JobExecContext,
@@ -270,7 +262,7 @@ func startDistChangefeed(
270262
}
271263
}
272264
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
273-
trackedSpans, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
265+
trackedSpans, spanLevelCheckpoint, localState.drainingNodes, schemaTS)(ctx, dsp)
274266
if err != nil {
275267
return err
276268
}
@@ -395,6 +387,7 @@ func makePlan(
395387
trackedSpans []roachpb.Span,
396388
spanLevelCheckpoint *jobspb.TimestampSpansMap,
397389
drainingNodes []roachpb.NodeID,
390+
schemaTS hlc.Timestamp,
398391
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
399392
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
400393
sv := &execCtx.ExecCfg().Settings.SV
@@ -535,6 +528,7 @@ func makePlan(
535528
Description: description,
536529
ProgressConfig: progressConfig,
537530
ResolvedSpans: resolvedSpans,
531+
SchemaTS: &schemaTS,
538532
}
539533
}
540534

@@ -551,6 +545,7 @@ func makePlan(
551545
Description: description,
552546
ProgressConfig: progressConfig,
553547
ResolvedSpans: resolvedSpans,
548+
SchemaTS: &schemaTS,
554549
}
555550

556551
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,8 @@ func (ca *changeAggregator) Start(ctx context.Context) {
378378
if ca.knobs.OverrideExecCfg != nil {
379379
execCfg = ca.knobs.OverrideExecCfg(execCfg)
380380
}
381-
ca.targets, err = AllTargets(ctx, ca.spec.Feed, execCfg)
381+
targetTS := ca.spec.GetSchemaTS()
382+
ca.targets, err = AllTargets(ctx, ca.spec.Feed, execCfg, targetTS)
382383
if err != nil {
383384
log.Changefeed.Warningf(ca.Ctx(), "moving to draining due to error getting targets: %v", err)
384385
ca.MoveToDraining(err)
@@ -1352,7 +1353,8 @@ func newChangeFrontierProcessor(
13521353
if cf.knobs.OverrideExecCfg != nil {
13531354
execCfg = cf.knobs.OverrideExecCfg(execCfg)
13541355
}
1355-
targets, err := AllTargets(ctx, spec.Feed, execCfg)
1356+
targetTS := cf.spec.GetSchemaTS()
1357+
targets, err := AllTargets(ctx, cf.spec.Feed, execCfg, targetTS)
13561358
if err != nil {
13571359
return nil, err
13581360
}

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,15 @@ func coreChangefeed(
534534
knobs.BeforeDistChangefeed()
535535
}
536536

537-
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, description, localState, resultsCh, nil, targets)
537+
initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, p, details, localState)
538+
if err != nil {
539+
return err
540+
}
541+
maybeCfKnobs, haveKnobs := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)
542+
if haveKnobs && maybeCfKnobs.AfterComputeDistChangefeedTimestamps != nil {
543+
maybeCfKnobs.AfterComputeDistChangefeedTimestamps(ctx)
544+
}
545+
err = startDistChangefeed(ctx, p, 0 /* jobID */, schemaTS, details, description, initialHighWater, localState, resultsCh, nil, targets)
538546
if err == nil {
539547
log.Changefeed.Infof(ctx, "core changefeed completed with no error")
540548
return nil
@@ -744,7 +752,7 @@ func createChangefeedJobRecord(
744752
return nil, changefeedbase.Targets{}, errors.AssertionFailedf("unknown changefeed level: %s", changefeedStmt.Level)
745753
}
746754

747-
targets, err := AllTargets(ctx, details, p.ExecCfg())
755+
targets, err := AllTargets(ctx, details, p.ExecCfg(), statementTime)
748756
if err != nil {
749757
return nil, changefeedbase.Targets{}, err
750758
}
@@ -1757,13 +1765,22 @@ func (b *changefeedResumer) resumeWithRetries(
17571765

17581766
confPoller := make(chan struct{})
17591767
g := ctxgroup.WithContext(ctx)
1760-
targets, err := AllTargets(ctx, details, execCfg)
1768+
initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, jobExec, details, localState)
1769+
if err != nil {
1770+
return err
1771+
}
1772+
maybeCfKnobs, haveKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)
1773+
if haveKnobs && maybeCfKnobs.AfterComputeDistChangefeedTimestamps != nil {
1774+
maybeCfKnobs.AfterComputeDistChangefeedTimestamps(ctx)
1775+
}
1776+
targets, err := AllTargets(ctx, details, execCfg, schemaTS)
17611777
if err != nil {
17621778
return err
17631779
}
17641780
g.GoCtx(func(ctx context.Context) error {
17651781
defer close(confPoller)
1766-
return distChangefeedFlow(ctx, jobExec, jobID, details, description, localState, startedCh, onTracingEvent, targets)
1782+
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description,
1783+
initialHighWater, localState, startedCh, onTracingEvent, targets)
17671784
})
17681785
g.GoCtx(func(ctx context.Context) error {
17691786
t := time.NewTicker(15 * time.Second)
@@ -2012,7 +2029,12 @@ func (b *changefeedResumer) OnFailOrCancel(
20122029

20132030
var numTargets uint
20142031
if b.job != nil {
2015-
targets, err := AllTargets(ctx, b.job.Details().(jobspb.ChangefeedDetails), execCfg)
2032+
targetsTS := progress.GetHighWater()
2033+
if targetsTS == nil || targetsTS.IsEmpty() {
2034+
details := b.job.Details().(jobspb.ChangefeedDetails)
2035+
targetsTS = &details.StatementTime
2036+
}
2037+
targets, err := AllTargets(ctx, b.job.Details().(jobspb.ChangefeedDetails), execCfg, *targetsTS)
20162038
if err != nil {
20172039
return err
20182040
}

0 commit comments

Comments
 (0)