@@ -34,13 +34,10 @@ var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
34
34
// The methods of this struct correspond to the methods of Node and are described
35
35
// more fully there.
36
36
type RawNode struct {
37
- raft * raft
38
- asyncStorageWrites bool
39
-
37
+ raft * raft
40
38
// Mutable fields.
41
- prevSoftSt * SoftState
42
- prevHardSt pb.HardState
43
- stepsOnAdvance []pb.Message
39
+ prevSoftSt * SoftState
40
+ prevHardSt pb.HardState
44
41
}
45
42
46
43
// NewRawNode instantiates a RawNode from the given configuration.
@@ -55,7 +52,9 @@ func NewRawNode(config *Config) (*RawNode, error) {
55
52
rn := & RawNode {
56
53
raft : r ,
57
54
}
58
- rn .asyncStorageWrites = config .AsyncStorageWrites
55
+ if ! config .AsyncStorageWrites {
56
+ panic ("synchronous storage writes are no longer supported" )
57
+ }
59
58
ss := r .softState ()
60
59
rn .prevSoftSt = & ss
61
60
rn .prevHardSt = r .hardState ()
@@ -254,39 +253,12 @@ func (rn *RawNode) Ready() Ready {
254
253
rd .CommittedEntries = entries
255
254
}
256
255
257
- if rn .asyncStorageWrites {
258
- // If async storage writes are enabled, enqueue messages to local storage
259
- // threads, where applicable.
260
- if needStorageAppendMsg (r , rd ) {
261
- rd .Messages = append (rd .Messages , newStorageAppendMsg (r , rd ))
262
- }
263
- if needStorageApplyMsg (rd ) {
264
- rd .Messages = append (rd .Messages , newStorageApplyMsg (r , rd ))
265
- }
266
- } else {
267
- // TODO(pav-kv): remove this branch and synchronous log writes.
268
- if len (rn .stepsOnAdvance ) != 0 {
269
- r .logger .Panicf ("two accepted Ready structs without call to Advance" )
270
- }
271
- // If async storage writes are disabled, immediately enqueue msgsAfterAppend
272
- // to be sent out. The Ready struct contract mandates that Messages cannot
273
- // be sent until after Entries are written to stable storage. Enqueue the
274
- // self-directed messages to be processed after Ready/Advance.
275
- for _ , m := range r .msgsAfterAppend {
276
- if m .To != r .id {
277
- rd .Messages = append (rd .Messages , m )
278
- } else {
279
- rn .stepsOnAdvance = append (rn .stepsOnAdvance , m )
280
- }
281
- }
282
- if needStorageAppendRespMsg (rd ) {
283
- rn .stepsOnAdvance = append (rn .stepsOnAdvance ,
284
- newStorageAppendRespMsg (r , rd ))
285
- }
286
- if needStorageApplyRespMsg (rd ) {
287
- rn .stepsOnAdvance = append (rn .stepsOnAdvance ,
288
- newStorageApplyRespMsg (r , rd .CommittedEntries ))
289
- }
256
+ // For async storage writes, enqueue messages to local storage threads.
257
+ if needStorageAppendMsg (r , rd ) {
258
+ rd .Messages = append (rd .Messages , newStorageAppendMsg (r , rd ))
259
+ }
260
+ if needStorageApplyMsg (rd ) {
261
+ rd .Messages = append (rd .Messages , newStorageApplyMsg (r , rd ))
290
262
}
291
263
r .msgsAfterAppend = nil
292
264
@@ -452,8 +424,7 @@ func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message {
452
424
return m
453
425
}
454
426
455
- func needStorageApplyMsg (rd Ready ) bool { return len (rd .CommittedEntries ) > 0 }
456
- func needStorageApplyRespMsg (rd Ready ) bool { return needStorageApplyMsg (rd ) }
427
+ func needStorageApplyMsg (rd Ready ) bool { return len (rd .CommittedEntries ) > 0 }
457
428
458
429
// newStorageApplyMsg creates the message that should be sent to the local
459
430
// apply thread to instruct it to apply committed log entries. The message
@@ -490,7 +461,7 @@ func newStorageApplyRespMsg(r *raft, ents []pb.Entry) pb.Message {
490
461
// they are known to be committed but before they have been written locally to
491
462
// stable storage.
492
463
func (rn * RawNode ) applyUnstableEntries () bool {
493
- return ! rn . asyncStorageWrites || rn .raft .testingKnobs .ApplyUnstableEntries ()
464
+ return rn .raft .testingKnobs .ApplyUnstableEntries ()
494
465
}
495
466
496
467
// HasReady called when RawNode user need to check if any Ready pending.
@@ -515,26 +486,6 @@ func (rn *RawNode) HasReady() bool {
515
486
return false
516
487
}
517
488
518
- // Advance notifies the RawNode that the application has applied all the updates
519
- // from the last Ready() call. It prepares the node to the next Ready handling
520
- // iteration.
521
- //
522
- // Advance must not be called when using AsyncStorageWrites. Response messages
523
- // from the local append and apply threads take its place.
524
- func (rn * RawNode ) Advance (_ Ready ) {
525
- // The actions performed by this function are encoded into stepsOnAdvance in
526
- // acceptReady. In earlier versions of this library, they were computed from
527
- // the provided Ready struct. Retain the unused parameter for compatibility.
528
- if rn .asyncStorageWrites {
529
- rn .raft .logger .Panicf ("Advance must not be called when using AsyncStorageWrites" )
530
- }
531
- for i , m := range rn .stepsOnAdvance {
532
- _ = rn .raft .Step (m )
533
- rn .stepsOnAdvance [i ] = pb.Message {}
534
- }
535
- rn .stepsOnAdvance = rn .stepsOnAdvance [:0 ]
536
- }
537
-
538
489
// SplitMessages splits the messages in Ready into two buckets:
539
490
//
540
491
// 1. Messages addressed to other peers. Includes both messages that can be
@@ -566,8 +517,8 @@ func SplitMessages(self pb.PeerID, msgs []pb.Message) (send, advance []pb.Messag
566
517
return send , advance
567
518
}
568
519
569
- // AdvanceHack does the same thing as the Advance method, but when asynchronous
570
- // storage writes are enabled .
520
+ // AdvanceHack notifies the RawNode that the application has applied all the
521
+ // updates from the given Ready() call .
571
522
//
572
523
// This is a helper for transitioning from synchronous storage API to the
573
524
// asynchronous one. Tests are being migrated to the async API, and temporarily
@@ -580,9 +531,6 @@ func (rn *RawNode) AdvanceHack(rd Ready) {
580
531
}
581
532
582
533
func (rn * RawNode ) advance (msgs []pb.Message ) {
583
- if ! rn .asyncStorageWrites {
584
- rn .raft .logger .Panicf ("AdvanceHack must be called when using AsyncStorageWrites" )
585
- }
586
534
for _ , msg := range msgs {
587
535
_ = rn .Step (msg )
588
536
}
0 commit comments