Skip to content

Commit 48b1345

Browse files
craig[bot]pav-kv
andcommitted
Merge #143615
143615: raft: replace MsgStorageApply protocol r=tbg a=pav-kv Raft interaction with storage mainly consists of two protocols: `MsgStorageAppend` and `MsgStorageApply`. This PR focuses on the latter. The protocol is replaced with a more ergonomic / strongly-typed API, which also provides the user with a better / direct control of the apply flow. --- The old flow: 1. If there are unapplied committed entries, `RawNode.Ready()` constructs a `MsgStorageApply` including some/all of these entries. 2. The `MsgStorageApply` is included in `Ready.Messages`. 3. The application filters it out from other messages, and acts on it. 4. When the job is done, the `RawNode` must be notified. The `MsgStorageApply` contains a `MsgStorageApplyResp` in its `Responses` field, which must be stepped back to `RawNode`. Downsides of this approach: - The decision of fetching entries from storage and constructing `MsgStorageApply` is made by raft. This necessitated some form of built-in flow control, but we never ended up using it: #143576. It would be better to have ability to control this flow from outside raft. - `MsgStorageApply` construction and the associated IO happens under `Replica.{raftMu+mu}` during the `Ready()` call. We would like to avoid IO when holding `Replica.mu`: #140235. - The raft messaging system is slightly abused with this approach. All raft messages contain the extra `Responses` [field](https://github.com/cockroachdb/cockroach/blob/acb74317523b0a0849d827a968167a9243e2bb88/pkg/raft/raftpb/raft.proto#L109-L112) that only the two storage APIs use. Most raft messages are external, and don't require order/delivery, while the `MsgStorageAppend/Apply` protocols require strict order. - Storage interaction being encoded in a `raftpb.Message` with many unused fields is simply confusing and not a great API. The preference is to have dedicated types with stronger/narrower semantics. --- The downsides are addressed by the new flow: 1. `RawNode.Ready()` includes a `Ready.Committed LogSpan`, to signify the committed but not applied span of the log. 2. The caller obtains `RawNode.LogSnapshot()` and has the freedom of fetching/applying the entire committed span or a prefix. It also can do so after releasing `Replica.mu`. 3. When applied, the caller notifies using the `RawNode.AckApplied()` method. --- Part of #143652 Related to #124440 Epic: CRDB-46488 Release note: none Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents a0df03c + 6a22902 commit 48b1345

File tree

57 files changed

+923
-995
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+923
-995
lines changed

pkg/kv/kvpb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_library(
3131
"//pkg/col/coldata",
3232
"//pkg/kv/kvnemesis/kvnemesisutil",
3333
"//pkg/kv/kvserver/concurrency/lock",
34+
"//pkg/raft/raftpb",
3435
"//pkg/roachpb",
3536
"//pkg/storage/enginepb",
3637
"//pkg/util/admission/admissionpb",

pkg/kv/kvpb/data.go

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package kvpb
88

99
import (
10+
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
1011
"github.com/cockroachdb/cockroach/pkg/roachpb"
1112
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
1213
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -183,36 +184,6 @@ type LeaseAppliedIndex uint64
183184
// SafeValue implements the redact.SafeValue interface.
184185
func (s LeaseAppliedIndex) SafeValue() {}
185186

186-
// RaftTerm represents the term of a raft message. This corresponds to Term in
187-
// HardState.Term in the Raft library. That type is a uint64, so it is necessary
188-
// to cast to/from that type when dealing with the Raft library, however
189-
// internally RaftTerm is used for all fields in CRDB.
190-
type RaftTerm uint64
191-
192-
// SafeValue implements the redact.SafeValue interface.
193-
func (s RaftTerm) SafeValue() {}
194-
195-
// RaftIndex represents the term of a raft message. This corresponds to Index in
196-
// HardState.Index in the Raft library. That type is a uint64, so it is
197-
// necessary to cast to/from that type when dealing with the Raft library,
198-
// however internally RaftIndex is used for all fields in CRDB.
199-
type RaftIndex uint64
200-
201-
// SafeValue implements the redact.SafeValue interface.
202-
func (s RaftIndex) SafeValue() {}
203-
204-
// RaftSpan represents a (begin, end] span of indices in a raft log. The choice
205-
// of excluding the left bound and including the right bound is deliberate and
206-
// principled. When working with raft logs, it almost always helps to avoid
207-
// off-by-one errors and risk of integer underflow.
208-
type RaftSpan struct {
209-
// After is the left bound of the log indices span. Exclusive.
210-
After RaftIndex
211-
// Last is the right bound of the log indices span. Inclusive.
212-
Last RaftIndex
213-
}
214-
215-
// Contains returns true iff the given index is within the span.
216-
func (s RaftSpan) Contains(index RaftIndex) bool {
217-
return index > s.After && index <= s.Last
218-
}
187+
type RaftTerm = raftpb.Term
188+
type RaftIndex = raftpb.Index
189+
type RaftSpan = raftpb.LogSpan

pkg/kv/kvserver/raft.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,7 @@ func logRaftReady(ctx context.Context, ready raft.Ready) {
166166
fmt.Fprintf(&buf, " New Entry[%d]: %.200s\n",
167167
i, raft.DescribeEntry(e, raftEntryFormatter))
168168
}
169-
for i, e := range ready.CommittedEntries {
170-
fmt.Fprintf(&buf, " Committed Entry[%d]: %.200s\n",
171-
i, raft.DescribeEntry(e, raftEntryFormatter))
172-
}
169+
fmt.Fprintf(&buf, " Committed: %v\n", ready.Committed)
173170
if !raft.IsEmptySnap(ready.Snapshot) {
174171
snap := ready.Snapshot
175172
snap.Data = nil

pkg/kv/kvserver/replica_raft.go

Lines changed: 33 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
947947

948948
var hasReady bool
949949
var outboundMsgs []raftpb.Message
950-
var msgStorageAppend, msgStorageApply raftpb.Message
950+
var msgStorageAppend raftpb.Message
951+
var toApply []raftpb.Entry
951952
rac2ModeToUse := r.replicationAdmissionControlModeToUse(ctx)
952953
// Replication AC v2 state that is initialized while holding Replica.mu.
953954
replicaStateInfoMap := r.raftMu.replicaStateScratchForFlowControl
@@ -977,31 +978,37 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
977978
}
978979
r.mu.currentRACv2Mode = rac2ModeToUse
979980
}
981+
logSnapshot = raftGroup.LogSnapshot()
980982
if hasReady = raftGroup.HasReady(); hasReady {
981-
// Since we are holding raftMu, only this Ready() call will use
983+
// Since we are holding raftMu, only the Slice() call below will use
982984
// raftMu.bytesAccount. It tracks memory usage that this Ready incurs.
983985
r.attachRaftEntriesMonitorRaftMuLocked()
984-
// TODO(pav-kv): currently, Ready() only accounts for entry bytes loaded
986+
// TODO(pav-kv): currently, Slice() only accounts for entry bytes loaded
985987
// from log storage, and ignores the in-memory unstable entries. Pass a
986988
// flow control struct down the stack, and do a more complete accounting
987989
// in raft. This will also eliminate the "side channel" plumbing hack with
988990
// this bytesAccount.
989-
syncRd := raftGroup.Ready()
991+
rd := raftGroup.Ready()
992+
if !rd.Committed.Empty() {
993+
// TODO(pav-kv): do this loading when Replica.mu is released. We don't
994+
// want IO under Replica.mu.
995+
if toApply, err = logSnapshot.Slice(
996+
rd.Committed, r.store.cfg.RaftMaxCommittedSizePerReady,
997+
); err != nil {
998+
return false, err
999+
}
1000+
}
9901001
// We apply committed entries during this handleRaftReady, so it is ok to
9911002
// release the corresponding memory tokens at the end of this func. Next
9921003
// time we enter this function, the account will be empty again.
9931004
defer r.detachRaftEntriesMonitorRaftMuLocked()
9941005

995-
logRaftReady(ctx, syncRd)
996-
asyncRd := makeAsyncReady(syncRd)
997-
outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages)
1006+
logRaftReady(ctx, rd)
1007+
outboundMsgs, msgStorageAppend = splitLocalStorageMsgs(rd.Messages)
9981008
}
9991009
if switchToPullModeAfterReady {
10001010
raftGroup.SetLazyReplication(true)
10011011
}
1002-
if rac2ModeForReady == rac2.MsgAppPull {
1003-
logSnapshot = raftGroup.LogSnapshot()
1004-
}
10051012
raftNodeBasicState = replica_rac2.MakeRaftNodeBasicStateLocked(
10061013
raftGroup, r.shMu.state.Lease.Replica.ReplicaID)
10071014
replica_rac2.MakeReplicaStateInfos(raftGroup, replicaStateInfoMap)
@@ -1020,7 +1027,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10201027
unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
10211028
return unquiesceAndWakeLeader, nil
10221029
})
1023-
r.mu.applyingEntries = hasMsg(msgStorageApply)
1030+
r.mu.applyingEntries = len(toApply) != 0
10241031
pausedFollowers := r.mu.pausedFollowers
10251032
r.mu.Unlock()
10261033
if errors.Is(err, errRemoved) {
@@ -1077,9 +1084,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10771084
// entries and acknowledge as many as we can trivially prove will not be
10781085
// rejected beneath raft.
10791086
//
1080-
// Note that the Entries slice in the MsgStorageApply cannot refer to entries
1081-
// that are also in the Entries slice in the MsgStorageAppend. Raft will not
1082-
// allow unstable entries to be applied.
1087+
// Note that the Ready.Committed span cannot refer to entries that are also in
1088+
// the Entries slice in the MsgStorageAppend. Raft will not allow unstable
1089+
// entries to be applied.
10831090
// TODO(pav-kv): Reconsider if this can be relaxed.
10841091
//
10851092
// If we disable async storage writes in the future, this property will no
@@ -1097,12 +1104,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10971104
sm := r.getStateMachine()
10981105
dec := r.getDecoder()
10991106
var appTask apply.Task
1100-
if hasMsg(msgStorageApply) {
1101-
r.mu.raftTracer.MaybeTraceApplying(msgStorageApply.Entries)
1107+
if len(toApply) != 0 {
1108+
r.mu.raftTracer.MaybeTraceApplying(toApply)
11021109
appTask = apply.MakeTask(sm, dec)
11031110
appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
11041111
defer appTask.Close()
1105-
if err := appTask.Decode(ctx, msgStorageApply.Entries); err != nil {
1112+
if err := appTask.Decode(ctx, toApply); err != nil {
11061113
return stats, err
11071114
}
11081115
if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication {
@@ -1265,8 +1272,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
12651272
}
12661273

12671274
stats.tApplicationBegin = crtime.NowMono()
1268-
if hasMsg(msgStorageApply) {
1269-
r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries")
1275+
if len(toApply) != 0 {
1276+
r.traceEntries(toApply, "committed, before applying any entries")
12701277

12711278
err := appTask.ApplyCommittedEntries(ctx)
12721279
stats.apply = sm.moveStats()
@@ -1302,15 +1309,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
13021309
refreshReason = reasonNewLeaderOrConfigChange
13031310
}
13041311
}
1305-
1306-
r.mu.raftTracer.MaybeTraceApplied(msgStorageApply.Entries)
1307-
// Send MsgStorageApply's responses.
1308-
r.sendRaftMessages(ctx, msgStorageApply.Responses, nil /* blocked */, true /* willDeliverLocal */)
1312+
r.mu.raftTracer.MaybeTraceApplied(toApply)
13091313
}
13101314
stats.tApplicationEnd = crtime.NowMono()
13111315
applicationElapsed := stats.tApplicationEnd.Sub(stats.tApplicationBegin).Nanoseconds()
13121316
r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed)
1313-
r.store.metrics.RaftCommandsApplied.Inc(int64(len(msgStorageApply.Entries)))
1317+
r.store.metrics.RaftCommandsApplied.Inc(int64(len(toApply)))
13141318
if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady {
13151319
refreshReason = reasonNewLeaderOrConfigChange
13161320
}
@@ -1328,6 +1332,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
13281332
r.mu.Lock()
13291333
err = r.withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) {
13301334
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)
1335+
raftGroup.AckApplied(toApply)
13311336

13321337
if stats.apply.numConfChangeEntries > 0 {
13331338
// If the raft leader got removed, campaign on the leaseholder. Uses
@@ -1381,26 +1386,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
13811386
return stats, nil
13821387
}
13831388

1384-
// asyncReady encapsulates the messages that are ready to be sent to other peers
1385-
// or to be sent to local storage routines when async storage writes are enabled.
1386-
// All fields in asyncReady are read-only.
1387-
// TODO(nvanbenschoten): move this into go.etcd.io/raft.
1388-
type asyncReady struct {
1389-
// Messages specifies outbound messages to other peers and to local storage
1390-
// threads. These messages can be sent in any order.
1391-
//
1392-
// If it contains a MsgSnap message, the application MUST report back to raft
1393-
// when the snapshot has been received or has failed by calling ReportSnapshot.
1394-
Messages []raftpb.Message
1395-
}
1396-
1397-
// makeAsyncReady constructs an asyncReady from the provided Ready.
1398-
func makeAsyncReady(rd raft.Ready) asyncReady {
1399-
return asyncReady{
1400-
Messages: rd.Messages,
1401-
}
1402-
}
1403-
14041389
// hasMsg returns whether the provided raftpb.Message is present.
14051390
// It serves as a poor man's Optional[raftpb.Message].
14061391
func hasMsg(m raftpb.Message) bool { return m.Type != 0 }
@@ -1409,30 +1394,25 @@ func hasMsg(m raftpb.Message) bool { return m.Type != 0 }
14091394
// message slice and returns them separately.
14101395
func splitLocalStorageMsgs(
14111396
msgs []raftpb.Message,
1412-
) (otherMsgs []raftpb.Message, msgStorageAppend, msgStorageApply raftpb.Message) {
1397+
) (otherMsgs []raftpb.Message, msgStorageAppend raftpb.Message) {
14131398
for i := len(msgs) - 1; i >= 0; i-- {
14141399
switch msgs[i].Type {
14151400
case raftpb.MsgStorageAppend:
14161401
if hasMsg(msgStorageAppend) {
14171402
panic("two MsgStorageAppend")
14181403
}
14191404
msgStorageAppend = msgs[i]
1420-
case raftpb.MsgStorageApply:
1421-
if hasMsg(msgStorageApply) {
1422-
panic("two MsgStorageApply")
1423-
}
1424-
msgStorageApply = msgs[i]
14251405
default:
14261406
// Local storage messages will always be at the end of the messages slice,
14271407
// so we can terminate iteration as soon as we reach any other message
1428-
// type. This is leaking an implementation detail from etcd/raft which may
1408+
// type. This is leaking an implementation detail from pkg/raft which may
14291409
// not always hold, but while it does, we use it for convenience and
14301410
// assert against it changing in sendRaftMessages.
1431-
return msgs[:i+1], msgStorageAppend, msgStorageApply
1411+
return msgs[:i+1], msgStorageAppend
14321412
}
14331413
}
14341414
// Only local storage messages.
1435-
return nil, msgStorageAppend, msgStorageApply
1415+
return nil, msgStorageAppend
14361416
}
14371417

14381418
// maybeFatalOnRaftReadyErr will fatal if err is neither nil nor
@@ -1899,12 +1879,6 @@ func (r *Replica) sendRaftMessages(
18991879
// Instead, we handle messages to LocalAppendThread inline on the raft
19001880
// scheduler goroutine, so this code path is unused.
19011881
panic("unsupported, currently processed inline on raft scheduler goroutine")
1902-
case raft.LocalApplyThread:
1903-
// To local apply thread.
1904-
// NOTE: we don't currently split apply work off into an async goroutine.
1905-
// Instead, we handle messages to LocalAppendThread inline on the raft
1906-
// scheduler goroutine, so this code path is unused.
1907-
panic("unsupported, currently processed inline on raft scheduler goroutine")
19081882
case raftpb.PeerID(r.ReplicaID()):
19091883
// To local raft state machine, from local storage append and apply work.
19101884
// NOTE: For async Raft log appends, these messages come from calls to

pkg/raft/log.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,6 @@ import (
2929
//
3030
// To access it safely, the user must not mutate the underlying raft log storage
3131
// between when the snapshot is obtained and the reads are done.
32-
//
33-
// TODO(pav-kv): this should be part of the Ready API. Instead of pre-fetching
34-
// entries (e.g. the committed entries subject to state machine application),
35-
// allow the application to read them from LogSnapshot in the Ready handler.
36-
// This gives the application direct control on resource allocation, and
37-
// flexibility to do raft log IO without blocking RawNode operation.
3832
type LogSnapshot struct {
3933
// compacted is the compacted log index.
4034
compacted uint64
@@ -291,23 +285,27 @@ func (l *raftLog) hasNextUnstableEnts() bool {
291285
// appended them to the local raft log yet. If allowUnstable is true, committed
292286
// entries from the unstable log may be returned; otherwise, only entries known
293287
// to reside locally on stable storage will be returned.
288+
//
289+
// TODO(pav-kv): only used in tests. Downgrade to a test helper or remove.
294290
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
295-
lo, hi := l.applying, l.maxAppliableIndex(allowUnstable) // (lo, hi]
296-
if lo >= hi {
297-
// Nothing to apply.
291+
span := l.nextCommittedSpan(allowUnstable)
292+
if span.Empty() {
298293
return nil
299294
}
300-
ents, err := l.slice(lo, hi, l.maxApplyingEntsSize)
295+
ents, err := l.slice(uint64(span.After), uint64(span.Last), l.maxApplyingEntsSize)
301296
if err != nil {
302297
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
303298
}
304299
return ents
305300
}
306301

307-
// hasNextCommittedEnts returns if there is any available entries for execution.
302+
// nextCommittedSpan returns the span of committed entries that can be applied.
308303
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
309-
func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
310-
return l.applying < l.maxAppliableIndex(allowUnstable)
304+
func (l *raftLog) nextCommittedSpan(allowUnstable bool) pb.LogSpan {
305+
return pb.LogSpan{
306+
After: pb.Index(l.applying),
307+
Last: pb.Index(l.maxAppliableIndex(allowUnstable)),
308+
}
311309
}
312310

313311
// maxAppliableIndex returns the maximum committed index that can be applied.
@@ -577,7 +575,15 @@ func (l LogSnapshot) LeadSlice(lo, hi uint64, maxSize uint64) (LeadSlice, error)
577575
}, nil
578576
}
579577

580-
// TODO(pav-kv): return LogSlice.
578+
// Slice returns log entries forming a prefix of the given log span, with the
579+
// total entries size not exceeding maxSize.
580+
//
581+
// Returns at least one entry if the interval contains any. The maxSize can only
582+
// be exceeded if the first entry (span.After+1) is larger.
583+
func (l LogSnapshot) Slice(span pb.LogSpan, maxSize uint64) ([]pb.Entry, error) {
584+
return l.slice(uint64(span.After), uint64(span.Last), entryEncodingSize(maxSize))
585+
}
586+
581587
func (l LogSnapshot) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
582588
if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
583589
return nil, err

0 commit comments

Comments
 (0)