Commit 72b992c

changefeedccl: fix race advancing frontier in schemafeed
In the schema feed, `updateTableHistory` checks that the current frontier is less than the current time (endTS). However, since we release the mutex protecting the frontier while validating table descriptors, another routine can advance the frontier before the first routine tries to advance it. For example, another routine may call `pauseOrResumePolling` and pause polling at the same time as it advances the frontier. As a consequence, the first routine can hit an assertion failure because the frontier is already greater than endTS when it tries to advance it.

This change fixes the race by comparing the frontier against endTS again, with the mutex held, before trying to advance the frontier. If the frontier is already greater than or equal to endTS, it does not need to be advanced and the function returns.

It's worth keeping the original check, since it avoids validating descriptors when no advance is needed. Releasing the mutex after that check also keeps the routine from holding it while validating.

Epic: none
Fixes: #148963
Release note (bug fix): Fixes a race condition when advancing a changefeed aggregator's frontier. When hit, the race condition could result in an internal error that would shut down the kvfeed and cause the changefeed to retry.
1 parent 59789f4 commit 72b992c

1 file changed, +18 -2 lines changed

pkg/ccl/changefeedccl/schemafeed/schema_feed.go

Lines changed: 18 additions & 2 deletions
@@ -633,6 +633,16 @@ func (tf *schemaFeed) ingestDescriptors(
 	return tf.adjustTimestamps(startTS, endTS, validateErr)
 }
 
+// frontierAdvanceCheckEnabled controls whether the changefeed will
+// attempt to advance the frontier depending on the relation between the
+// last recorded frontier and the current time.
+var frontierAdvanceCheckEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"changefeed.frontier_advance_check.enabled",
+	"if true, attempts to advance the frontier only if the last recorded frontier is less than the current time",
+	true,
+)
+
 // adjustTimestamps adjusts the frontier or error timestamp appropriately.
 func (tf *schemaFeed) adjustTimestamps(startTS, endTS hlc.Timestamp, validateErr error) error {
 	tf.mu.Lock()
@@ -644,10 +654,16 @@ func (tf *schemaFeed) adjustTimestamps(startTS, endTS hlc.Timestamp, validateErr
 		}
 		return validateErr
 	}
-
-	if frontier := tf.mu.ts.frontier; frontier.Less(startTS) {
+	frontier := tf.mu.ts.frontier
+	if frontier.Less(startTS) {
 		return errors.Errorf(`gap between %s and %s`, frontier, startTS)
 	}
+	// If the current frontier is greater than the endTS,
+	// then we do not need to advance the frontier. In fact,
+	// trying to advance the frontier could result in an error.
+	if endTS.LessEq(frontier) && frontierAdvanceCheckEnabled.Get(&tf.settings.SV) {
+		return nil
+	}
 	return tf.mu.ts.advanceFrontier(endTS)
 }
 
