Skip to content

Commit 36834ab

Browse files
committed
raft: remove AsyncStorageWrites flag
Epic: none Release note: none
1 parent 82deded commit 36834ab

File tree

10 files changed

+32
-42
lines changed

10 files changed

+32
-42
lines changed

pkg/kv/kvserver/replica_raft.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,9 +1079,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
10791079
//
10801080
// Note that the Entries slice in the MsgStorageApply cannot refer to entries
10811081
// that are also in the Entries slice in the MsgStorageAppend. Raft will not
1082-
// allow unstable entries to be applied when AsyncStorageWrites is enabled.
1082+
// allow unstable entries to be applied.
1083+
// TODO(pav-kv): Reconsider if this can be relaxed.
10831084
//
1084-
// If we disable AsyncStorageWrites in the future, this property will no
1085+
// If we disable async storage writes in the future, this property will no
10851086
// longer be true, and the two slices could overlap. For example, this can
10861087
// happen when a follower is being caught up on committed commands. We could
10871088
// acknowledge these commands early even though they aren't durably in the

pkg/kv/kvserver/store.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,6 @@ func newRaftConfig(
405405
return &raft.Config{
406406
ID: id,
407407
Applied: uint64(appliedIndex),
408-
AsyncStorageWrites: true,
409408
ElectionTick: storeCfg.RaftElectionTimeoutTicks,
410409
ElectionJitterTick: storeCfg.RaftElectionTimeoutJitterTicks,
411410
HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks,

pkg/raft/node.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ type Ready struct {
9797

9898
// MustSync indicates whether the HardState and Entries must be durably
9999
// written to disk or if a non-durable write is permissible.
100+
//
101+
// TODO(pav-kv): This flag isn't used at the moment, and the user code
102+
// determines MustSync from the content of Messages. Make the API explicit.
100103
MustSync bool
101104
}
102105

pkg/raft/node_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ func TestNodeStart(t *testing.T) {
297297
ElectionJitterTick: 10,
298298
HeartbeatTick: 1,
299299
Storage: storage,
300-
AsyncStorageWrites: true,
301300
MaxSizePerMsg: noLimit,
302301
MaxInflightMsgs: 256,
303302
StoreLiveness: raftstoreliveness.AlwaysLive{},
@@ -369,7 +368,6 @@ func TestNodeRestart(t *testing.T) {
369368
ElectionJitterTick: 10,
370369
HeartbeatTick: 1,
371370
Storage: storage,
372-
AsyncStorageWrites: true,
373371
MaxSizePerMsg: noLimit,
374372
MaxInflightMsgs: 256,
375373
StoreLiveness: raftstoreliveness.AlwaysLive{},
@@ -422,7 +420,6 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
422420
ElectionJitterTick: 10,
423421
HeartbeatTick: 1,
424422
Storage: s,
425-
AsyncStorageWrites: true,
426423
MaxSizePerMsg: noLimit,
427424
MaxInflightMsgs: 256,
428425
StoreLiveness: raftstoreliveness.AlwaysLive{},
@@ -449,7 +446,6 @@ func TestNodeAdvance(t *testing.T) {
449446
ElectionJitterTick: 10,
450447
HeartbeatTick: 1,
451448
Storage: storage,
452-
AsyncStorageWrites: true,
453449
MaxSizePerMsg: noLimit,
454450
MaxInflightMsgs: 256,
455451
StoreLiveness: raftstoreliveness.AlwaysLive{},
@@ -554,7 +550,6 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
554550
func TestAppendPagination(t *testing.T) {
555551
const maxSizePerMsg = 2048
556552
n := newNetworkWithConfig(func(c *Config) {
557-
c.AsyncStorageWrites = true
558553
c.MaxSizePerMsg = maxSizePerMsg
559554
}, nil, nil, nil)
560555

pkg/raft/raft.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ const (
4646
None pb.PeerID = 0
4747
// LocalAppendThread is a reference to a local thread that saves unstable
4848
// log entries and snapshots to stable storage. The identifier is used as a
49-
// target for MsgStorageAppend messages when AsyncStorageWrites is enabled.
49+
// target for MsgStorageAppend messages.
5050
LocalAppendThread pb.PeerID = math.MaxUint64
5151
// LocalApplyThread is a reference to a local thread that applies committed
5252
// log entries to the local state machine. The identifier is used as a
53-
// target for MsgStorageApply messages when AsyncStorageWrites is enabled.
53+
// target for MsgStorageApply messages.
5454
LocalApplyThread pb.PeerID = math.MaxUint64 - 1
5555
)
5656

@@ -159,9 +159,9 @@ type Config struct {
159159
// threads are not responsible for understanding the response messages, only
160160
// for delivering them to the correct target after performing the storage
161161
// write.
162-
// TODO(#129411): deprecate !AsyncStorageWrites mode as it's not used in
163-
// CRDB.
164-
AsyncStorageWrites bool
162+
// TODO(pav-kv): this comment is a remnant of the AsyncStorageWrites option,
163+
// which is now implicitly always true. Move the comment to a better place.
164+
165165
// LazyReplication instructs raft to hold off constructing MsgApp messages
166166
// eagerly in reaction to Step() calls.
167167
//
@@ -185,7 +185,8 @@ type Config struct {
185185
//
186186
// Despite its name (preserved for compatibility), this quota applies across
187187
// Ready structs to encompass all outstanding entries in unacknowledged
188-
// MsgStorageApply messages when AsyncStorageWrites is enabled.
188+
// MsgStorageApply messages.
189+
// TODO(pav-kv): make the name better.
189190
MaxCommittedSizePerReady uint64
190191
// MaxUncommittedEntriesSize limits the aggregate byte size of the
191192
// uncommitted entries that may be appended to a leader's log. Once this
@@ -558,13 +559,11 @@ func (r *raft) hardState() pb.HardState {
558559
// next Ready handling cycle, except in one condition below.
559560
//
560561
// Certain message types are scheduled for being sent *after* the unstable state
561-
// is durably persisted in storage. If AsyncStorageWrites config flag is true,
562-
// the responsibility of upholding this condition is on the application, so the
563-
// message will be handed over via the next Ready as usually; if false, the
564-
// message will skip one Ready handling cycle, and will be sent after the
565-
// application has persisted the state.
566-
//
567-
// TODO(pav-kv): remove this special case after !AsyncStorageWrites is removed.
562+
// is durably persisted in storage. These messages are nevertheless included in
563+
// Ready.Messages, and the responsibility of upholding this condition is on the
564+
// application.
565+
// TODO(pav-kv): make this requirement explicit in the API, instead of mixing
566+
// the two kinds of messages together.
568567
func (r *raft) send(m pb.Message) {
569568
if m.From == None {
570569
m.From = r.id

pkg/raft/raft_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5455,7 +5455,6 @@ func newTestConfig(
54555455
ElectionJitterTick: election,
54565456
HeartbeatTick: heartbeat,
54575457
Storage: storage,
5458-
AsyncStorageWrites: true,
54595458
MaxSizePerMsg: noLimit,
54605459
MaxInflightMsgs: 256,
54615460
StoreLiveness: storeLiveness,

pkg/raft/rafttest/interaction_env_handler_add_nodes.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
3535
n := firstAsInt(t, d)
3636
var snap pb.Snapshot
3737
cfg := raftConfigStub()
38+
// NB: the datadriven tests use the async storage API, but have an option to
39+
// sync writes immediately in imitation of the previous synchronous API.
40+
var asyncWrites bool
41+
3842
for _, arg := range d.CmdArgs[1:] {
3943
for i := range arg.Vals {
4044
switch arg.Key {
@@ -56,7 +60,7 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
5660
case "content":
5761
arg.Scan(t, i, &snap.Data)
5862
case "async-storage-writes":
59-
arg.Scan(t, i, &cfg.AsyncStorageWrites)
63+
arg.Scan(t, i, &asyncWrites)
6064
case "lazy-replication":
6165
arg.Scan(t, i, &cfg.LazyReplication)
6266
case "prevote":
@@ -81,7 +85,7 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
8185
}
8286
}
8387
}
84-
return env.AddNodes(n, cfg, snap)
88+
return env.AddNodes(n, cfg, snap, asyncWrites)
8589
}
8690

8791
type snapOverrideStorage struct {
@@ -100,7 +104,9 @@ var _ raft.Storage = snapOverrideStorage{}
100104

101105
// AddNodes adds n new nodes initialized from the given snapshot (which may be
102106
// empty), and using the cfg as template. They will be assigned consecutive IDs.
103-
func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) error {
107+
func (env *InteractionEnv) AddNodes(
108+
n int, cfg raft.Config, snap pb.Snapshot, asyncWrites bool,
109+
) error {
104110
bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{})
105111
for i := 0; i < n; i++ {
106112
id := pb.PeerID(1 + len(env.Nodes))
@@ -144,11 +150,6 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
144150
cfg.CRDBVersion = cluster.MakeTestingClusterSettings().Version
145151
}
146152

147-
// NB: the datadriven tests use the async storage API, but have an option to
148-
// sync writes immediately in imitation of the previous synchronous API.
149-
asyncWrites := cfg.AsyncStorageWrites
150-
cfg.AsyncStorageWrites = true
151-
152153
cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)
153154

154155
cfg.Metrics = raft.NewMetrics()

pkg/raft/rafttest/interaction_env_handler_process_ready.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ func (env *InteractionEnv) ProcessReady(idx int) error {
7171
return nil
7272
}
7373

74-
if !n.Config.AsyncStorageWrites {
75-
panic("expected AsyncStorageWrites")
76-
}
7774
env.Output.WriteString(raft.DescribeReady(rd, defaultEntryFormatter))
7875

7976
for _, m := range rd.Messages {

pkg/raft/rafttest/node.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ func startNode(id raftpb.PeerID, peers []raft.Peer, iface iface) *node {
5353
ElectionJitterTick: 50,
5454
HeartbeatTick: 1,
5555
Storage: st,
56-
AsyncStorageWrites: true,
5756
MaxSizePerMsg: 1024 * 1024,
5857
MaxInflightMsgs: 256,
5958
MaxUncommittedEntriesSize: 1 << 30,
@@ -216,7 +215,6 @@ func (n *node) restart() {
216215
ElectionJitterTick: 10,
217216
HeartbeatTick: 1,
218217
Storage: n.storage,
219-
AsyncStorageWrites: true,
220218
MaxSizePerMsg: 1024 * 1024,
221219
MaxInflightMsgs: 256,
222220
MaxUncommittedEntriesSize: 1 << 30,

pkg/raft/rawnode.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@ func NewRawNode(config *Config) (*RawNode, error) {
5252
rn := &RawNode{
5353
raft: r,
5454
}
55-
if !config.AsyncStorageWrites {
56-
panic("synchronous storage writes are no longer supported")
57-
}
5855
ss := r.softState()
5956
rn.prevSoftSt = &ss
6057
rn.prevHardSt = r.hardState()
@@ -267,7 +264,9 @@ func (rn *RawNode) Ready() Ready {
267264

268265
// MustSync returns true if the hard state and count of Raft entries indicate
269266
// that a synchronous write to persistent storage is required.
270-
// NOTE: MustSync isn't used under AsyncStorageWrites mode.
267+
//
268+
// TODO(pav-kv): MustSync isn't used, because all writes are asynchronous.
269+
// Remove this, or repurpose it to fit the asynchronous writes API.
271270
func MustSync(st, prevst pb.HardState, entsnum int) bool {
272271
// Persistent state on all servers:
273272
// (Updated on stable storage before responding to RPCs)
@@ -299,8 +298,7 @@ func needStorageAppendRespMsg(rd Ready) bool {
299298
// newStorageAppendMsg creates the message that should be sent to the local
300299
// append thread to instruct it to append log entries, write an updated hard
301300
// state, and apply a snapshot. The message also carries a set of responses
302-
// that should be delivered after the rest of the message is processed. Used
303-
// with AsyncStorageWrites.
301+
// that should be delivered after the rest of the message is processed.
304302
func newStorageAppendMsg(r *raft, rd Ready) pb.Message {
305303
m := pb.Message{
306304
Type: pb.MsgStorageAppend,
@@ -429,7 +427,7 @@ func needStorageApplyMsg(rd Ready) bool { return len(rd.CommittedEntries) > 0 }
429427
// newStorageApplyMsg creates the message that should be sent to the local
430428
// apply thread to instruct it to apply committed log entries. The message
431429
// also carries a response that should be delivered after the rest of the
432-
// message is processed. Used with AsyncStorageWrites.
430+
// message is processed.
433431
func newStorageApplyMsg(r *raft, rd Ready) pb.Message {
434432
ents := rd.CommittedEntries
435433
return pb.Message{

0 commit comments

Comments
 (0)