Skip to content

Commit a64757f

Browse files
committed
changefeedccl: remove additional job info write from pts management
In per table PTS management, if a new table started lagging while another table stopped lagging, we would do two separate job info table writes. We would do one to create the new PTS record and one to remove the old one. With this change, we only do one job info table write. Epic: CRDB-1421 Release note: None
1 parent bf78585 commit a64757f

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2022,14 +2022,19 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20222022
}
20232023

20242024
if len(tableIDsToRelease) > 0 {
2025-
if err := cf.releasePerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToRelease, pts); err != nil {
2025+
if err := cf.releasePerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToRelease, pts); err != nil {
20262026
return hlc.Timestamp{}, false, err
20272027
}
2028-
updatedPerTablePTS = true
20292028
}
20302029

20312030
if len(tableIDsToCreate) > 0 {
2032-
if err := cf.createPerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToCreate, pts); err != nil {
2031+
if err := cf.createPerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToCreate, pts); err != nil {
2032+
return hlc.Timestamp{}, false, err
2033+
}
2034+
}
2035+
2036+
if len(tableIDsToRelease) > 0 || len(tableIDsToCreate) > 0 {
2037+
if err := writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID); err != nil {
20332038
return hlc.Timestamp{}, false, err
20342039
}
20352040
updatedPerTablePTS = true
@@ -2040,7 +2045,6 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20402045

20412046
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20422047
ctx context.Context,
2043-
txn isql.Txn,
20442048
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20452049
tableIDs []descpb.ID,
20462050
pts protectedts.Storage,
@@ -2051,7 +2055,7 @@ func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20512055
}
20522056
delete(ptsEntries.ProtectedTimestampRecords, tableID)
20532057
}
2054-
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2058+
return nil
20552059
}
20562060

20572061
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
@@ -2079,7 +2083,6 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20792083

20802084
func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20812085
ctx context.Context,
2082-
txn isql.Txn,
20832086
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20842087
tableIDsToCreate map[descpb.ID]hlc.Timestamp,
20852088
pts protectedts.Storage,
@@ -2088,7 +2091,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20882091
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
20892092
}
20902093
for tableID, tableHighWater := range tableIDsToCreate {
2091-
targets, err := cf.createPerTablePTSTarget(tableID)
2094+
targets, err := cf.createPerTablePTSTargets(tableID)
20922095
if err != nil {
20932096
return err
20942097
}
@@ -2101,22 +2104,23 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21012104
return err
21022105
}
21032106
}
2104-
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2107+
return nil
21052108
}
21062109

2107-
func (cf *changeFrontier) createPerTablePTSTarget(
2110+
func (cf *changeFrontier) createPerTablePTSTargets(
21082111
tableID descpb.ID,
21092112
) (changefeedbase.Targets, error) {
21102113
targets := changefeedbase.Targets{}
2111-
if cf.targets.Size > 0 {
2112-
if found, err := cf.targets.EachHavingTableID(tableID, func(target changefeedbase.Target) error {
2113-
targets.Add(target)
2114-
return nil
2115-
}); err != nil {
2116-
return changefeedbase.Targets{}, err
2117-
} else if !found {
2118-
return changefeedbase.Targets{}, errors.AssertionFailedf("attempted to create a per-table PTS record for table %d, but no target was found", tableID)
2119-
}
2114+
if found, err := cf.targets.EachHavingTableID(tableID, func(target changefeedbase.Target) error {
2115+
targets.Add(target)
2116+
return nil
2117+
}); err != nil {
2118+
return changefeedbase.Targets{}, err
2119+
} else if !found {
2120+
return changefeedbase.Targets{}, errors.AssertionFailedf(
2121+
"attempted to create a per-table PTS record for table %d, but no target was found",
2122+
tableID,
2123+
)
21202124
}
21212125
if targets.Size != 1 {
21222126
return changefeedbase.Targets{}, errors.AssertionFailedf("expected 1 target, got %d", targets.Size)

0 commit comments

Comments
 (0)