Skip to content

Commit 446c5a1

Browse files
craig[bot] and pav-kv committed
Merge #143705
143705: raft: replace MsgStorageAppend protocol r=tbg a=pav-kv

This PR replaces the `MsgStorageAppend` protocol with a more ergonomic / strongly-typed API.

Benefits of this change:

- 3 fields are removed from `raftpb.Message`, and its in-memory size is decreased from 184 to 144 bytes.
- Storage interaction is completely removed from `raftpb.Message`, and there is a cleaner separation between messages and local storage interaction.
- Improved readability and safety of the `Ready` API.
- Log storage code upstream of `pkg/raft` can use the new `StorageAppend/Ack` types to carry their semantics.

---

The old flow:

1. If there are pending storage writes, `RawNode.Ready()` constructs a `MsgStorageAppend` and puts it in `Ready.Messages`.
2. The application filters it out from other messages, and sends it to local storage.
3. When the write is done, the `RawNode` must be notified, and some messages (such as `MsgAppResp`) must be sent to other peers. The `MsgStorageAppend` contains a list of remote and local messages in `Responses`, which includes a `MsgStorageAppendResp`. The application filters out local messages and steps them into the `RawNode`. It sends all other messages via raft transport.

---

The new flow:

1. If there are pending storage writes, `RawNode.Ready()` constructs a `StorageAppend` struct and embeds it into `Ready`.
2. None of the messages in `Ready.Messages` are subject to durability, so they can be sent immediately without filtering.
3. The application sends `Ready.StorageAppend` to local storage.
4. When the write is done/durable, the application routes the `StorageAppend.Ack()` to the `RawNode.AckAppend(ack)` method to notify the local `RawNode`, and sends all remote messages in `ack.Responses` via the raft transport.

----

Part of #143652, #124440

Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 835f8e6 + 274928c commit 446c5a1

File tree

79 files changed

+1808
-1958
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+1808
-1958
lines changed

pkg/kv/kvserver/client_manual_proposal_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,13 @@ LIMIT
205205

206206
stats := &logstore.AppendStats{}
207207

208-
msgApp := raftpb.Message{
209-
Type: raftpb.MsgStorageAppend,
210-
To: raft.LocalAppendThread,
211-
Term: lastTerm,
212-
LogTerm: lastTerm,
213-
Index: uint64(lastIndex),
208+
app := raft.StorageAppend{
209+
HardState: raftpb.HardState{
210+
Term: lastTerm,
211+
Commit: uint64(lastIndex) + uint64(len(ents)),
212+
},
214213
Entries: ents,
215-
Commit: uint64(lastIndex) + uint64(len(ents)),
214+
LeadTerm: lastTerm,
216215
Responses: []raftpb.Message{{}}, // need >0 responses so StoreEntries will sync
217216
}
218217

@@ -246,7 +245,7 @@ LIMIT
246245
_, err = ls.StoreEntries(ctx, logstore.RaftState{
247246
LastIndex: lastIndex,
248247
LastTerm: kvpb.RaftTerm(lastTerm),
249-
}, logstore.MakeMsgStorageAppend(msgApp), (*wgSyncCallback)(wg), stats)
248+
}, app, (*wgSyncCallback)(wg), stats)
250249
require.NoError(t, err)
251250
wg.Wait()
252251

@@ -259,7 +258,7 @@ LIMIT
259258
type wgSyncCallback sync.WaitGroup
260259

261260
func (w *wgSyncCallback) OnLogSync(
262-
context.Context, logstore.MsgStorageAppendDone, storage.BatchCommitStats,
261+
context.Context, raft.StorageAppendAck, storage.BatchCommitStats,
263262
) {
264263
(*sync.WaitGroup)(w).Done()
265264
}

pkg/kv/kvserver/client_raft_log_queue_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ func TestRaftTracing(t *testing.T) {
205205
// the ordering may change between 1->2 and 1->3. It should be
206206
// sufficient to just check one of them for tracing.
207207
`replica_raft.* 1->2 MsgApp`,
208-
`replica_raft.* AppendThread->1 MsgStorageAppendResp`,
208+
`replica_raft.* appended entries`,
209+
`replica_raft.* synced log storage write at mark`,
209210
`replica_raft.* applying entries`,
210211
`ack-ing replication success to the client`,
211212
}

pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,18 +432,19 @@ type MsgAppSender interface {
432432
func RaftEventFromMsgStorageAppendAndMsgApps(
433433
mode RaftMsgAppMode,
434434
replicaID roachpb.ReplicaID,
435-
appendMsg raftpb.Message,
435+
appendMsg raft.StorageAppend,
436436
outboundMsgs []raftpb.Message,
437437
logSnapshot raft.LogSnapshot,
438438
msgAppScratch map[roachpb.ReplicaID][]raftpb.Message,
439439
replicaStateInfoMap map[roachpb.ReplicaID]ReplicaStateInfo,
440440
) RaftEvent {
441441
event := RaftEvent{
442-
MsgAppMode: mode, LogSnapshot: logSnapshot, ReplicasStateInfo: replicaStateInfoMap}
443-
if appendMsg.Type == raftpb.MsgStorageAppend {
444-
event.Term = appendMsg.LogTerm
445-
event.Snap = appendMsg.Snapshot
446-
event.Entries = appendMsg.Entries
442+
MsgAppMode: mode,
443+
Term: appendMsg.LeadTerm,
444+
Snap: appendMsg.Snapshot,
445+
Entries: appendMsg.Entries,
446+
LogSnapshot: logSnapshot,
447+
ReplicasStateInfo: replicaStateInfoMap,
447448
}
448449
if len(outboundMsgs) == 0 || mode == MsgAppPull {
449450
// MsgAppPull mode can have MsgApps with entries under some cases: (a)

pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,14 +1766,11 @@ func testingFirst(args ...interface{}) interface{} {
17661766
func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) {
17671767
// raftpb.Entry and raftpb.Message are only partially populated below, which
17681768
// could be improved in the future.
1769-
appendMsg := raftpb.Message{
1770-
Type: raftpb.MsgStorageAppend,
1771-
LogTerm: 10,
1769+
appendMsg := raft.StorageAppend{
1770+
LeadTerm: 10,
17721771
Snapshot: &raftpb.Snapshot{},
17731772
Entries: []raftpb.Entry{
1774-
{
1775-
Term: 9,
1776-
},
1773+
{Term: 9},
17771774
},
17781775
}
17791776
outboundMsgs := []raftpb.Message{
@@ -1822,7 +1819,7 @@ func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) {
18221819
checkSnapAndMap(event)
18231820
// Only LogSnapshot and ReplicasStateInfo set.
18241821
event = RaftEventFromMsgStorageAppendAndMsgApps(
1825-
MsgAppPush, 20, raftpb.Message{}, nil, logSnap, msgAppScratch, infoMap)
1822+
MsgAppPush, 20, raft.StorageAppend{}, nil, logSnap, msgAppScratch, infoMap)
18261823
checkSnapAndMap(event)
18271824
event.LogSnapshot = raft.LogSnapshot{}
18281825
event.ReplicasStateInfo = nil

pkg/kv/kvserver/logstore/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ go_test(
6161
"//pkg/kv/kvserver/kvserverpb",
6262
"//pkg/kv/kvserver/raftentry",
6363
"//pkg/kv/kvserver/raftlog",
64+
"//pkg/raft",
6465
"//pkg/raft/raftpb",
6566
"//pkg/roachpb",
6667
"//pkg/settings/cluster",

pkg/kv/kvserver/logstore/logstore.go

Lines changed: 8 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package logstore
88

99
import (
1010
"context"
11-
"fmt"
1211
"math"
1312
"math/rand"
1413
"slices"
@@ -78,70 +77,6 @@ var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting(
7877
var raftLogTruncationClearRangeThreshold = kvpb.RaftIndex(metamorphic.ConstantWithTestRange(
7978
"raft-log-truncation-clearrange-threshold", 100000 /* default */, 1 /* min */, 1e6 /* max */))
8079

81-
// MsgStorageAppend is a raftpb.Message with type MsgStorageAppend.
82-
type MsgStorageAppend raftpb.Message
83-
84-
// MakeMsgStorageAppend constructs a MsgStorageAppend from a raftpb.Message.
85-
func MakeMsgStorageAppend(m raftpb.Message) MsgStorageAppend {
86-
if m.Type != raftpb.MsgStorageAppend {
87-
panic(fmt.Sprintf("unexpected message type %s", m.Type))
88-
}
89-
return MsgStorageAppend(m)
90-
}
91-
92-
// HardState returns the hard state assembled from the message.
93-
func (m *MsgStorageAppend) HardState() raftpb.HardState {
94-
return raftpb.HardState{
95-
Term: m.Term,
96-
Vote: m.Vote,
97-
Commit: m.Commit,
98-
Lead: m.Lead,
99-
LeadEpoch: m.LeadEpoch,
100-
}
101-
}
102-
103-
// MustSync returns true if this storage write must be synced.
104-
func (m *MsgStorageAppend) MustSync() bool {
105-
return len(m.Responses) != 0
106-
}
107-
108-
// OnDone returns the storage write post-processing information.
109-
func (m *MsgStorageAppend) OnDone() MsgStorageAppendDone { return m.Responses }
110-
111-
// MsgStorageAppendDone encapsulates the actions to do after MsgStorageAppend is
112-
// done, such as sending messages back to raft node and its peers.
113-
type MsgStorageAppendDone []raftpb.Message
114-
115-
// Responses returns the messages to send after the write/sync is completed.
116-
func (m MsgStorageAppendDone) Responses() []raftpb.Message { return m }
117-
118-
// Mark returns the LogMark of the raft log in storage after the write/sync is
119-
// completed. Returns zero value if the write does not update the log mark.
120-
func (m MsgStorageAppendDone) Mark() raft.LogMark {
121-
if len(m) == 0 {
122-
return raft.LogMark{}
123-
}
124-
// Optimization: the MsgStorageAppendResp message, if any, is always the last
125-
// one in the list.
126-
// TODO(pav-kv): this is an undocumented API quirk. Refactor the raft write
127-
// API to be more digestible outside the package.
128-
if buildutil.CrdbTestBuild {
129-
for _, msg := range m[:len(m)-1] {
130-
if msg.Type == raftpb.MsgStorageAppendResp {
131-
panic("unexpected MsgStorageAppendResp not in last position")
132-
}
133-
}
134-
}
135-
if msg := m[len(m)-1]; msg.Type != raftpb.MsgStorageAppendResp {
136-
return raft.LogMark{}
137-
} else if msg.Index != 0 {
138-
return raft.LogMark{Term: msg.LogTerm, Index: msg.Index}
139-
} else if msg.Snapshot != nil {
140-
return raft.LogMark{Term: msg.LogTerm, Index: msg.Snapshot.Metadata.Index}
141-
}
142-
return raft.LogMark{}
143-
}
144-
14580
// RaftState stores information about the last entry and the size of the log.
14681
type RaftState struct {
14782
LastIndex kvpb.RaftIndex
@@ -198,7 +133,7 @@ type LogStore struct {
198133
//
199134
// commitStats is populated iff this was a non-blocking sync.
200135
type SyncCallback interface {
201-
OnLogSync(context.Context, MsgStorageAppendDone, storage.BatchCommitStats)
136+
OnLogSync(context.Context, raft.StorageAppendAck, storage.BatchCommitStats)
202137
}
203138

204139
func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
@@ -218,18 +153,18 @@ func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
218153
// Accepts the state of the log before the operation, returns the state after.
219154
// Persists HardState atomically with, or strictly after Entries.
220155
func (s *LogStore) StoreEntries(
221-
ctx context.Context, state RaftState, m MsgStorageAppend, cb SyncCallback, stats *AppendStats,
156+
ctx context.Context, state RaftState, app raft.StorageAppend, cb SyncCallback, stats *AppendStats,
222157
) (RaftState, error) {
223158
batch := newStoreEntriesBatch(s.Engine)
224-
return s.storeEntriesAndCommitBatch(ctx, state, m, cb, stats, batch)
159+
return s.storeEntriesAndCommitBatch(ctx, state, app, cb, stats, batch)
225160
}
226161

227162
// storeEntriesAndCommitBatch is like StoreEntries, but it accepts a
228163
// storage.Batch, which it takes responsibility for committing and closing.
229164
func (s *LogStore) storeEntriesAndCommitBatch(
230165
ctx context.Context,
231166
state RaftState,
232-
m MsgStorageAppend,
167+
m raft.StorageAppend,
233168
cb SyncCallback,
234169
stats *AppendStats,
235170
batch storage.Batch,
@@ -270,7 +205,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
270205
stats.End = crtime.NowMono()
271206
}
272207

273-
if hs := m.HardState(); !raft.IsEmptyHardState(hs) {
208+
if hs := m.HardState; !raft.IsEmptyHardState(hs) {
274209
// NB: Note that without additional safeguards, it's incorrect to write
275210
// the HardState before appending m.Entries. When catching up, a follower
276211
// will receive Entries that are immediately Committed in the same
@@ -336,7 +271,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
336271
*waiterCallback = nonBlockingSyncWaiterCallback{
337272
ctx: ctx,
338273
cb: cb,
339-
onDone: m.OnDone(),
274+
onDone: m.Ack(),
340275
batch: batch,
341276
metrics: s.Metrics,
342277
logCommitBegin: stats.PebbleBegin,
@@ -354,7 +289,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
354289
if wantsSync {
355290
logCommitEnd := stats.PebbleEnd
356291
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
357-
cb.OnLogSync(ctx, m.OnDone(), storage.BatchCommitStats{})
292+
cb.OnLogSync(ctx, m.Ack(), storage.BatchCommitStats{})
358293
}
359294
}
360295
stats.Sync = wantsSync
@@ -406,7 +341,7 @@ type nonBlockingSyncWaiterCallback struct {
406341
// Used to run SyncCallback.
407342
ctx context.Context
408343
cb SyncCallback
409-
onDone MsgStorageAppendDone
344+
onDone raft.StorageAppendAck
410345
// Used to extract stats. This is the batch that has been synced.
411346
batch storage.WriteBatch
412347
// Used to record Metrics.

pkg/kv/kvserver/logstore/logstore_bench_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
17+
"github.com/cockroachdb/cockroach/pkg/raft"
1718
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
1819
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -33,7 +34,10 @@ func (b *discardBatch) Commit(bool) error {
3334

3435
type noopSyncCallback struct{}
3536

36-
func (noopSyncCallback) OnLogSync(context.Context, MsgStorageAppendDone, storage.BatchCommitStats) {}
37+
func (noopSyncCallback) OnLogSync(
38+
context.Context, raft.StorageAppendAck, storage.BatchCommitStats,
39+
) {
40+
}
3741

3842
func BenchmarkLogStore_StoreEntries(b *testing.B) {
3943
defer log.Scope(b).Close(b)
@@ -96,7 +100,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
96100
batch := &discardBatch{}
97101
for i := 0; i < b.N; i++ {
98102
batch.Batch = newStoreEntriesBatch(eng)
99-
m := MsgStorageAppend{Entries: ents}
103+
m := raft.StorageAppend{Entries: ents}
100104
cb := noopSyncCallback{}
101105
var err error
102106
rs, err = s.storeEntriesAndCommitBatch(ctx, rs, m, cb, stats, batch)

pkg/kv/kvserver/raft.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ func logRaftReady(ctx context.Context, ready raft.Ready) {
167167
i, raft.DescribeEntry(e, raftEntryFormatter))
168168
}
169169
fmt.Fprintf(&buf, " Committed: %v\n", ready.Committed)
170-
if !raft.IsEmptySnap(ready.Snapshot) {
171-
snap := ready.Snapshot
170+
if ready.Snapshot != nil {
171+
snap := *ready.Snapshot
172172
snap.Data = nil
173173
fmt.Fprintf(&buf, " Snapshot updated: %v\n", snap)
174174
}

pkg/kv/kvserver/rafttrace/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go_test(
2828
deps = [
2929
"//pkg/kv/kvpb",
3030
"//pkg/kv/kvserver/kvserverpb",
31+
"//pkg/raft",
3132
"//pkg/raft/raftpb",
3233
"//pkg/settings/cluster",
3334
"//pkg/testutils",

0 commit comments

Comments
 (0)