Skip to content

Commit 2993649

Browse files
committed
kvserver: push unavailability computation inside the leaderlessWatcher
No reason to do this on the Replica. Release note: None
1 parent c1c1ed4 commit 2993649

File tree

3 files changed

+48
-47
lines changed

3 files changed

+48
-47
lines changed

pkg/kv/kvserver/replica.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,49 @@ func (lw *leaderlessWatcher) IsUnavailable() bool {
235235
return lw.mu.unavailable
236236
}
237237

238+
// refreshUnavailableState refreshes the unavailable state on the leaderless
239+
// watcher. Replicas are considered unavailable if they have been leaderless for
240+
// a long time, where long is defined by the ReplicaLeaderlessUnavailableThreshold.
241+
func (lw *leaderlessWatcher) refreshUnavailableState(
242+
ctx context.Context, postTickLead raftpb.PeerID, nowPhysicalTime time.Time, st *cluster.Settings,
243+
) {
244+
lw.mu.Lock()
245+
defer lw.mu.Unlock()
246+
247+
threshold := ReplicaLeaderlessUnavailableThreshold.Get(&st.SV)
248+
if threshold == time.Duration(0) {
249+
// The leaderless watcher is disabled. It's important to reset the
250+
// leaderless watcher when it's disabled to reset any replica that was
251+
// marked as unavailable before the watcher was disabled.
252+
lw.resetLocked()
253+
return
254+
}
255+
256+
if postTickLead != raft.None {
257+
// If we know about the leader, reset the leaderless timer, and mark the
258+
// replica as available.
259+
lw.resetLocked()
260+
} else if lw.mu.leaderlessTimestamp.IsZero() {
261+
// If we don't know about the leader, and we haven't been leaderless before,
262+
// mark the time we became leaderless.
263+
lw.mu.leaderlessTimestamp = nowPhysicalTime
264+
} else if !lw.mu.unavailable {
265+
// At this point we know that we have been leaderless for some time, and we
266+
// haven't marked the replica as unavailable yet. Check whether we have reached
267+
// the threshold, and if so, mark the replica as unavailable.
268+
durationSinceLeaderless := nowPhysicalTime.Sub(lw.mu.leaderlessTimestamp)
269+
if durationSinceLeaderless >= threshold {
270+
err := errors.Errorf("have been leaderless for %.2fs, setting the "+
271+
"leaderless watcher replica's state as unavailable",
272+
durationSinceLeaderless.Seconds())
273+
if log.ExpensiveLogEnabled(ctx, 1) {
274+
log.VEventf(ctx, 1, "%s", err)
275+
}
276+
lw.mu.unavailable = true
277+
}
278+
}
279+
}
280+
238281
func (lw *leaderlessWatcher) resetLocked() {
239282
lw.mu.leaderlessTimestamp = time.Time{}
240283
lw.mu.unavailable = false

pkg/kv/kvserver/replica_raft.go

Lines changed: 4 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,9 +1510,10 @@ func (r *Replica) tick(
15101510
r.mu.internalRaftGroup.Tick()
15111511
postTickStatus := r.mu.internalRaftGroup.BasicStatus()
15121512

1513-
// Check if the replica has been leaderless for too long, and potentially set
1514-
// the leaderless watcher replica state as unavailable.
1515-
r.maybeMarkReplicaUnavailableInLeaderlessWatcher(ctx, postTickStatus.Lead, nowPhysicalTime)
1513+
// Refresh the unavailability state on the leaderlessWatcher.
1514+
r.LeaderlessWatcher.refreshUnavailableState(
1515+
ctx, postTickStatus.Lead, nowPhysicalTime, r.store.cfg.Settings,
1516+
)
15161517

15171518
if preTickStatus.RaftState != postTickStatus.RaftState {
15181519
if postTickStatus.RaftState == raftpb.StatePreCandidate {
@@ -2114,49 +2115,6 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
21142115
}
21152116
}
21162117

2117-
// maybeMarkReplicaUnavailableInLeaderlessWatcher marks the replica as
2118-
// unavailable in the leaderless watcher if the replica has been leaderless
2119-
// for a duration of time greater than or equal to the threshold.
2120-
func (r *Replica) maybeMarkReplicaUnavailableInLeaderlessWatcher(
2121-
ctx context.Context, postTickLead raftpb.PeerID, storeClockTime time.Time,
2122-
) {
2123-
r.LeaderlessWatcher.mu.Lock()
2124-
defer r.LeaderlessWatcher.mu.Unlock()
2125-
2126-
threshold := ReplicaLeaderlessUnavailableThreshold.Get(&r.store.cfg.Settings.SV)
2127-
if threshold == time.Duration(0) {
2128-
// The leaderless watcher is disabled. It's important to reset the
2129-
// leaderless watcher when it's disabled to reset any replica that was
2130-
// marked as unavailable before the watcher was disabled.
2131-
r.LeaderlessWatcher.resetLocked()
2132-
return
2133-
}
2134-
2135-
if postTickLead != raft.None {
2136-
// If we know about the leader, reset the leaderless timer, and mark the
2137-
// replica as available.
2138-
r.LeaderlessWatcher.resetLocked()
2139-
} else if r.LeaderlessWatcher.mu.leaderlessTimestamp.IsZero() {
2140-
// If we don't know about the leader, and we haven't been leaderless before,
2141-
// mark the time we became leaderless.
2142-
r.LeaderlessWatcher.mu.leaderlessTimestamp = storeClockTime
2143-
} else if !r.LeaderlessWatcher.mu.unavailable {
2144-
// At this point we know that we have been leaderless for sometime, and
2145-
// we haven't marked the replica as unavailable yet. Make sure we didn't
2146-
// exceed the threshold. Otherwise, mark the replica as unavailable.
2147-
durationSinceLeaderless := storeClockTime.Sub(r.LeaderlessWatcher.mu.leaderlessTimestamp)
2148-
if durationSinceLeaderless >= threshold {
2149-
err := errors.Errorf("have been leaderless for %.2fs, setting the "+
2150-
"leaderless watcher replica's state as unavailable",
2151-
durationSinceLeaderless.Seconds())
2152-
if log.ExpensiveLogEnabled(ctx, 1) {
2153-
log.VEventf(ctx, 1, "%s", err)
2154-
}
2155-
r.LeaderlessWatcher.mu.unavailable = true
2156-
}
2157-
}
2158-
}
2159-
21602118
type snapTruncationInfo struct {
21612119
index kvpb.RaftIndex
21622120
recipientStore roachpb.StoreID

pkg/kv/kvserver/replica_raft_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func TestMaybeMarkReplicaUnavailableInLeaderlessWatcher(t *testing.T) {
481481
repl := tContext.repl
482482
repl.LeaderlessWatcher.mu.unavailable = tc.initReplicaUnavailable
483483
repl.LeaderlessWatcher.mu.leaderlessTimestamp = tc.initLeaderlessTimestamp
484-
repl.maybeMarkReplicaUnavailableInLeaderlessWatcher(ctx, tc.leader, now)
484+
repl.LeaderlessWatcher.refreshUnavailableState(ctx, tc.leader, now, cfg.Settings)
485485
require.Equal(t, tc.expectedUnavailable, repl.LeaderlessWatcher.IsUnavailable())
486486
require.Equal(t, tc.expectedLeaderlessTime, repl.LeaderlessWatcher.mu.leaderlessTimestamp)
487487

0 commit comments

Comments
 (0)