Skip to content

Commit fcf3364

Browse files
committed
changefeedccl: improve target creation for lagging tables
Previously, we would iterate through all feed targets to create the pts targets for a lagging tables, which was very inefficient. We improve this efficiency by using the underlying map by tableID. We also do only a single write to the jobInfo table instead of one for each record we create. Informs: #147788 Epic: CRDB-1421 Release note: None
1 parent d71deec commit fcf3364

File tree

1 file changed

+43
-27
lines changed

1 file changed

+43
-27
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1985,6 +1985,7 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19851985

19861986
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
19871987
tableIDsToRelease := make([]descpb.ID, 0)
1988+
tableIDsToCreate := make(map[descpb.ID]hlc.Timestamp)
19881989
for tableID, frontier := range cf.frontier.Frontiers() {
19891990
tableHighWater := func() hlc.Timestamp {
19901991
// If this table has not yet finished its initial scan, we use the highwater
@@ -2016,10 +2017,7 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20162017
}
20172018
} else {
20182019
// TODO(#152448): Do not include system table protections in these records.
2019-
if err := cf.createPerTableProtectedTimestampRecord(ctx, txn, ptsEntries, tableID, tableHighWater, pts); err != nil {
2020-
return hlc.Timestamp{}, false, err
2021-
}
2022-
updatedPerTablePTS = true
2020+
tableIDsToCreate[tableID] = tableHighWater
20232021
}
20242022
}
20252023

@@ -2030,6 +2028,13 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20302028
updatedPerTablePTS = true
20312029
}
20322030

2031+
if len(tableIDsToCreate) > 0 {
2032+
if err := cf.createPerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToCreate, pts); err != nil {
2033+
return hlc.Timestamp{}, false, err
2034+
}
2035+
updatedPerTablePTS = true
2036+
}
2037+
20332038
return newPTS, updatedPerTablePTS, nil
20342039
}
20352040

@@ -2072,40 +2077,51 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20722077
return true, nil
20732078
}
20742079

2075-
func (cf *changeFrontier) createPerTableProtectedTimestampRecord(
2080+
func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20762081
ctx context.Context,
20772082
txn isql.Txn,
20782083
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2079-
tableID descpb.ID,
2080-
tableHighWater hlc.Timestamp,
2084+
tableIDsToCreate map[descpb.ID]hlc.Timestamp,
20812085
pts protectedts.Storage,
20822086
) error {
2083-
// If the table is lagging and doesn't have a per table PTS record,
2084-
// we create a new one.
2085-
targets := changefeedbase.Targets{}
2086-
if cf.targets.Size > 0 {
2087-
err := cf.targets.EachTarget(func(target changefeedbase.Target) error {
2088-
if target.DescID == tableID {
2089-
targets.Add(target)
2090-
}
2091-
return nil
2092-
})
2087+
if ptsEntries.ProtectedTimestampRecords == nil {
2088+
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
2089+
}
2090+
for tableID, tableHighWater := range tableIDsToCreate {
2091+
targets, err := cf.createPerTablePTSTarget(tableID)
20932092
if err != nil {
20942093
return err
20952094
}
2095+
ptr := createProtectedTimestampRecord(
2096+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
2097+
)
2098+
uuid := ptr.ID.GetUUID()
2099+
ptsEntries.ProtectedTimestampRecords[tableID] = &uuid
2100+
if err := pts.Protect(ctx, ptr); err != nil {
2101+
return err
2102+
}
20962103
}
2097-
ptr := createProtectedTimestampRecord(
2098-
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
2099-
)
2100-
if ptsEntries.ProtectedTimestampRecords == nil {
2101-
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
2104+
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2105+
}
2106+
2107+
func (cf *changeFrontier) createPerTablePTSTarget(
2108+
tableID descpb.ID,
2109+
) (changefeedbase.Targets, error) {
2110+
targets := changefeedbase.Targets{}
2111+
if cf.targets.Size > 0 {
2112+
if found, err := cf.targets.EachHavingTableID(tableID, func(target changefeedbase.Target) error {
2113+
targets.Add(target)
2114+
return nil
2115+
}); err != nil {
2116+
return changefeedbase.Targets{}, err
2117+
} else if !found {
2118+
return changefeedbase.Targets{}, errors.AssertionFailedf("attempted to create a per-table PTS record for table %d, but no target was found", tableID)
2119+
}
21022120
}
2103-
uuid := ptr.ID.GetUUID()
2104-
ptsEntries.ProtectedTimestampRecords[tableID] = &uuid
2105-
if err := pts.Protect(ctx, ptr); err != nil {
2106-
return err
2121+
if targets.Size != 1 {
2122+
return changefeedbase.Targets{}, errors.AssertionFailedf("expected 1 target, got %d", targets.Size)
21072123
}
2108-
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2124+
return targets, nil
21092125
}
21102126

21112127
func (cf *changeFrontier) advanceProtectedTimestamp(

0 commit comments

Comments
 (0)