Skip to content

Commit b495ecb

Browse files
craig[bot]aerfreispilchen
committed
153421: changefeedccl: remove shared PTS record for non-lagging tables r=log-head,andyyang890 a=aerfrei Previously, per-table PTS changefeeds used a single protected timestamp record for all non-lagging tables, while assigning individual records only to lagging ones. This commit removes that shared record; instead, all tables now receive their own per-table PTS record at startup, each updated using the table's highwater. Fixes #153681 Epic: [CRDB-1421](https://cockroachlabs.atlassian.net/browse/CRDB-1421) Release note: None 154135: sql/inspect: skip INSPECT checks for non-applicable spans earlier r=spilchen a=spilchen INSPECT jobs generate a set of checks along with a separate list of spans, which only apply to some of the checks. Previously, filtering to ensure a span applied to a check happened late in the process. This commit moves that filtering earlier, so checks immediately skip spans that are not relevant. This improves efficiency and prepares the system for upcoming progress tracking logic, which will be added in a separate PR. Informs: #148297 Epic: CRDB-30356 Release note: none Co-authored-by: Aerin Freilich <[email protected]> Co-authored-by: Matt Spilchen <[email protected]>
3 parents 83c038f + 887bdfb + 90ceff4 commit b495ecb

15 files changed

+386
-235
lines changed

pkg/ccl/changefeedccl/cdcprogresspb/progress.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import "gogoproto/gogo.proto";
1313
message ProtectedTimestampRecords {
1414
map<uint32, bytes> protected_timestamp_records = 1 [
1515
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID",
16-
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
16+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
17+
(gogoproto.nullable) = false
1718
];
1819
}

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ func writeChangefeedJobInfo(
4545
return jobs.WriteChunkedFileToJobInfo(ctx, filename, data, txn, jobID)
4646
}
4747

48+
// deleteChangefeedJobInfo deletes a changefeed job info protobuf from the
49+
// job_info table. A changefeed-specific filename is required.
50+
func deleteChangefeedJobInfo(
51+
ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID,
52+
) error {
53+
if !strings.HasPrefix(filename, jobInfoFilenamePrefix) {
54+
return errors.AssertionFailedf("filename %q must start with prefix %q", filename, jobInfoFilenamePrefix)
55+
}
56+
if !strings.HasSuffix(filename, jobInfoFilenameExtension) {
57+
return errors.AssertionFailedf("filename %q must end with extension %q", filename, jobInfoFilenameExtension)
58+
}
59+
return jobs.WriteChunkedFileToJobInfo(ctx, filename, nil, txn, jobID)
60+
}
61+
4862
// readChangefeedJobInfo reads a changefeed job info protobuf from the
4963
// job_info table. A changefeed-specific filename is required.
5064
func readChangefeedJobInfo(

pkg/ccl/changefeedccl/changefeed_job_info_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ func TestChangefeedJobInfoRoundTrip(t *testing.T) {
3535
uuid1 := uuid.MakeV4()
3636
uuid2 := uuid.MakeV4()
3737
ptsRecords := cdcprogresspb.ProtectedTimestampRecords{
38-
ProtectedTimestampRecords: map[descpb.ID]*uuid.UUID{
39-
descpb.ID(100): &uuid1,
40-
descpb.ID(200): &uuid2,
38+
ProtectedTimestampRecords: map[descpb.ID]uuid.UUID{
39+
descpb.ID(100): uuid1,
40+
descpb.ID(200): uuid2,
4141
},
4242
}
4343

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 23 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1957,6 +1957,10 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19571957
}
19581958
}()
19591959

1960+
if cf.knobs.ManagePTSError != nil {
1961+
return false, cf.knobs.ManagePTSError()
1962+
}
1963+
19601964
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
19611965
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, cf.spec.JobID); err != nil {
19621966
return false, err
@@ -1971,15 +1975,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19711975
}()
19721976

19731977
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1974-
newPTS, updatedPerTablePTS, err := cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
1975-
if err != nil {
1976-
return false, err
1977-
}
1978-
updatedMainPTS, err := cf.advanceProtectedTimestamp(ctx, progress, pts, newPTS)
1979-
if err != nil {
1980-
return false, err
1981-
}
1982-
return updatedMainPTS || updatedPerTablePTS, nil
1978+
return cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
19831979
}
19841980

19851981
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
@@ -1990,28 +1986,8 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19901986
txn isql.Txn,
19911987
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
19921988
highwater hlc.Timestamp,
1993-
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
1994-
var leastLaggingTimestamp hlc.Timestamp
1995-
for _, frontier := range cf.frontier.Frontiers() {
1996-
if frontier.Frontier().After(leastLaggingTimestamp) {
1997-
leastLaggingTimestamp = frontier.Frontier()
1998-
}
1999-
}
2000-
2001-
newPTS = func() hlc.Timestamp {
2002-
lagDuration := changefeedbase.ProtectTimestampBucketingInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
2003-
ptsLagCutoff := leastLaggingTimestamp.AddDuration(-lagDuration)
2004-
// If we are within the bucketing interval of having started the changefeed,
2005-
// we use the highwater as the PTS timestamp so as not to try to protect
2006-
// tables before the changefeed started.
2007-
if ptsLagCutoff.Less(highwater) {
2008-
return highwater
2009-
}
2010-
return ptsLagCutoff
2011-
}()
2012-
1989+
) (updatedPerTablePTS bool, err error) {
20131990
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
2014-
tableIDsToRelease := make([]descpb.ID, 0)
20151991
tableIDsToCreate := make(map[descpb.ID]hlc.Timestamp)
20161992
for tableID, frontier := range cf.frontier.Frontiers() {
20171993
tableHighWater := func() hlc.Timestamp {
@@ -2023,66 +1999,35 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20231999
return frontier.Frontier()
20242000
}()
20252001

2026-
isLagging := tableHighWater.Less(newPTS)
2027-
2028-
if cf.knobs.IsTableLagging != nil && cf.knobs.IsTableLagging(tableID) {
2029-
isLagging = true
2030-
}
2031-
2032-
if !isLagging {
2033-
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2034-
tableIDsToRelease = append(tableIDsToRelease, tableID)
2035-
}
2036-
continue
2037-
}
2038-
2039-
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2002+
if ptsEntries.ProtectedTimestampRecords[tableID] != uuid.Nil {
20402003
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
2041-
return hlc.Timestamp{}, false, err
2004+
return false, err
20422005
} else if updated {
20432006
updatedPerTablePTS = true
20442007
}
20452008
} else {
20462009
// TODO(#152448): Do not include system table protections in these records.
2010+
// TODO(#153894): Newly added/dropped tables should be caught and
2011+
// protected when starting the frontier, not here.
20472012
tableIDsToCreate[tableID] = tableHighWater
20482013
}
20492014
}
20502015

2051-
if len(tableIDsToRelease) > 0 {
2052-
if err := cf.releasePerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToRelease, pts); err != nil {
2053-
return hlc.Timestamp{}, false, err
2054-
}
2055-
}
2056-
20572016
if len(tableIDsToCreate) > 0 {
2058-
if err := cf.createPerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToCreate, pts); err != nil {
2059-
return hlc.Timestamp{}, false, err
2017+
if err := cf.createPerTableProtectedTimestampRecords(
2018+
ctx, ptsEntries, tableIDsToCreate, pts,
2019+
); err != nil {
2020+
return false, err
20602021
}
2061-
}
2062-
2063-
if len(tableIDsToRelease) > 0 || len(tableIDsToCreate) > 0 {
2064-
if err := writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID); err != nil {
2065-
return hlc.Timestamp{}, false, err
2022+
if err := writeChangefeedJobInfo(
2023+
ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID,
2024+
); err != nil {
2025+
return false, err
20662026
}
20672027
updatedPerTablePTS = true
20682028
}
20692029

2070-
return newPTS, updatedPerTablePTS, nil
2071-
}
2072-
2073-
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
2074-
ctx context.Context,
2075-
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2076-
tableIDs []descpb.ID,
2077-
pts protectedts.Storage,
2078-
) error {
2079-
for _, tableID := range tableIDs {
2080-
if err := pts.Release(ctx, *ptsEntries.ProtectedTimestampRecords[tableID]); err != nil {
2081-
return err
2082-
}
2083-
delete(ptsEntries.ProtectedTimestampRecords, tableID)
2084-
}
2085-
return nil
2030+
return updatedPerTablePTS, nil
20862031
}
20872032

20882033
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
@@ -2092,7 +2037,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20922037
tableHighWater hlc.Timestamp,
20932038
pts protectedts.Storage,
20942039
) (updated bool, err error) {
2095-
rec, err := pts.GetRecord(ctx, *ptsEntries.ProtectedTimestampRecords[tableID])
2040+
rec, err := pts.GetRecord(ctx, ptsEntries.ProtectedTimestampRecords[tableID])
20962041
if err != nil {
20972042
return false, err
20982043
}
@@ -2102,7 +2047,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
21022047
return false, nil
21032048
}
21042049

2105-
if err := pts.UpdateTimestamp(ctx, *ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
2050+
if err := pts.UpdateTimestamp(ctx, ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
21062051
return false, err
21072052
}
21082053
return true, nil
@@ -2115,7 +2060,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21152060
pts protectedts.Storage,
21162061
) error {
21172062
if ptsEntries.ProtectedTimestampRecords == nil {
2118-
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
2063+
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]uuid.UUID)
21192064
}
21202065
for tableID, tableHighWater := range tableIDsToCreate {
21212066
targets, err := cf.createPerTablePTSTargets(tableID)
@@ -2126,7 +2071,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21262071
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
21272072
)
21282073
uuid := ptr.ID.GetUUID()
2129-
ptsEntries.ProtectedTimestampRecords[tableID] = &uuid
2074+
ptsEntries.ProtectedTimestampRecords[tableID] = uuid
21302075
if err := pts.Protect(ctx, ptr); err != nil {
21312076
return err
21322077
}
@@ -2210,10 +2155,6 @@ func (cf *changeFrontier) advanceProtectedTimestamp(
22102155
return false, nil
22112156
}
22122157

2213-
if cf.knobs.ManagePTSError != nil {
2214-
return false, cf.knobs.ManagePTSError()
2215-
}
2216-
22172158
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, timestamp)
22182159
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, timestamp)
22192160
}

0 commit comments

Comments
 (0)