Skip to content

Commit 703e97c

Browse files
committed
changefeedccl: remove shared PTS record for non-lagging tables
Previously, per-table PTS changefeeds used a single protected timestamp record for all non-lagging tables, while assigning individual records only to lagging ones. This commit removes that shared record; instead, all tables now receive their own per-table PTS record at startup, each updated using the table's highwater. Epic: CRDB-1421 Release note: None
1 parent a72af2c commit 703e97c

File tree

9 files changed

+294
-223
lines changed

9 files changed

+294
-223
lines changed

pkg/ccl/changefeedccl/cdcprogresspb/progress.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import "gogoproto/gogo.proto";
1313
message ProtectedTimestampRecords {
1414
map<uint32, bytes> protected_timestamp_records = 1 [
1515
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID",
16-
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
16+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
17+
(gogoproto.nullable) = false
1718
];
1819
}

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ func writeChangefeedJobInfo(
4545
return jobs.WriteChunkedFileToJobInfo(ctx, filename, data, txn, jobID)
4646
}
4747

48+
// deleteChangefeedJobInfo deletes a changefeed job info protobuf from the
49+
// job_info table. A changefeed-specific filename is required.
50+
func deleteChangefeedJobInfo(
51+
ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID,
52+
) error {
53+
if !strings.HasPrefix(filename, jobInfoFilenamePrefix) {
54+
return errors.AssertionFailedf("filename %q must start with prefix %q", filename, jobInfoFilenamePrefix)
55+
}
56+
if !strings.HasSuffix(filename, jobInfoFilenameExtension) {
57+
return errors.AssertionFailedf("filename %q must end with extension %q", filename, jobInfoFilenameExtension)
58+
}
59+
return jobs.WriteChunkedFileToJobInfo(ctx, filename, nil, txn, jobID)
60+
}
61+
4862
// readChangefeedJobInfo reads a changefeed job info protobuf from the
4963
// job_info table. A changefeed-specific filename is required.
5064
func readChangefeedJobInfo(

pkg/ccl/changefeedccl/changefeed_job_info_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ func TestChangefeedJobInfoRoundTrip(t *testing.T) {
3535
uuid1 := uuid.MakeV4()
3636
uuid2 := uuid.MakeV4()
3737
ptsRecords := cdcprogresspb.ProtectedTimestampRecords{
38-
ProtectedTimestampRecords: map[descpb.ID]*uuid.UUID{
39-
descpb.ID(100): &uuid1,
40-
descpb.ID(200): &uuid2,
38+
ProtectedTimestampRecords: map[descpb.ID]uuid.UUID{
39+
descpb.ID(100): uuid1,
40+
descpb.ID(200): uuid2,
4141
},
4242
}
4343

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 23 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1957,6 +1957,10 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19571957
}
19581958
}()
19591959

1960+
if cf.knobs.ManagePTSError != nil {
1961+
return false, cf.knobs.ManagePTSError()
1962+
}
1963+
19601964
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
19611965
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, cf.spec.JobID); err != nil {
19621966
return false, err
@@ -1971,15 +1975,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19711975
}()
19721976

19731977
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1974-
newPTS, updatedPerTablePTS, err := cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
1975-
if err != nil {
1976-
return false, err
1977-
}
1978-
updatedMainPTS, err := cf.advanceProtectedTimestamp(ctx, progress, pts, newPTS)
1979-
if err != nil {
1980-
return false, err
1981-
}
1982-
return updatedMainPTS || updatedPerTablePTS, nil
1978+
return cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
19831979
}
19841980

19851981
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
@@ -1990,28 +1986,8 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19901986
txn isql.Txn,
19911987
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
19921988
highwater hlc.Timestamp,
1993-
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
1994-
var leastLaggingTimestamp hlc.Timestamp
1995-
for _, frontier := range cf.frontier.Frontiers() {
1996-
if frontier.Frontier().After(leastLaggingTimestamp) {
1997-
leastLaggingTimestamp = frontier.Frontier()
1998-
}
1999-
}
2000-
2001-
newPTS = func() hlc.Timestamp {
2002-
lagDuration := changefeedbase.ProtectTimestampBucketingInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
2003-
ptsLagCutoff := leastLaggingTimestamp.AddDuration(-lagDuration)
2004-
// If we are within the bucketing interval of having started the changefeed,
2005-
// we use the highwater as the PTS timestamp so as not to try to protect
2006-
// tables before the changefeed started.
2007-
if ptsLagCutoff.Less(highwater) {
2008-
return highwater
2009-
}
2010-
return ptsLagCutoff
2011-
}()
2012-
1989+
) (updatedPerTablePTS bool, err error) {
20131990
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
2014-
tableIDsToRelease := make([]descpb.ID, 0)
20151991
tableIDsToCreate := make(map[descpb.ID]hlc.Timestamp)
20161992
for tableID, frontier := range cf.frontier.Frontiers() {
20171993
tableHighWater := func() hlc.Timestamp {
@@ -2023,66 +1999,35 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20231999
return frontier.Frontier()
20242000
}()
20252001

2026-
isLagging := tableHighWater.Less(newPTS)
2027-
2028-
if cf.knobs.IsTableLagging != nil && cf.knobs.IsTableLagging(tableID) {
2029-
isLagging = true
2030-
}
2031-
2032-
if !isLagging {
2033-
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2034-
tableIDsToRelease = append(tableIDsToRelease, tableID)
2035-
}
2036-
continue
2037-
}
2038-
2039-
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2002+
if ptsEntries.ProtectedTimestampRecords[tableID] != uuid.Nil {
20402003
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
2041-
return hlc.Timestamp{}, false, err
2004+
return false, err
20422005
} else if updated {
20432006
updatedPerTablePTS = true
20442007
}
20452008
} else {
20462009
// TODO(#152448): Do not include system table protections in these records.
2010+
// TODO(#153894): Newly added/dropped tables should be caught and
2011+
// protected when starting the frontier, not here.
20472012
tableIDsToCreate[tableID] = tableHighWater
20482013
}
20492014
}
20502015

2051-
if len(tableIDsToRelease) > 0 {
2052-
if err := cf.releasePerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToRelease, pts); err != nil {
2053-
return hlc.Timestamp{}, false, err
2054-
}
2055-
}
2056-
20572016
if len(tableIDsToCreate) > 0 {
2058-
if err := cf.createPerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToCreate, pts); err != nil {
2059-
return hlc.Timestamp{}, false, err
2017+
if err := cf.createPerTableProtectedTimestampRecords(
2018+
ctx, ptsEntries, tableIDsToCreate, pts,
2019+
); err != nil {
2020+
return false, err
20602021
}
2061-
}
2062-
2063-
if len(tableIDsToRelease) > 0 || len(tableIDsToCreate) > 0 {
2064-
if err := writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID); err != nil {
2065-
return hlc.Timestamp{}, false, err
2022+
if err := writeChangefeedJobInfo(
2023+
ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID,
2024+
); err != nil {
2025+
return false, err
20662026
}
20672027
updatedPerTablePTS = true
20682028
}
20692029

2070-
return newPTS, updatedPerTablePTS, nil
2071-
}
2072-
2073-
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
2074-
ctx context.Context,
2075-
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2076-
tableIDs []descpb.ID,
2077-
pts protectedts.Storage,
2078-
) error {
2079-
for _, tableID := range tableIDs {
2080-
if err := pts.Release(ctx, *ptsEntries.ProtectedTimestampRecords[tableID]); err != nil {
2081-
return err
2082-
}
2083-
delete(ptsEntries.ProtectedTimestampRecords, tableID)
2084-
}
2085-
return nil
2030+
return updatedPerTablePTS, nil
20862031
}
20872032

20882033
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
@@ -2092,7 +2037,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20922037
tableHighWater hlc.Timestamp,
20932038
pts protectedts.Storage,
20942039
) (updated bool, err error) {
2095-
rec, err := pts.GetRecord(ctx, *ptsEntries.ProtectedTimestampRecords[tableID])
2040+
rec, err := pts.GetRecord(ctx, ptsEntries.ProtectedTimestampRecords[tableID])
20962041
if err != nil {
20972042
return false, err
20982043
}
@@ -2102,7 +2047,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
21022047
return false, nil
21032048
}
21042049

2105-
if err := pts.UpdateTimestamp(ctx, *ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
2050+
if err := pts.UpdateTimestamp(ctx, ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
21062051
return false, err
21072052
}
21082053
return true, nil
@@ -2115,7 +2060,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21152060
pts protectedts.Storage,
21162061
) error {
21172062
if ptsEntries.ProtectedTimestampRecords == nil {
2118-
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
2063+
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]uuid.UUID)
21192064
}
21202065
for tableID, tableHighWater := range tableIDsToCreate {
21212066
targets, err := cf.createPerTablePTSTargets(tableID)
@@ -2126,7 +2071,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21262071
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
21272072
)
21282073
uuid := ptr.ID.GetUUID()
2129-
ptsEntries.ProtectedTimestampRecords[tableID] = &uuid
2074+
ptsEntries.ProtectedTimestampRecords[tableID] = uuid
21302075
if err := pts.Protect(ctx, ptr); err != nil {
21312076
return err
21322077
}
@@ -2210,10 +2155,6 @@ func (cf *changeFrontier) advanceProtectedTimestamp(
22102155
return false, nil
22112156
}
22122157

2213-
if cf.knobs.ManagePTSError != nil {
2214-
return false, cf.knobs.ManagePTSError()
2215-
}
2216-
22172158
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, timestamp)
22182159
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, timestamp)
22192160
}

0 commit comments

Comments
 (0)