Skip to content

Commit c7aed80

Browse files
craig[bot] and tbg committed
106574: kvserver: avoid leaked replica mutex during delegated snapshot r=erikgrinaker a=tbg Prompted by cockroachdb#106568, I went through all callers to `repl.mu.Lock` and `repl.mu.RLock` trying to find places where we're leaking the mutex on certain return paths. I found some, including one that at least seems to possibly explain what we saw in cockroachdb#106568. I also looked into the possibility of panics under a held lock being recovered from at higher levels in the stack, in particular this code: https://github.com/cockroachdb/cockroach/blob/81569be4aeb61620d2fd51c41b416f7d0986698e/pkg/sql/colexecerror/error.go#L27-L30 However, it seems sufficiently restricted to recovering only from errors it knows to be safe to recover from, and in particular it doesn't look like it will ever recover from any panic in kvserver. I will say that this looks a little brittle, though I don't see a much better way of doing what this code is trying to achieve. Release note: None Epic: None Co-authored-by: Tobias Grieger <[email protected]>
2 parents 708e6f0 + e3c0767 commit c7aed80

File tree

6 files changed

+37
-6
lines changed

6 files changed

+37
-6
lines changed

pkg/kv/kvserver/replica.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,34 @@ type raftSparseStatus struct {
181181
Progress map[uint64]tracker.Progress
182182
}
183183

184+
// ReplicaMutex is an RWMutex. It has its own type to make it easier to look for
185+
// usages specific to the replica mutex.
186+
type ReplicaMutex syncutil.RWMutex
187+
188+
func (mu *ReplicaMutex) Lock() {
189+
(*syncutil.RWMutex)(mu).Lock()
190+
}
191+
192+
func (mu *ReplicaMutex) Unlock() {
193+
(*syncutil.RWMutex)(mu).Unlock()
194+
}
195+
196+
func (mu *ReplicaMutex) RLock() {
197+
(*syncutil.RWMutex)(mu).RLock()
198+
}
199+
200+
func (mu *ReplicaMutex) AssertHeld() {
201+
(*syncutil.RWMutex)(mu).AssertHeld()
202+
}
203+
204+
func (mu *ReplicaMutex) AssertRHeld() {
205+
(*syncutil.RWMutex)(mu).AssertRHeld()
206+
}
207+
208+
func (mu *ReplicaMutex) RUnlock() {
209+
(*syncutil.RWMutex)(mu).RUnlock()
210+
}
211+
184212
// A Replica is a contiguous keyspace with writes managed via an
185213
// instance of the Raft consensus algorithm. Many ranges may exist
186214
// in a store and they are unlikely to be contiguous. Ranges are
@@ -386,7 +414,7 @@ type Replica struct {
386414

387415
mu struct {
388416
// Protects all fields in the mu struct.
389-
syncutil.RWMutex
417+
ReplicaMutex
390418
// The destroyed status of a replica indicating if it's alive, corrupt,
391419
// scheduled for destruction or has been GCed.
392420
// destroyStatus should only be set while also holding the raftMu and

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
580580
existingClosed := r.mu.state.RaftClosedTimestamp
581581
newClosed := b.state.RaftClosedTimestamp
582582
if !newClosed.IsEmpty() && newClosed.Less(existingClosed) && raftClosedTimestampAssertionsEnabled {
583+
r.mu.Unlock()
583584
return errors.AssertionFailedf(
584585
"raft closed timestamp regression; replica has: %s, new batch has: %s.",
585586
existingClosed.String(), newClosed.String())

pkg/kv/kvserver/replica_command.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2953,15 +2953,15 @@ func (r *Replica) validateSnapshotDelegationRequest(
29532953
// that is also needs a snapshot, then any snapshot it sends will be useless.
29542954
r.mu.RLock()
29552955
replIdx := r.mu.state.RaftAppliedIndex + 1
2956-
29572956
status := r.raftStatusRLocked()
2957+
r.mu.RUnlock()
2958+
29582959
if status == nil {
29592960
// This code path is sometimes hit during scatter for replicas that
29602961
// haven't woken up yet.
29612962
return errors.Errorf("raft status not initialized")
29622963
}
29632964
replTerm := kvpb.RaftTerm(status.Term)
2964-
r.mu.RUnlock()
29652965

29662966
// Delegate has a lower term than the coordinator. This typically means the
29672967
// lease has been transferred, and we should not process this request. There

pkg/kv/kvserver/replica_proposal_buf.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,11 +1277,11 @@ type replicaProposer Replica
12771277
var _ proposer = &replicaProposer{}
12781278

12791279
func (rp *replicaProposer) locker() sync.Locker {
1280-
return &rp.mu.RWMutex
1280+
return &rp.mu.ReplicaMutex
12811281
}
12821282

12831283
func (rp *replicaProposer) rlocker() sync.Locker {
1284-
return rp.mu.RWMutex.RLocker()
1284+
return &rp.mu.ReplicaMutex
12851285
}
12861286

12871287
func (rp *replicaProposer) getReplicaID() roachpb.ReplicaID {

pkg/kv/kvserver/replica_raftstorage.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,8 +642,10 @@ func (r *Replica) applySnapshot(
642642
if isInitialSnap {
643643
// NB: this will also call setDescLockedRaftMuLocked.
644644
if err := r.initFromSnapshotLockedRaftMuLocked(ctx, desc); err != nil {
645+
r.mu.Unlock()
645646
log.Fatalf(ctx, "unable to initialize replica while applying snapshot: %+v", err)
646647
} else if err := r.store.markReplicaInitializedLockedReplLocked(ctx, r); err != nil {
648+
r.mu.Unlock()
647649
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
648650
}
649651
} else {

pkg/kv/kvserver/stores_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (s *baseStore) SetQueueActive(active bool, queue string) error {
9999
func (s *baseStore) GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex {
100100
store := (*Store)(s)
101101
if repl := store.GetReplicaIfExists(rangeID); repl != nil {
102-
return &repl.mu.RWMutex
102+
return (*syncutil.RWMutex)(&repl.mu.ReplicaMutex)
103103
}
104104
return nil
105105
}

0 commit comments

Comments (0)