@@ -41,6 +41,7 @@ import (
41
41
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
42
42
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
43
43
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
44
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
44
45
"github.com/cockroachdb/cockroach/pkg/util/hlc"
45
46
"github.com/cockroachdb/cockroach/pkg/util/log"
46
47
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
@@ -1086,6 +1087,9 @@ type Manager struct {
1086
1087
1087
1088
// rangeFeed is the current range feed on system.descriptors.
1088
1089
rangeFeed * rangefeed.RangeFeed
1090
+
1091
+ // rangeFeedRestartInProgress tracks if a range feed restart is in progress.
1092
+ rangeFeedRestartInProgress bool
1089
1093
}
1090
1094
1091
1095
// closeTimeStamp for the range feed, which is the timestamp
@@ -1712,6 +1716,19 @@ func (m *Manager) GetSafeReplicationTS() hlc.Timestamp {
1712
1716
return m .closeTimestamp .Load ().(hlc.Timestamp )
1713
1717
}
1714
1718
1719
+ // closeRangeFeed closes the currently open range feed, which will involve
1720
+ // temporarily releasing the lease manager mutex.
1721
+ func (m * Manager ) closeRangeFeedLocked () {
1722
+ // We cannot terminate the range feed while holding the lease manager
1723
+ // lock, since there may be event handlers that need the lock that need to
1724
+ // drain.
1725
+ oldRangeFeed := m .mu .rangeFeed
1726
+ m .mu .rangeFeed = nil
1727
+ m .mu .Unlock () // nolint:deferunlockcheck
1728
+ oldRangeFeed .Close ()
1729
+ m .mu .Lock () // nolint:deferunlockcheck
1730
+ }
1731
+
1715
1732
// watchForUpdates will watch a rangefeed on the system.descriptor table for
1716
1733
// updates.
1717
1734
func (m * Manager ) watchForUpdates (ctx context.Context ) {
@@ -1773,14 +1790,12 @@ func (m *Manager) watchForUpdates(ctx context.Context) {
1773
1790
m .closeTimestamp .Store (checkpoint .ResolvedTS )
1774
1791
}
1775
1792
1776
- // If we already started a range feed terminate it first
1793
+ // Assert that the range feed is already terminated.
1777
1794
if m .mu .rangeFeed != nil {
1778
- m .mu .rangeFeed .Close ()
1779
- m .mu .rangeFeed = nil
1780
- if m .testingKnobs .RangeFeedResetChannel != nil {
1781
- close (m .testingKnobs .RangeFeedResetChannel )
1782
- m .testingKnobs .RangeFeedResetChannel = nil
1795
+ if buildutil .CrdbTestBuild {
1796
+ panic (errors .AssertionFailedf ("range feed was not closed before a restart attempt" ))
1783
1797
}
1798
+ log .Warningf (ctx , "range feed was not closed before a restart attempt" )
1784
1799
}
1785
1800
// Ignore errors here because they indicate that the server is shutting down.
1786
1801
// Also note that the range feed automatically shuts down when the server
@@ -1937,12 +1952,28 @@ func (m *Manager) handleRangeFeedError(ctx context.Context) {
1937
1952
}
1938
1953
1939
1954
func (m * Manager ) restartLeasingRangeFeedLocked (ctx context.Context ) {
1955
+ // If someone else is already starting a range feed then exit early.
1956
+ if m .mu .rangeFeedRestartInProgress {
1957
+ return
1958
+ }
1940
1959
log .Warning (ctx , "attempting restart of leasing range feed" )
1960
+ // We will temporarily release the lock closing the range feed,
1961
+ // in case things need to drain before termination. It is possible for
1962
+ // another restart to enter once we release the lock.
1963
+ m .mu .rangeFeedRestartInProgress = true
1964
+ if m .mu .rangeFeed != nil {
1965
+ m .closeRangeFeedLocked ()
1966
+ if m .testingKnobs .RangeFeedResetChannel != nil {
1967
+ close (m .testingKnobs .RangeFeedResetChannel )
1968
+ m .testingKnobs .RangeFeedResetChannel = nil
1969
+ }
1970
+ }
1941
1971
// Attempt a range feed restart if it has been down too long.
1942
1972
m .watchForUpdates (ctx )
1943
1973
// Track when the last restart occurred.
1944
1974
m .mu .rangeFeedIsUnavailableAt = timeutil .Now ()
1945
1975
m .mu .rangeFeedCheckpoints = 0
1976
+ m .mu .rangeFeedRestartInProgress = false
1946
1977
}
1947
1978
1948
1979
// cleanupExpiredSessionLeases expires session based leases marked for removal,
0 commit comments