Skip to content

Commit 0cbfd4a

Browse files
craig[bot]pav-kv
andcommitted
Merge #143689
143689: kvserver: load committed entries after mu.Unlock() r=tbg a=pav-kv This commit makes `handleRaftReadyRaftMuLocked` load the `Ready.Committed` entries after releasing `Replica.mu`, while still holding `Replica.raftMu`. This ensures that we don't have this IO under `Replica.mu`. Also, this storage interaction is placed after `r.sendRaftMessages`, which reduces messaging latency. This also fixes a bug. Previously, `detachRaftEntriesMonitorRaftMuLocked` would be called too soon, after `Ready` has been generated. Instead, it should span the entire `Ready` handling scope, so that includes the scope of applying the entries to the state machine. Part of #140235 Related to #143652, #143615 Obsoletes #125842 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 9295740 + a630095 commit 0cbfd4a

File tree

3 files changed

+40
-32
lines changed

3 files changed

+40
-32
lines changed

pkg/kv/kvserver/logstore/logstore.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,9 @@ func LoadTerm(
680680
// which is a serious issue. But if the caller is unsure, they can check the
681681
// LastIndex to distinguish.
682682
//
683+
// The bytesAccount is used to account for and limit the loaded bytes. It can be
684+
// nil when the accounting / limiting is not needed.
685+
//
683686
// TODO(#132114): eliminate both ErrCompacted and ErrUnavailable.
684687
// TODO(pavelkalinnikov): return all entries we've read, consider maxSize a
685688
// target size. Currently we may read one extra entry and drop it.

pkg/kv/kvserver/replica_raft.go

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -946,14 +946,13 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
946946
}
947947

948948
var hasReady bool
949-
var outboundMsgs []raftpb.Message
950-
var msgStorageAppend raftpb.Message
951-
var toApply []raftpb.Entry
949+
var ready raft.Ready
950+
var logSnapshot raft.LogSnapshot
951+
952952
rac2ModeToUse := r.replicationAdmissionControlModeToUse(ctx)
953953
// Replication AC v2 state that is initialized while holding Replica.mu.
954954
replicaStateInfoMap := r.raftMu.replicaStateScratchForFlowControl
955955
var raftNodeBasicState replica_rac2.RaftNodeBasicState
956-
var logSnapshot raft.LogSnapshot
957956

958957
rac2ModeForReady := r.shMu.currentRACv2Mode
959958
leaderID := r.shMu.leaderID
@@ -981,31 +980,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
981980
}
982981
logSnapshot = raftGroup.LogSnapshot()
983982
if hasReady = raftGroup.HasReady(); hasReady {
984-
// Since we are holding raftMu, only the Slice() call below will use
985-
// raftMu.bytesAccount. It tracks memory usage that this Ready incurs.
986-
r.attachRaftEntriesMonitorRaftMuLocked()
987-
// TODO(pav-kv): currently, Slice() only accounts for entry bytes loaded
988-
// from log storage, and ignores the in-memory unstable entries. Pass a
989-
// flow control struct down the stack, and do a more complete accounting
990-
// in raft. This will also eliminate the "side channel" plumbing hack with
991-
// this bytesAccount.
992-
rd := raftGroup.Ready()
993-
if !rd.Committed.Empty() {
994-
// TODO(pav-kv): do this loading when Replica.mu is released. We don't
995-
// want IO under Replica.mu.
996-
if toApply, err = logSnapshot.Slice(
997-
rd.Committed, r.store.cfg.RaftMaxCommittedSizePerReady,
998-
); err != nil {
999-
return false, err
1000-
}
1001-
}
1002-
// We apply committed entries during this handleRaftReady, so it is ok to
1003-
// release the corresponding memory tokens at the end of this func. Next
1004-
// time we enter this function, the account will be empty again.
1005-
defer r.detachRaftEntriesMonitorRaftMuLocked()
1006-
1007-
logRaftReady(ctx, rd)
1008-
outboundMsgs, msgStorageAppend = splitLocalStorageMsgs(rd.Messages)
983+
ready = raftGroup.Ready()
1009984
}
1010985
if switchToPullModeAfterReady {
1011986
raftGroup.SetLazyReplication(true)
@@ -1028,7 +1003,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10281003
unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
10291004
return unquiesceAndWakeLeader, nil
10301005
})
1031-
r.mu.applyingEntries = len(toApply) != 0
1006+
r.mu.applyingEntries = !ready.Committed.Empty()
10321007
pausedFollowers := r.mu.pausedFollowers
10331008
r.mu.Unlock()
10341009
if errors.Is(err, errRemoved) {
@@ -1037,6 +1012,13 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10371012
} else if err != nil {
10381013
return stats, errors.Wrap(err, "checking raft group for Ready")
10391014
}
1015+
1016+
var outboundMsgs []raftpb.Message
1017+
var msgStorageAppend raftpb.Message
1018+
if hasReady {
1019+
logRaftReady(ctx, ready)
1020+
outboundMsgs, msgStorageAppend = splitLocalStorageMsgs(ready.Messages)
1021+
}
10401022
// Even if we don't have a Ready, or entries in Ready,
10411023
// replica_rac2.Processor may need to do some work.
10421024
raftEvent := rac2.RaftEventFromMsgStorageAppendAndMsgApps(
@@ -1074,6 +1056,30 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10741056
r.traceMessageSends(outboundMsgs, "sending messages")
10751057
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers, true /* willDeliverLocal */)
10761058

1059+
// Load the committed entries to be applied after releasing Replica.mu, to
1060+
// ensure that we don't have IO under this narrow/lightweight mutex. The
1061+
// RawNode can be making progress in the meantime, but it will never overwrite
1062+
// the committed entries it has been observing during the Ready() call.
1063+
//
1064+
// Also, do this loading after r.sendRaftMessages so that the outgoing
1065+
// messages don't need to wait for the storage interaction.
1066+
var toApply []raftpb.Entry
1067+
if !ready.Committed.Empty() {
1068+
// TODO(pav-kv): currently, Slice() only accounts for entry bytes loaded
1069+
// from log storage, and ignores the in-memory unstable entries. Consider a
1070+
// more complete flow control mechanism here, and eliminating the plumbing
1071+
// hack with the bytesAccount.
1072+
r.attachRaftEntriesMonitorRaftMuLocked()
1073+
// We apply committed entries during this handleRaftReady, so it is ok to
1074+
// release the corresponding memory tokens at the end of this func. Next
1075+
// time we enter this function, the account will be empty again.
1076+
defer r.detachRaftEntriesMonitorRaftMuLocked()
1077+
if toApply, err = logSnapshot.Slice(
1078+
ready.Committed, r.store.cfg.RaftMaxCommittedSizePerReady,
1079+
); err != nil {
1080+
return stats, errors.Wrap(err, "loading committed entries")
1081+
}
1082+
}
10771083
// If the ready struct includes entries that have been committed, these
10781084
// entries will be applied to the Replica's replicated state machine down
10791085
// below, after appending new entries to the raft log and sending messages

pkg/kv/kvserver/replica_raftlog.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,11 @@ func (r *replicaLogStorage) entriesLocked(
7777
//
7878
// TODO(pav-kv): we need better safety guardrails here. The log storage type
7979
// can remember the readable bounds, and assert that reads do not cross them.
80-
// TODO(pav-kv): r.raftMu.bytesAccount is broken - can't rely on raftMu here.
8180
entries, _, loadedSize, err := logstore.LoadEntries(
8281
r.AnnotateCtx(context.TODO()),
8382
r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
8483
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes,
85-
&r.raftMu.bytesAccount,
84+
nil, // bytesAccount is not used when reading under Replica.mu
8685
)
8786
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
8887
return entries, err

0 commit comments

Comments
 (0)