Skip to content

Commit 021bb15

Browse files
committed
changefeedccl: protect lagging tables with its own timestamp
Before this change, changefeeds watching multiple tables would protect all watched tables and system tables with a single protected timestamp (pts) record. All tables would be protected back to the highwater for the changefeed. In the event where one table lags behind the others, this would result in preventing more garbage collection than needed for many tables. This would become more of an issue when we introduce DB-level feeds because they will be implemented with multi-table feeds. We now protect lagging tables separately. We use per table highwaters to identify tables who are lagging behing the most advanced table by more than the new changefeed.protect_timestamp_bucketing_interval cluster setting. All tables at least that far behind will be protected by their own pts record. Epic: CRDB-1421 Fixes: #147785 Release note (performance): Multi-table changefeeds will protect each lagging user table from garbage collection with its own record. This will improve performance of multi-table changefeeds including upcoming db-level changefeeds by allowing more garbage collection.
1 parent e58fa7d commit 021bb15

File tree

11 files changed

+575
-89
lines changed

11 files changed

+575
-89
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,14 @@ func makePlan(
459459
// Create progress config based on current settings.
460460
var progressConfig *execinfrapb.ChangefeedProgressConfig
461461
if execCtx.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_4) {
462+
perTableTrackingEnabled := changefeedbase.TrackPerTableProgress.Get(sv)
463+
perTableProtectedTimestampsEnabled := changefeedbase.PerTableProtectedTimestamps.Get(sv)
462464
progressConfig = &execinfrapb.ChangefeedProgressConfig{
463-
PerTableTracking: changefeedbase.TrackPerTableProgress.Get(sv),
465+
PerTableTracking: perTableTrackingEnabled,
466+
// If the per table pts flag was turned on between changefeed creation and now,
467+
// the per table pts records will be rewritten in the new format when the
468+
// highwater mark is updated in manageProtectedTimestamps.
469+
PerTableProtectedTimestamps: perTableTrackingEnabled && perTableProtectedTimestampsEnabled,
464470
}
465471
}
466472

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ const (
2323
)
2424

2525
const (
26-
// resolvedTablesFilename: ~changefeed/resolved_tables.binpb
27-
resolvedTablesFilename = jobInfoFilenamePrefix + "resolved_tables" + jobInfoFilenameExtension
26+
// resolvedTablesFilename: ~changefeed/resolved-tables.binpb
27+
resolvedTablesFilename = jobInfoFilenamePrefix + "resolved-tables" + jobInfoFilenameExtension
28+
// perTableProtectedTimestampsFilename: ~changefeed/per-table-protected-timestamps.binpb
29+
perTableProtectedTimestampsFilename = jobInfoFilenamePrefix + "per-table-protected-timestamps" + jobInfoFilenameExtension
2830
)
2931

3032
// writeChangefeedJobInfo writes a changefeed job info protobuf to the

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 189 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1898,7 +1898,6 @@ func (cf *changeFrontier) manageProtectedTimestamps(
18981898
defer sp.Finish()
18991899

19001900
ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1901-
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
19021901
if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
19031902
return false, nil
19041903
}
@@ -1915,16 +1914,193 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19151914
}
19161915
}()
19171916

1917+
var ptsEntries changefeedpb.ProtectedTimestampRecords
1918+
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, cf.spec.JobID); err != nil {
1919+
return false, err
1920+
}
1921+
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
1922+
1923+
highwater := func() hlc.Timestamp {
1924+
if cf.frontier.Frontier().Less(cf.highWaterAtStart) {
1925+
return cf.highWaterAtStart
1926+
}
1927+
return cf.frontier.Frontier()
1928+
}()
1929+
1930+
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1931+
newPTS, updatedPerTablePTS, err := cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
1932+
if err != nil {
1933+
return false, err
1934+
}
1935+
updatedMainPTS, err := cf.advanceProtectedTimestamp(ctx, progress, pts, newPTS)
1936+
if err != nil {
1937+
return false, err
1938+
}
1939+
return updatedMainPTS || updatedPerTablePTS, nil
1940+
}
1941+
1942+
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
1943+
}
1944+
1945+
func (cf *changeFrontier) managePerTableProtectedTimestamps(
1946+
ctx context.Context,
1947+
txn isql.Txn,
1948+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
1949+
highwater hlc.Timestamp,
1950+
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
1951+
var leastLaggingTimestamp hlc.Timestamp
1952+
for _, frontier := range cf.frontier.Frontiers() {
1953+
if frontier.Frontier().After(leastLaggingTimestamp) {
1954+
leastLaggingTimestamp = frontier.Frontier()
1955+
}
1956+
}
1957+
1958+
newPTS = func() hlc.Timestamp {
1959+
lagDuration := changefeedbase.ProtectTimestampBucketingInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1960+
ptsLagCutoff := leastLaggingTimestamp.AddDuration(-lagDuration)
1961+
// If we are within the bucketing interval of having started the changefeed,
1962+
// we use the highwater as the PTS timestamp so as not to try to protect
1963+
// tables before the changefeed started.
1964+
if ptsLagCutoff.Less(highwater) {
1965+
return highwater
1966+
}
1967+
return ptsLagCutoff
1968+
}()
1969+
19181970
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
1971+
tableIDsToRelease := make([]descpb.ID, 0)
1972+
for tableID, frontier := range cf.frontier.Frontiers() {
1973+
tableHighWater := func() hlc.Timestamp {
1974+
// If this table has not yet finished its initial scan, we use the highwater
1975+
// which is guaranteed to be at least the changefeed's creation time.
1976+
if frontier.Frontier().Less(highwater) {
1977+
return highwater
1978+
}
1979+
return frontier.Frontier()
1980+
}()
1981+
1982+
isLagging := tableHighWater.Less(newPTS)
1983+
1984+
if cf.knobs.IsTableLagging != nil && cf.knobs.IsTableLagging(tableID) {
1985+
isLagging = true
1986+
}
1987+
1988+
if !isLagging {
1989+
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
1990+
tableIDsToRelease = append(tableIDsToRelease, tableID)
1991+
}
1992+
continue
1993+
}
19191994

1920-
// Create / advance the protected timestamp record to the highwater mark
1921-
highWater := cf.frontier.Frontier()
1922-
if highWater.Less(cf.highWaterAtStart) {
1923-
highWater = cf.highWaterAtStart
1995+
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
1996+
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
1997+
return hlc.Timestamp{}, false, err
1998+
} else if updated {
1999+
updatedPerTablePTS = true
2000+
}
2001+
} else {
2002+
// TODO(#152448): Do not include system table protections in these records.
2003+
if err := cf.createPerTableProtectedTimestampRecord(ctx, txn, ptsEntries, tableID, tableHighWater, pts); err != nil {
2004+
return hlc.Timestamp{}, false, err
2005+
}
2006+
updatedPerTablePTS = true
2007+
}
2008+
}
2009+
2010+
if len(tableIDsToRelease) > 0 {
2011+
if err := cf.releasePerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToRelease, pts); err != nil {
2012+
return hlc.Timestamp{}, false, err
2013+
}
2014+
updatedPerTablePTS = true
19242015
}
2016+
2017+
return newPTS, updatedPerTablePTS, nil
2018+
}
2019+
2020+
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
2021+
ctx context.Context,
2022+
txn isql.Txn,
2023+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2024+
tableIDs []descpb.ID,
2025+
pts protectedts.Storage,
2026+
) error {
2027+
for _, tableID := range tableIDs {
2028+
if err := pts.Release(ctx, *ptsEntries.ProtectedTimestampRecords[tableID]); err != nil {
2029+
return err
2030+
}
2031+
delete(ptsEntries.ProtectedTimestampRecords, tableID)
2032+
}
2033+
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2034+
}
2035+
2036+
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
2037+
ctx context.Context,
2038+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2039+
tableID descpb.ID,
2040+
tableHighWater hlc.Timestamp,
2041+
pts protectedts.Storage,
2042+
) (updated bool, err error) {
2043+
rec, err := pts.GetRecord(ctx, *ptsEntries.ProtectedTimestampRecords[tableID])
2044+
if err != nil {
2045+
return false, err
2046+
}
2047+
2048+
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
2049+
if rec.Timestamp.AddDuration(ptsUpdateLag).After(tableHighWater) {
2050+
return false, nil
2051+
}
2052+
2053+
if err := pts.UpdateTimestamp(ctx, *ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
2054+
return false, err
2055+
}
2056+
return true, nil
2057+
}
2058+
2059+
func (cf *changeFrontier) createPerTableProtectedTimestampRecord(
2060+
ctx context.Context,
2061+
txn isql.Txn,
2062+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2063+
tableID descpb.ID,
2064+
tableHighWater hlc.Timestamp,
2065+
pts protectedts.Storage,
2066+
) error {
2067+
// If the table is lagging and doesn't have a per table PTS record,
2068+
// we create a new one.
2069+
targets := changefeedbase.Targets{}
2070+
if cf.targets.Size > 0 {
2071+
err := cf.targets.EachTarget(func(target changefeedbase.Target) error {
2072+
if target.DescID == tableID {
2073+
targets.Add(target)
2074+
}
2075+
return nil
2076+
})
2077+
if err != nil {
2078+
return err
2079+
}
2080+
}
2081+
ptr := createProtectedTimestampRecord(
2082+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
2083+
)
2084+
if ptsEntries.ProtectedTimestampRecords == nil {
2085+
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
2086+
}
2087+
uuid := ptr.ID.GetUUID()
2088+
ptsEntries.ProtectedTimestampRecords[tableID] = &uuid
2089+
if err := pts.Protect(ctx, ptr); err != nil {
2090+
return err
2091+
}
2092+
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2093+
}
2094+
2095+
func (cf *changeFrontier) advanceProtectedTimestamp(
2096+
ctx context.Context,
2097+
progress *jobspb.ChangefeedProgress,
2098+
pts protectedts.Storage,
2099+
timestamp hlc.Timestamp,
2100+
) (updated bool, err error) {
19252101
if progress.ProtectedTimestampRecord == uuid.Nil {
19262102
ptr := createProtectedTimestampRecord(
1927-
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, highWater,
2103+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, timestamp,
19282104
)
19292105
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
19302106
return true, pts.Protect(ctx, ptr)
@@ -1942,7 +2118,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19422118
if preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts(); preserveDeprecatedPts {
19432119
return false, nil
19442120
}
1945-
if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil {
2121+
if err := cf.remakePTSRecord(ctx, pts, progress, timestamp); err != nil {
19462122
return false, err
19472123
}
19482124
return true, nil
@@ -1955,26 +2131,28 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19552131
if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets {
19562132
return false, nil
19572133
}
1958-
if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil {
2134+
if err := cf.remakePTSRecord(ctx, pts, progress, timestamp); err != nil {
19592135
return false, err
19602136
}
2137+
log.VEventf(ctx, 2, "remade PTS record %v to include all targets", progress.ProtectedTimestampRecord)
19612138
return true, nil
19622139
}
19632140

2141+
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
19642142
// Only update the PTS timestamp if it is lagging behind the high
19652143
// watermark. This is to prevent a rush of updates to the PTS if the
19662144
// changefeed restarts, which can cause contention and second order effects
19672145
// on system tables.
1968-
if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) {
2146+
if rec.Timestamp.AddDuration(ptsUpdateLag).After(timestamp) {
19692147
return false, nil
19702148
}
19712149

19722150
if cf.knobs.ManagePTSError != nil {
19732151
return false, cf.knobs.ManagePTSError()
19742152
}
19752153

1976-
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)
1977-
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
2154+
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, timestamp)
2155+
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, timestamp)
19782156
}
19792157

19802158
func (cf *changeFrontier) remakePTSRecord(

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12026,21 +12026,21 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1202612026

1202712027
registry := s.Server.JobRegistry().(*jobs.Registry)
1202812028
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
12029-
createPtsCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12030-
managePtsCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
12029+
createPTSCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12030+
managePTSCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
1203112031
managePTSErrorCount, _ := metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
12032-
require.Equal(t, int64(0), createPtsCount)
12033-
require.Equal(t, int64(0), managePtsCount)
12032+
require.Equal(t, int64(0), createPTSCount)
12033+
require.Equal(t, int64(0), managePTSCount)
1203412034
require.Equal(t, int64(0), managePTSErrorCount)
1203512035

1203612036
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
1203712037
testFeed := feed(t, f, createStmt)
1203812038
defer closeFeed(t, testFeed)
1203912039

12040-
createPtsCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12041-
managePtsCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
12042-
require.Equal(t, int64(1), createPtsCount)
12043-
require.Equal(t, int64(0), managePtsCount)
12040+
createPTSCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12041+
managePTSCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
12042+
require.Equal(t, int64(1), createPTSCount)
12043+
require.Equal(t, int64(0), managePTSCount)
1204412044

1204512045
eFeed, ok := testFeed.(cdctest.EnterpriseTestFeed)
1204612046
require.True(t, ok)
@@ -12091,9 +12091,9 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1209112091
require.NoError(t, err)
1209212092
require.Less(t, ts, ts2)
1209312093

12094-
managePtsCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
12094+
managePTSCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
1209512095
managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
12096-
require.GreaterOrEqual(t, managePtsCount, int64(2))
12096+
require.GreaterOrEqual(t, managePTSCount, int64(2))
1209712097
require.Equal(t, int64(0), managePTSErrorCount)
1209812098
}
1209912099

@@ -12108,6 +12108,9 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1210812108
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
1210912109
}
1211012110

12111+
// TestChangefeedProtectedTimestampUpdateError tests that a changefeed that
12112+
// errors while managing its protected timestamp records will increment the
12113+
// manage PTS error counter.
1211112114
func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1211212115
defer leaktest.AfterTest(t)()
1211312116
defer log.Scope(t).Close(t)
@@ -12128,11 +12131,11 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1212812131

1212912132
registry := s.Server.JobRegistry().(*jobs.Registry)
1213012133
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
12131-
createPtsCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12132-
managePtsCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
12134+
createPTSCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12135+
managePTSCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
1213312136
managePTSErrorCount, _ := metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
12134-
require.Equal(t, int64(0), createPtsCount)
12135-
require.Equal(t, int64(0), managePtsCount)
12137+
require.Equal(t, int64(0), createPTSCount)
12138+
require.Equal(t, int64(0), managePTSCount)
1213612139
require.Equal(t, int64(0), managePTSErrorCount)
1213712140

1213812141
knobs := s.TestingKnobs.
@@ -12147,8 +12150,8 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1214712150
testFeed := feed(t, f, createStmt)
1214812151
defer closeFeed(t, testFeed)
1214912152

12150-
createPtsCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12151-
require.Equal(t, int64(1), createPtsCount)
12153+
createPTSCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
12154+
require.Equal(t, int64(1), createPTSCount)
1215212155
managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
1215312156
require.Equal(t, int64(0), managePTSErrorCount)
1215412157

@@ -12159,7 +12162,6 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1215912162
testutils.SucceedsSoon(t, func() error {
1216012163
managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
1216112164
if managePTSErrorCount > 0 {
12162-
fmt.Println("manage protected timestamps test: manage pts error count", managePTSErrorCount)
1216312165
return nil
1216412166
}
1216512167
return errors.New("waiting for manage pts error")

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,16 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting(
207207
settings.PositiveDuration,
208208
settings.WithPublic)
209209

210+
// ProtectTimestampBucketingInterval controls the amount a table is allowed to lag behind the
211+
// most advanced table before a per-table protected timestamp record is created
212+
var ProtectTimestampBucketingInterval = settings.RegisterDurationSetting(
213+
settings.ApplicationLevel,
214+
"changefeed.protect_timestamp_bucketing_interval",
215+
"controls the amount a table is allowed to lag behind the most advanced table before a per-table protected timestamp record is created; "+
216+
"only used when changefeed.protected_timestamp.per_table.enabled is true",
217+
2*time.Minute,
218+
settings.PositiveDuration)
219+
210220
// ProtectTimestampLag controls how much the protected timestamp record should lag behind the high watermark
211221
var ProtectTimestampLag = settings.RegisterDurationSetting(
212222
settings.ApplicationLevel,
@@ -215,6 +225,15 @@ var ProtectTimestampLag = settings.RegisterDurationSetting(
215225
10*time.Minute,
216226
settings.PositiveDuration)
217227

228+
// PerTableProtectedTimestamps enables per-table protected timestamp records
229+
// instead of a single record for all tables in a changefeed.
230+
var PerTableProtectedTimestamps = settings.RegisterBoolSetting(
231+
settings.ApplicationLevel,
232+
"changefeed.protected_timestamp.per_table.enabled",
233+
"if true, creates separate protected timestamp records for each table in a changefeed; "+
234+
"if false, uses a single protected timestamp record for all tables",
235+
metamorphic.ConstantWithTestBool("changefeed.protected_timestamp.per_table.enabled", true))
236+
218237
// MaxProtectedTimestampAge controls the frequency of protected timestamp record updates
219238
var MaxProtectedTimestampAge = settings.RegisterDurationSetting(
220239
settings.ApplicationLevel,

0 commit comments

Comments
 (0)