Commit f749126

kvserver: refresh ReplicaUnavailableErrors on the leaderlessWatcher
Previously, we would construct a ReplicaUnavailableError on the LeaderlessWatcher once (using an empty descriptor, which is rather hilarious) and never update it. That meant the returned error wouldn't reflect the current state of the RangeDescriptor, which made it hard to make sense of the shape of the unavailability. This patch fixes the issue by refreshing the cached error on every available -> unavailable state transition in the leaderlessWatcher.

Closes #144639

Release note: None
1 parent 1cd9496 commit f749126
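As background for the change described above, here is a minimal, self-contained sketch of the pattern the commit message describes: cache the unavailability error only while the watcher is unavailable, rebuild it from the then-current descriptor whenever the available -> unavailable threshold is crossed, and clear it on reset. This is an illustration only, not CockroachDB code; the names `watcher`, `refresh`, and `newErr` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// watcher is a stripped-down analogue of the leaderlessWatcher. The error is
// cached only while unavailable and is rebuilt at each transition.
type watcher struct {
	mu struct {
		sync.RWMutex
		leaderlessSince time.Time
		unavailable     bool
		err             error
	}
}

// refresh mirrors the shape of refreshUnavailableState: the caller injects a
// newErr constructor so the watcher does not need to reach back into the
// replica to build a descriptor-aware error.
func (w *watcher) refresh(hasLeader bool, now time.Time, threshold time.Duration, newErr func() error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if hasLeader {
		// Reset: available again; drop the cached error.
		w.mu.leaderlessSince, w.mu.unavailable, w.mu.err = time.Time{}, false, nil
		return
	}
	if w.mu.leaderlessSince.IsZero() {
		w.mu.leaderlessSince = now // start counting how long we've been leaderless
		return
	}
	if now.Sub(w.mu.leaderlessSince) >= threshold {
		// Unavailable: (re)build the error from the current state.
		w.mu.unavailable = true
		w.mu.err = newErr()
	}
}

// Err returns the cached error; it is nil while the watcher is available.
func (w *watcher) Err() error {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.mu.err
}

func main() {
	w := &watcher{}
	desc := "r1:[a,z) replicas=n1,n2" // stand-in for the range descriptor
	newErr := func() error { return fmt.Errorf("replica unavailable: %s", desc) }

	start := time.Now()
	w.refresh(false /* hasLeader */, start, 10*time.Second, newErr)
	fmt.Println(w.Err()) // <nil>: leaderless, but not past the threshold yet

	desc = "r1:[a,z) replicas=n1,n2,n3" // descriptor changes before the transition
	w.refresh(false, start.Add(11*time.Second), 10*time.Second, newErr)
	fmt.Println(w.Err()) // error text reflects the latest descriptor
}
```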

File tree

7 files changed: +133, -27 lines


pkg/kv/kvserver/client_replica_test.go

Lines changed: 72 additions & 5 deletions
@@ -68,6 +68,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -2788,7 +2789,6 @@ func TestLossQuorumCauseLeaderWatcherToSignalUnavailable(t *testing.T) {
 	require.NoError(t, log.SetVModule("replica_range_lease=3,raft=4"))
 
 	ctx := context.Background()
-	manualClock := hlc.NewHybridManualClock()
 	stickyVFSRegistry := fs.NewStickyRegistry()
 	lisReg := listenerutil.NewListenerRegistry()
 	defer lisReg.Close()
@@ -2851,12 +2851,8 @@ func TestLossQuorumCauseLeaderWatcherToSignalUnavailable(t *testing.T) {
 		return nil
 	})
 
-	// Increment the clock by the leaderlessWatcher unavailable threshold.
-	manualClock.Increment(threshold.Nanoseconds())
-
 	// Wait for the leaderlessWatcher to indicate that the range is unavailable.
 	testutils.SucceedsSoon(t, func() error {
-		tc.GetFirstStoreFromServer(t, aliveNodeIdx).LookupReplica(roachpb.RKey(key))
 		if !repl.LeaderlessWatcher.IsUnavailable() {
 			return errors.New("range is still available")
 		}
@@ -2915,6 +2911,77 @@ func TestLossQuorumCauseLeaderWatcherToSignalUnavailable(t *testing.T) {
 	})
 }
 
+// TestLeaderlessWatcherUnavailabilityErrorRefreshedOnUnavailabilityTransition
+// ensures that the leaderless watcher constructs a new error every time it
+// transitions to the unavailable state. In particular, the descriptor used
+// in the error should be the latest descriptor.
+// Serves as a regression test for
+// https://github.com/cockroachdb/cockroach/issues/144639.
+func TestLeaderlessWatcherErrorRefreshedOnUnavailabilityTransition(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	manual := hlc.NewHybridManualClock()
+	st := cluster.MakeTestingClusterSettings()
+	// Set the leaderless threshold to 10 second.
+	kvserver.ReplicaLeaderlessUnavailableThreshold.Override(ctx, &st.SV, 10*time.Second)
+
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: st,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					WallClock: manual,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	key := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, key, tc.Targets(1)...)
+	repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(key))
+
+	// The leaderlessWatcher starts off as available.
+	require.False(t, repl.LeaderlessWatcher.IsUnavailable())
+	// Let it know it's leaderless.
+	repl.RefreshLeaderlessWatcherUnavailableStateForTesting(ctx, raft.None, manual.Now(), st)
+	// Even though the replica is leaderless, enough time hasn't passed for it to
+	// be considered unavailable.
+	require.False(t, repl.LeaderlessWatcher.IsUnavailable())
+	// The error should be nil as we're not considered leaderless at this point.
+	require.NoError(t, repl.LeaderlessWatcher.Err())
+	// Let enough time pass.
+	manual.Increment(10 * time.Second.Nanoseconds())
+	repl.RefreshLeaderlessWatcherUnavailableStateForTesting(ctx, raft.None, manual.Now(), st)
+	// Now the replica is considered unavailable.
+	require.True(t, repl.LeaderlessWatcher.IsUnavailable())
+	require.Error(t, repl.LeaderlessWatcher.Err())
+	// Regex to ensure we've got a replica unavailable error with n1 and n2 in the
+	// range descriptor.
+	require.Regexp(t, "replica unavailable.*n1.*n2.*gen=3", repl.LeaderlessWatcher.Err().Error())
+
+	// Next up, let the replica know there's a leader. This should make it
+	// available again.
+	repl.RefreshLeaderlessWatcherUnavailableStateForTesting(ctx, 1, manual.Now(), st)
+	require.False(t, repl.LeaderlessWatcher.IsUnavailable())
+	// Change the range descriptor. Mark it leaderless and let enough time pass
+	// for it to be considered unavailable again.
+	tc.AddVotersOrFatal(t, key, tc.Targets(2)...)
+	repl.RefreshLeaderlessWatcherUnavailableStateForTesting(ctx, raft.None, manual.Now(), st)
+	manual.Increment(10 * time.Second.Nanoseconds())
+	repl.RefreshLeaderlessWatcherUnavailableStateForTesting(ctx, raft.None, manual.Now(), st)
+	// The replica should now be considered unavailable again.
+	require.True(t, repl.LeaderlessWatcher.IsUnavailable())
+	require.Error(t, repl.LeaderlessWatcher.Err())
+	// Ensure that the range descriptor now contains n1, n2, and n3 -- i.e, we're
+	// updating the error with the latest descriptor on the latest transition.
+	require.Regexp(t, "replica unavailable.*n1.*n2.*n3.*gen=5", repl.LeaderlessWatcher.Err().Error())
+}
+
 func TestClearRange(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/kv/kvserver/replica.go

Lines changed: 41 additions & 14 deletions
@@ -195,10 +195,10 @@ type leaderlessWatcher struct {
 		// unavailable is set to true if the replica is leaderless for a long time
 		// (longer than ReplicaLeaderlessUnavailableThreshold).
 		unavailable bool
-	}
 
-	// err is the error returned when the replica is leaderless for a long time.
-	err error
+		// err is the error returned when the replica is leaderless for a long time.
+		err error
+	}
 
 	// closedChannel is an already closed channel. Requests will use it to know
 	// that the replica is leaderless, and can be considered unavailable. This
@@ -208,22 +208,21 @@ type leaderlessWatcher struct {
 	closedChannel chan struct{}
 }
 
-// newLeaderlessWatcher initializes a new leaderlessWatcher with the default
-// values.
-func newLeaderlessWatcher(r *Replica) *leaderlessWatcher {
+// newLeaderlessWatcher constructs and returns a new leaderlessWatcher.
+func newLeaderlessWatcher() *leaderlessWatcher {
 	closedCh := make(chan struct{})
 	close(closedCh)
 	return &leaderlessWatcher{
-		err: r.replicaUnavailableError(
-			errors.Errorf("replica has been leaderless for %s",
-				ReplicaLeaderlessUnavailableThreshold.Get(&r.store.cfg.Settings.SV))),
 		closedChannel: closedCh,
 	}
 }
 
 // Err implements the signaller interface.
 func (lw *leaderlessWatcher) Err() error {
-	return lw.err
+	lw.mu.RLock()
+	defer lw.mu.RUnlock()
+
+	return lw.mu.err
 }
 
 // C implements the signaller interface.
@@ -237,14 +236,23 @@ func (lw *leaderlessWatcher) C() <-chan struct{} {
 func (lw *leaderlessWatcher) IsUnavailable() bool {
 	lw.mu.RLock()
 	defer lw.mu.RUnlock()
+
+	// The error is set iff the replica is unavailable. Sanity check.
+	if lw.mu.unavailable == (lw.mu.err == nil) {
+		panic("unavailable implies error is set")
+	}
 	return lw.mu.unavailable
 }
 
 // refreshUnavailableState refreshes the unavailable state on the leaderless
 // watcher. Replicas are considered unavailable if they have been leaderless for
 // a long time, where long is defined by the ReplicaUnavailableThreshold.
 func (lw *leaderlessWatcher) refreshUnavailableState(
-	ctx context.Context, postTickLead raftpb.PeerID, nowPhysicalTime time.Time, st *cluster.Settings,
+	ctx context.Context,
+	postTickLead raftpb.PeerID,
+	nowPhysicalTime time.Time,
+	st *cluster.Settings,
+	newReplicaUnavailableError func(error) error,
 ) {
 	lw.mu.Lock()
 	defer lw.mu.Unlock()
@@ -272,20 +280,28 @@ func (lw *leaderlessWatcher) refreshUnavailableState(
 		// the threshold. Otherwise, mark the replica as unavailable.
 		durationSinceLeaderless := nowPhysicalTime.Sub(lw.mu.leaderlessTimestamp)
 		if durationSinceLeaderless >= threshold {
-			err := errors.Errorf("have been leaderless for %.2fs, setting the "+
-				"leaderless watcher replica's state as unavailable",
-				durationSinceLeaderless.Seconds())
 			if log.ExpensiveLogEnabled(ctx, 1) {
+				err := errors.Errorf("have been leaderless for %.2fs, setting the "+
+					"leaderless watcher replica's state as unavailable",
+					durationSinceLeaderless.Seconds())
 				log.VEventf(ctx, 1, "%s", err)
 			}
+			// Transition to being unavailable.
 			lw.mu.unavailable = true
+			// Now that we're transitioning to being unavailable, construct and cache
+			// the associated error.
+			lw.mu.err = newReplicaUnavailableError(
+				errors.Errorf("replica has been leaderless for %s",
+					ReplicaLeaderlessUnavailableThreshold.Get(&st.SV)),
+			)
 		}
 	}
 }
 
 func (lw *leaderlessWatcher) resetLocked() {
 	lw.mu.leaderlessTimestamp = time.Time{}
 	lw.mu.unavailable = false
+	lw.mu.err = nil
 }
 
 // ReplicaMutex is an RWMutex. It has its own type to make it easier to look for
@@ -2822,6 +2838,17 @@ func (r *Replica) GetCachedClosedTimestampPolicyForTesting() ctpb.RangeClosedTim
 	return ctpb.RangeClosedTimestampPolicy(r.cachedClosedTimestampPolicy.Load())
 }
 
+// RefreshLeaderlessWatcherUnavailableStateForTesting refreshes the replica's
+// leaderlessWatcher's unavailable state. Intended for tests.
+func (r *Replica) RefreshLeaderlessWatcherUnavailableStateForTesting(
+	ctx context.Context, postTickLead raftpb.PeerID, nowPhysicalTime time.Time, st *cluster.Settings,
+) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	r.LeaderlessWatcher.refreshUnavailableState(ctx, postTickLead, nowPhysicalTime, st, r.replicaUnavailableErrorRLocked)
+}
+
 // maybeEnqueueProblemRange will enqueue the replica for processing into the
 // replicate queue iff:
 //
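The IsUnavailable change in this file also adds a sanity check tying the unavailable flag to the cached error. Below is a tiny, hypothetical sketch of that invariant check; `state` and `isUnavailable` are illustrative names, not the kvserver API.

```go
package main

import "fmt"

// state mimics the watcher's mutable fields: the invariant is that err is
// non-nil exactly when unavailable is true.
type state struct {
	unavailable bool
	err         error
}

// isUnavailable asserts the invariant before answering, so a caller can never
// observe "unavailable" without a descriptive error to return (or vice versa).
func (s *state) isUnavailable() bool {
	if s.unavailable == (s.err == nil) {
		panic("unavailable implies error is set (and vice versa)")
	}
	return s.unavailable
}

func main() {
	ok := &state{unavailable: true, err: fmt.Errorf("replica unavailable")}
	fmt.Println(ok.isUnavailable()) // true

	defer func() { fmt.Println("recovered:", recover()) }()
	bad := &state{unavailable: true} // flag set but no cached error
	bad.isUnavailable()              // panics: invariant violated
}
```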

pkg/kv/kvserver/replica_circuit_breaker.go

Lines changed: 15 additions & 3 deletions
@@ -268,11 +268,23 @@ func replicaUnavailableError(
 	return kvpb.NewReplicaUnavailableError(errors.Wrapf(err, "%s", buf), desc, replDesc)
 }
 
+// replicaUnavailableError returns a new ReplicaUnavailableError that wraps the
+// provided error.
 func (r *Replica) replicaUnavailableError(err error) error {
-	desc := r.Desc()
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	return r.replicaUnavailableErrorRLocked(err)
+}
+
+// replicaUnavailableLocked is like replicaUnavailableError, except it requires
+// r.mu to be RLocked.
+func (r *Replica) replicaUnavailableErrorRLocked(err error) error {
+	desc := r.shMu.state.Desc
 	replDesc, _ := desc.GetReplicaDescriptor(r.store.StoreID())
 
 	isLiveMap, _ := r.store.livenessMap.Load().(livenesspb.IsLiveMap)
-	ct := r.GetCurrentClosedTimestamp(context.Background())
-	return replicaUnavailableError(err, desc, replDesc, isLiveMap, r.RaftStatus(), ct)
+	ct := r.getCurrentClosedTimestampLocked(context.Background(), hlc.Timestamp{} /* sufficient */)
+
+	return replicaUnavailableError(err, desc, replDesc, isLiveMap, r.raftStatusRLocked(), ct)
 }
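The split into replicaUnavailableError and replicaUnavailableErrorRLocked above follows the common locked/unlocked wrapper convention: the outer entry point takes the read lock and delegates to a variant that assumes the lock is already held. A small illustrative sketch under assumed names (`registry`, `describe`), not the actual Replica API:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for Replica: it exposes both a locking
// entry point and an *RLocked variant for callers that already hold the lock.
type registry struct {
	mu   sync.RWMutex
	desc string // stand-in for the range descriptor guarded by mu
}

// describe acquires the read lock and delegates.
func (r *registry) describe(err error) error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.describeRLocked(err)
}

// describeRLocked requires r.mu to be held at least for reading. Callers that
// already run under r.mu use this variant to avoid self-deadlock on the
// non-reentrant RWMutex.
func (r *registry) describeRLocked(err error) error {
	return fmt.Errorf("%w (desc=%s)", err, r.desc)
}

func main() {
	r := &registry{desc: "r1:[a,z)"}
	fmt.Println(r.describe(fmt.Errorf("replica unavailable")))
}
```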

pkg/kv/kvserver/replica_init.go

Lines changed: 1 addition & 1 deletion
@@ -242,7 +242,7 @@ func newUninitializedReplicaWithoutRaftGroup(
 	r.breaker = newReplicaCircuitBreaker(
 		store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset,
 	)
-	r.LeaderlessWatcher = newLeaderlessWatcher(r)
+	r.LeaderlessWatcher = newLeaderlessWatcher()
 	r.shMu.currentRACv2Mode = r.replicationAdmissionControlModeToUse(context.TODO())
 	r.raftMu.flowControlLevel = kvflowcontrol.GetV2EnabledWhenLeaderLevel(
 		r.raftCtx, store.ClusterSettings(), store.TestingKnobs().FlowControlTestingKnobs)

pkg/kv/kvserver/replica_raft.go

Lines changed: 1 addition & 1 deletion
@@ -1512,7 +1512,7 @@ func (r *Replica) tick(
 
 	// Refresh the unavailability state on the leaderlessWatcher.
 	r.LeaderlessWatcher.refreshUnavailableState(
-		ctx, postTickStatus.Lead, nowPhysicalTime, r.store.cfg.Settings,
+		ctx, postTickStatus.Lead, nowPhysicalTime, r.store.cfg.Settings, r.replicaUnavailableErrorRLocked,
 	)
 
 	if preTickStatus.RaftState != postTickStatus.RaftState {

pkg/kv/kvserver/replica_raft_test.go

Lines changed: 1 addition & 1 deletion
@@ -481,7 +481,7 @@ func TestMaybeMarkReplicaUnavailableInLeaderlessWatcher(t *testing.T) {
 			repl := tContext.repl
 			repl.LeaderlessWatcher.mu.unavailable = tc.initReplicaUnavailable
 			repl.LeaderlessWatcher.mu.leaderlessTimestamp = tc.initLeaderlessTimestamp
-			repl.LeaderlessWatcher.refreshUnavailableState(ctx, tc.leader, now, cfg.Settings)
+			repl.RefreshLeaderlessWatcherUnavailableStateForTesting(ctx, tc.leader, now, cfg.Settings)
 			require.Equal(t, tc.expectedUnavailable, repl.LeaderlessWatcher.IsUnavailable())
 			require.Equal(t, tc.expectedLeaderlessTime, repl.LeaderlessWatcher.mu.leaderlessTimestamp)
 

pkg/kv/kvserver/replica_test.go

Lines changed: 2 additions & 2 deletions
@@ -15429,8 +15429,8 @@ func TestLeaderlessWatcherInit(t *testing.T) {
 	// The leaderless timestamp is not set.
 	require.Equal(t, time.Time{}, repl.LeaderlessWatcher.mu.leaderlessTimestamp)
 
-	// The error is always loaded.
-	require.Regexp(t, "replica has been leaderless for 10s", repl.LeaderlessWatcher.Err())
+	// The error is nilled out.
+	require.Nil(t, repl.LeaderlessWatcher.mu.err)
 
 	// The channel is closed.
 	c := repl.LeaderlessWatcher.C()
