@@ -1107,6 +1107,9 @@ type Manager struct {
1107
1107
1108
1108
// rangeFeed is the current range feed on system.descriptors.
1109
1109
rangeFeed * rangefeed.RangeFeed
1110
+
1111
+ // rangeFeedRestartInProgress tracks if a range feed restart is in progress.
1112
+ rangeFeedRestartInProgress bool
1110
1113
}
1111
1114
1112
1115
// closeTimeStamp for the range feed, which is the timestamp
@@ -1786,6 +1789,19 @@ func (m *Manager) GetSafeReplicationTS() hlc.Timestamp {
1786
1789
return m .closeTimestamp .Load ().(hlc.Timestamp )
1787
1790
}
1788
1791
1792
+ // closeRangeFeedLocked closes the currently open range feed, which involves
1793
+ // temporarily releasing the lease manager mutex (m.mu) and re-acquiring it before returning.
1794
+ func (m * Manager ) closeRangeFeedLocked () {
1795
+ // We cannot terminate the range feed while holding the lease manager
1796
+ // lock, since event handlers that have to drain first may themselves
1797
+ // need the lock.
1798
+ oldRangeFeed := m .mu .rangeFeed
1799
+ m .mu .rangeFeed = nil // cleared while the lock is still held, before Close runs
1800
+ m .mu .Unlock () // nolint:deferunlockcheck
1801
+ oldRangeFeed .Close () // NOTE(review): no nil check here; the caller visible in this diff checks m.mu.rangeFeed != nil first
1802
+ m .mu .Lock () // nolint:deferunlockcheck
1803
+ }
1804
+
1789
1805
// watchForUpdates will watch a rangefeed on the system.descriptor table for
1790
1806
// updates.
1791
1807
func (m * Manager ) watchForUpdates (ctx context.Context ) {
@@ -1847,14 +1863,12 @@ func (m *Manager) watchForUpdates(ctx context.Context) {
1847
1863
m .closeTimestamp .Store (checkpoint .ResolvedTS )
1848
1864
}
1849
1865
1850
- // If we already started a range feed terminate it first
1866
+ // Assert that the range feed is already terminated.
1851
1867
if m .mu .rangeFeed != nil {
1852
- m .mu .rangeFeed .Close ()
1853
- m .mu .rangeFeed = nil
1854
- if m .testingKnobs .RangeFeedResetChannel != nil {
1855
- close (m .testingKnobs .RangeFeedResetChannel )
1856
- m .testingKnobs .RangeFeedResetChannel = nil
1868
+ if buildutil .CrdbTestBuild {
1869
+ panic (errors .AssertionFailedf ("range feed was not closed before a restart attempt" ))
1857
1870
}
1871
+ log .Warningf (ctx , "range feed was not closed before a restart attempt" )
1858
1872
}
1859
1873
// Ignore errors here because they indicate that the server is shutting down.
1860
1874
// Also note that the range feed automatically shuts down when the server
@@ -2011,12 +2025,28 @@ func (m *Manager) handleRangeFeedError(ctx context.Context) {
2011
2025
}
2012
2026
2013
2027
func (m * Manager ) restartLeasingRangeFeedLocked (ctx context.Context ) { // closes any open range feed and starts a new one; m.mu must be held (closeRangeFeedLocked unlocks it)
2028
+ // If another restart is already in progress, exit early.
2029
+ if m .mu .rangeFeedRestartInProgress {
2030
+ return
2031
+ }
2014
2032
log .Warning (ctx , "attempting restart of leasing range feed" )
2033
+ // We temporarily release the lock while closing the range feed,
2034
+ // in case things need to drain before termination. Another restart
2035
+ // may enter once we release the lock; the flag set below makes it exit early.
2036
+ m .mu .rangeFeedRestartInProgress = true
2037
+ if m .mu .rangeFeed != nil {
2038
+ m .closeRangeFeedLocked ()
2039
+ if m .testingKnobs .RangeFeedResetChannel != nil {
2040
+ close (m .testingKnobs .RangeFeedResetChannel )
2041
+ m .testingKnobs .RangeFeedResetChannel = nil
2042
+ }
2043
+ }
2015
2044
// Attempt a range feed restart if it has been down too long.
2016
2045
m .watchForUpdates (ctx )
2017
2046
// Track when the last restart occurred.
2018
2047
m .mu .rangeFeedIsUnavailableAt = timeutil .Now ()
2019
2048
m .mu .rangeFeedCheckpoints = 0
2049
+ m .mu .rangeFeedRestartInProgress = false // NOTE(review): not deferred, so the flag stays set if watchForUpdates panics
2020
2050
}
2021
2051
2022
2052
// cleanupExpiredSessionLeases expires session based leases marked for removal,
0 commit comments