Skip to content

Commit c1e847b

Browse files
craig[bot]aerfreialyshanjahani-crl
committed
149616: changefeedccl: protect lagging tables with its own timestamp r=andyyang890 a=aerfrei Before this change, changefeeds watching multiple tables would protect all watched tables and system tables with a single protected timestamp (pts) record. All tables would be protected back to the highwater for the changefeed. In the event where one table lags behind the others, this would result in preventing more garbage collection than needed for many tables. This would become more of an issue when we introduce DB-level feeds because they will be implemented with multi-table feeds. We now protect lagging tables separately. We use per table highwaters to identify tables that are lagging behind the most advanced table by more than the new changefeed.protect_timestamp_bucketing_interval cluster setting. All tables at least that far behind will be protected by their own pts record. Epic: CRDB-1421 Fixes: #147785 Release note (performance): Multi-table changefeeds will protect each lagging user table from garbage collection with its own record. This will improve performance of multi-table changefeeds including upcoming db-level changefeeds by allowing more garbage collection. 152190: contention: periodically log resolved contention events r=alyshanjahani-crl a=alyshanjahani-crl This commit adds logging to the flushAndResolve method in the contention event store. This method is called periodically at an interval defined by the (non-public) cluster setting sql.contention.event_store.resolution_interval. By logging aggregated contention event information, users now have the ability to find which waiting statements/txns are experiencing high contention through logs (previously this was only available via insights or SQL activity). Additionally, this aggregated event information contains the blocking txn and contended key - information that was previously only available via querying crdb_internal.transaction_contention_events.
The logs are aggregated by a composite identifier of (waiting stmt, waiting txn, blocking txn, key) and report the total contention time observed for that combination. They are structured logs. For example: ``` {"Timestamp":1756224167482848000,"EventType":"aggregated_contention_info", "WaitingStmtFingerprintId":"\\x000000000000007b", "WaitingTxnFingerprintId":"\\x00000000000001c8", "BlockingTxnFingerprintId":"\\x0000000000000315", "ContendedKey":"test-key-1","Duration":300000000} ``` Since this information is available via logs, it provides a solution for users who desire more persistence of contention event information. Part of: https://cockroachlabs.atlassian.net/browse/CRDB-53592 Release note (ops change): Adds logging of contention events. Co-authored-by: Aerin Freilich <[email protected]> Co-authored-by: Alyshan Jahani <[email protected]>
3 parents 3693763 + 021bb15 + 8de4c80 commit c1e847b

23 files changed

+942
-93
lines changed

docs/generated/eventlog.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,35 @@ is shut down inside the same process as the KV layer.
434434
| `InstanceID` | The ID of the server instance. | no |
435435
| `TenantName` | The name of the tenant at the time the event was emitted. | yes |
436436

437+
## Contention events
438+
439+
Aggregated information about contention events.
440+
441+
Events in this category are logged to the `SQL_EXEC` channel.
442+
443+
444+
### `aggregated_contention_info`
445+
446+
An event of type `aggregated_contention_info` is recorded periodically when contention events
447+
are resolved.
448+
449+
450+
| Field | Description | Sensitive |
451+
|--|--|--|
452+
| `WaitingStmtFingerprintId` | | no |
453+
| `WaitingTxnFingerprintId` | | no |
454+
| `BlockingTxnFingerprintId` | | no |
455+
| `ContendedKey` | | partially |
456+
| `Duration` | | no |
457+
458+
459+
#### Common fields
460+
461+
| Field | Description | Sensitive |
462+
|--|--|--|
463+
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
464+
| `EventType` | The type of the event. | no |
465+
437466
## Debugging events
438467

439468
Events in this category pertain to debugging operations performed by

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,14 @@ func makePlan(
459459
// Create progress config based on current settings.
460460
var progressConfig *execinfrapb.ChangefeedProgressConfig
461461
if execCtx.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_4) {
462+
perTableTrackingEnabled := changefeedbase.TrackPerTableProgress.Get(sv)
463+
perTableProtectedTimestampsEnabled := changefeedbase.PerTableProtectedTimestamps.Get(sv)
462464
progressConfig = &execinfrapb.ChangefeedProgressConfig{
463-
PerTableTracking: changefeedbase.TrackPerTableProgress.Get(sv),
465+
PerTableTracking: perTableTrackingEnabled,
466+
// If the per table pts flag was turned on between changefeed creation and now,
467+
// the per table pts records will be rewritten in the new format when the
468+
// highwater mark is updated in manageProtectedTimestamps.
469+
PerTableProtectedTimestamps: perTableTrackingEnabled && perTableProtectedTimestampsEnabled,
464470
}
465471
}
466472

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ const (
2323
)
2424

2525
const (
26-
// resolvedTablesFilename: ~changefeed/resolved_tables.binpb
27-
resolvedTablesFilename = jobInfoFilenamePrefix + "resolved_tables" + jobInfoFilenameExtension
26+
// resolvedTablesFilename: ~changefeed/resolved-tables.binpb
27+
resolvedTablesFilename = jobInfoFilenamePrefix + "resolved-tables" + jobInfoFilenameExtension
28+
// perTableProtectedTimestampsFilename: ~changefeed/per-table-protected-timestamps.binpb
29+
perTableProtectedTimestampsFilename = jobInfoFilenamePrefix + "per-table-protected-timestamps" + jobInfoFilenameExtension
2830
)
2931

3032
// writeChangefeedJobInfo writes a changefeed job info protobuf to the

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 189 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,6 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19141914
defer sp.Finish()
19151915

19161916
ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1917-
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
19181917
if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
19191918
return false, nil
19201919
}
@@ -1931,16 +1930,193 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19311930
}
19321931
}()
19331932

1933+
var ptsEntries changefeedpb.ProtectedTimestampRecords
1934+
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, cf.spec.JobID); err != nil {
1935+
return false, err
1936+
}
1937+
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
1938+
1939+
highwater := func() hlc.Timestamp {
1940+
if cf.frontier.Frontier().Less(cf.highWaterAtStart) {
1941+
return cf.highWaterAtStart
1942+
}
1943+
return cf.frontier.Frontier()
1944+
}()
1945+
1946+
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1947+
newPTS, updatedPerTablePTS, err := cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
1948+
if err != nil {
1949+
return false, err
1950+
}
1951+
updatedMainPTS, err := cf.advanceProtectedTimestamp(ctx, progress, pts, newPTS)
1952+
if err != nil {
1953+
return false, err
1954+
}
1955+
return updatedMainPTS || updatedPerTablePTS, nil
1956+
}
1957+
1958+
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
1959+
}
1960+
1961+
func (cf *changeFrontier) managePerTableProtectedTimestamps(
1962+
ctx context.Context,
1963+
txn isql.Txn,
1964+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
1965+
highwater hlc.Timestamp,
1966+
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
1967+
var leastLaggingTimestamp hlc.Timestamp
1968+
for _, frontier := range cf.frontier.Frontiers() {
1969+
if frontier.Frontier().After(leastLaggingTimestamp) {
1970+
leastLaggingTimestamp = frontier.Frontier()
1971+
}
1972+
}
1973+
1974+
newPTS = func() hlc.Timestamp {
1975+
lagDuration := changefeedbase.ProtectTimestampBucketingInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1976+
ptsLagCutoff := leastLaggingTimestamp.AddDuration(-lagDuration)
1977+
// If we are within the bucketing interval of having started the changefeed,
1978+
// we use the highwater as the PTS timestamp so as not to try to protect
1979+
// tables before the changefeed started.
1980+
if ptsLagCutoff.Less(highwater) {
1981+
return highwater
1982+
}
1983+
return ptsLagCutoff
1984+
}()
1985+
19341986
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
1987+
tableIDsToRelease := make([]descpb.ID, 0)
1988+
for tableID, frontier := range cf.frontier.Frontiers() {
1989+
tableHighWater := func() hlc.Timestamp {
1990+
// If this table has not yet finished its initial scan, we use the highwater
1991+
// which is guaranteed to be at least the changefeed's creation time.
1992+
if frontier.Frontier().Less(highwater) {
1993+
return highwater
1994+
}
1995+
return frontier.Frontier()
1996+
}()
1997+
1998+
isLagging := tableHighWater.Less(newPTS)
1999+
2000+
if cf.knobs.IsTableLagging != nil && cf.knobs.IsTableLagging(tableID) {
2001+
isLagging = true
2002+
}
2003+
2004+
if !isLagging {
2005+
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2006+
tableIDsToRelease = append(tableIDsToRelease, tableID)
2007+
}
2008+
continue
2009+
}
19352010

1936-
// Create / advance the protected timestamp record to the highwater mark
1937-
highWater := cf.frontier.Frontier()
1938-
if highWater.Less(cf.highWaterAtStart) {
1939-
highWater = cf.highWaterAtStart
2011+
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2012+
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
2013+
return hlc.Timestamp{}, false, err
2014+
} else if updated {
2015+
updatedPerTablePTS = true
2016+
}
2017+
} else {
2018+
// TODO(#152448): Do not include system table protections in these records.
2019+
if err := cf.createPerTableProtectedTimestampRecord(ctx, txn, ptsEntries, tableID, tableHighWater, pts); err != nil {
2020+
return hlc.Timestamp{}, false, err
2021+
}
2022+
updatedPerTablePTS = true
2023+
}
2024+
}
2025+
2026+
if len(tableIDsToRelease) > 0 {
2027+
if err := cf.releasePerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToRelease, pts); err != nil {
2028+
return hlc.Timestamp{}, false, err
2029+
}
2030+
updatedPerTablePTS = true
19402031
}
2032+
2033+
return newPTS, updatedPerTablePTS, nil
2034+
}
2035+
2036+
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
2037+
ctx context.Context,
2038+
txn isql.Txn,
2039+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2040+
tableIDs []descpb.ID,
2041+
pts protectedts.Storage,
2042+
) error {
2043+
for _, tableID := range tableIDs {
2044+
if err := pts.Release(ctx, *ptsEntries.ProtectedTimestampRecords[tableID]); err != nil {
2045+
return err
2046+
}
2047+
delete(ptsEntries.ProtectedTimestampRecords, tableID)
2048+
}
2049+
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2050+
}
2051+
2052+
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
2053+
ctx context.Context,
2054+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2055+
tableID descpb.ID,
2056+
tableHighWater hlc.Timestamp,
2057+
pts protectedts.Storage,
2058+
) (updated bool, err error) {
2059+
rec, err := pts.GetRecord(ctx, *ptsEntries.ProtectedTimestampRecords[tableID])
2060+
if err != nil {
2061+
return false, err
2062+
}
2063+
2064+
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
2065+
if rec.Timestamp.AddDuration(ptsUpdateLag).After(tableHighWater) {
2066+
return false, nil
2067+
}
2068+
2069+
if err := pts.UpdateTimestamp(ctx, *ptsEntries.ProtectedTimestampRecords[tableID], tableHighWater); err != nil {
2070+
return false, err
2071+
}
2072+
return true, nil
2073+
}
2074+
2075+
func (cf *changeFrontier) createPerTableProtectedTimestampRecord(
2076+
ctx context.Context,
2077+
txn isql.Txn,
2078+
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2079+
tableID descpb.ID,
2080+
tableHighWater hlc.Timestamp,
2081+
pts protectedts.Storage,
2082+
) error {
2083+
// If the table is lagging and doesn't have a per table PTS record,
2084+
// we create a new one.
2085+
targets := changefeedbase.Targets{}
2086+
if cf.targets.Size > 0 {
2087+
err := cf.targets.EachTarget(func(target changefeedbase.Target) error {
2088+
if target.DescID == tableID {
2089+
targets.Add(target)
2090+
}
2091+
return nil
2092+
})
2093+
if err != nil {
2094+
return err
2095+
}
2096+
}
2097+
ptr := createProtectedTimestampRecord(
2098+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, targets, tableHighWater,
2099+
)
2100+
if ptsEntries.ProtectedTimestampRecords == nil {
2101+
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
2102+
}
2103+
uuid := ptr.ID.GetUUID()
2104+
ptsEntries.ProtectedTimestampRecords[tableID] = &uuid
2105+
if err := pts.Protect(ctx, ptr); err != nil {
2106+
return err
2107+
}
2108+
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2109+
}
2110+
2111+
func (cf *changeFrontier) advanceProtectedTimestamp(
2112+
ctx context.Context,
2113+
progress *jobspb.ChangefeedProgress,
2114+
pts protectedts.Storage,
2115+
timestamp hlc.Timestamp,
2116+
) (updated bool, err error) {
19412117
if progress.ProtectedTimestampRecord == uuid.Nil {
19422118
ptr := createProtectedTimestampRecord(
1943-
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, highWater,
2119+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, timestamp,
19442120
)
19452121
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
19462122
return true, pts.Protect(ctx, ptr)
@@ -1958,7 +2134,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19582134
if preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts(); preserveDeprecatedPts {
19592135
return false, nil
19602136
}
1961-
if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil {
2137+
if err := cf.remakePTSRecord(ctx, pts, progress, timestamp); err != nil {
19622138
return false, err
19632139
}
19642140
return true, nil
@@ -1971,26 +2147,28 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19712147
if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets {
19722148
return false, nil
19732149
}
1974-
if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil {
2150+
if err := cf.remakePTSRecord(ctx, pts, progress, timestamp); err != nil {
19752151
return false, err
19762152
}
2153+
log.VEventf(ctx, 2, "remade PTS record %v to include all targets", progress.ProtectedTimestampRecord)
19772154
return true, nil
19782155
}
19792156

2157+
ptsUpdateLag := changefeedbase.ProtectTimestampLag.Get(&cf.FlowCtx.Cfg.Settings.SV)
19802158
// Only update the PTS timestamp if it is lagging behind the high
19812159
// watermark. This is to prevent a rush of updates to the PTS if the
19822160
// changefeed restarts, which can cause contention and second order effects
19832161
// on system tables.
1984-
if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) {
2162+
if rec.Timestamp.AddDuration(ptsUpdateLag).After(timestamp) {
19852163
return false, nil
19862164
}
19872165

19882166
if cf.knobs.ManagePTSError != nil {
19892167
return false, cf.knobs.ManagePTSError()
19902168
}
19912169

1992-
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)
1993-
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
2170+
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, timestamp)
2171+
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, timestamp)
19942172
}
19952173

19962174
func (cf *changeFrontier) remakePTSRecord(

0 commit comments

Comments
 (0)