Skip to content

Commit 07a10f1

Browse files
committed
fix: hot path dedupe
1 parent 92a0f6e commit 07a10f1

File tree

1 file changed

+63
-7
lines changed

1 file changed

+63
-7
lines changed

raft.go

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ type raft struct {
468468
// follower -> highest k they voted for the winner
469469
fastMatchIndex map[uint64]uint64
470470
electionSelf map[uint64][]*pb.EntryRef
471+
lastFastVote map[uint64]string
471472
}
472473

473474
func newRaft(c *Config) *raft {
@@ -503,6 +504,10 @@ func newRaft(c *Config) *raft {
503504
fastOpenIndexWindow: c.FastOpenIndexWindow,
504505
}
505506

507+
if r.enableFastPath {
508+
r.lastFastVote = make(map[uint64]string)
509+
}
510+
506511
traceInitState(r)
507512

508513
lastID := r.raftLog.lastEntryID()
@@ -1079,6 +1084,13 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) {
10791084
if r.lastLeaderIndex > index {
10801085
r.lastLeaderIndex = index
10811086
}
1087+
if r.enableFastPath && r.lastFastVote != nil {
1088+
for k := range r.lastFastVote {
1089+
if k <= index {
1090+
delete(r.lastFastVote, k)
1091+
}
1092+
}
1093+
}
10821094
}
10831095

10841096
// maybeCommit attempts to advance the commit index. Returns true if the commit
@@ -1101,6 +1113,14 @@ func (r *raft) reset(term uint64) {
11011113
r.heartbeatElapsed = 0
11021114
r.resetRandomizedElectionTimeout()
11031115

1116+
if r.lastFastVote == nil {
1117+
r.lastFastVote = make(map[uint64]string)
1118+
} else {
1119+
for k := range r.lastFastVote {
1120+
delete(r.lastFastVote, k)
1121+
}
1122+
}
1123+
11041124
r.abortLeaderTransfer()
11051125

11061126
r.trk.ResetVotes()
@@ -2336,10 +2356,19 @@ func (r *raft) handleAppendEntries(m pb.Message) {
23362356
return
23372357
}
23382358
if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok {
2359+
// ❑ Fast-path housekeeping: clear vote-dedupe for indices we just accepted.
2360+
if r.enableFastPath && r.lastFastVote != nil {
2361+
lo := a.prev.index + 1 // first index carried by this MsgApp
2362+
hi := mlastIndex
2363+
for i := lo; i <= hi; i++ {
2364+
delete(r.lastFastVote, i) // free memory and stop considering old votes for these slots
2365+
}
2366+
}
2367+
23392368
if r.enableFastPath && mlastIndex > r.lastLeaderIndex {
23402369
from := m.Index + 1
23412370
to := mlastIndex
2342-
// We only need the highest Leader origin; scan backwards and stop at first match.
2371+
// keep your existing scan to update lastLeaderIndex …
23432372
for i := to; i >= from && i > r.lastLeaderIndex; i-- {
23442373
ents, _ := r.raftLog.slice(i, i+1, noLimit)
23452374
if len(ents) == 1 && (ents[0].Origin == pb.EntryOriginLeader.Enum() || ents[0].Origin == pb.EntryOriginUnknown.Enum()) {
@@ -2348,12 +2377,14 @@ func (r *raft) handleAppendEntries(m pb.Message) {
23482377
}
23492378
if i == 0 {
23502379
break
2351-
} // guard underflow
2380+
}
23522381
}
23532382
}
2383+
23542384
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
23552385
return
23562386
}
2387+
23572388
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
23582389
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
23592390

@@ -2622,22 +2653,47 @@ func (r *raft) abortLeaderTransfer() {
26222653
}
26232654

26242655
// follower helper called after self-insert
2656+
// follower helper called after self-insert (or on duplicate fast prop)
26252657
func (r *raft) sendFastVoteForIndex(m pb.Message) {
2626-
fmt.Printf("sendFastVoteForIndex: enableFastPath=%v lead=%d len(m.Entries)=%d\n", r.enableFastPath, r.lead, len(m.Entries))
2658+
// REMOVE: fmt.Printf(...) – this is extremely hot.
26272659
if !r.enableFastPath || r.lead == None || len(m.Entries) == 0 {
26282660
return
26292661
}
2662+
2663+
idx := m.Index
2664+
if idx == 0 || idx <= r.raftLog.committed {
2665+
return // never vote for committed/past indices
2666+
}
2667+
// Optional: enforce window on the vote path too.
2668+
win := r.fastOpenIndexWindow
2669+
if win <= 0 {
2670+
win = 1
2671+
}
2672+
if idx > r.raftLog.committed+uint64(win) {
2673+
return
2674+
}
2675+
26302676
cid := m.Entries[0].ContentId
26312677
if len(cid) == 0 {
26322678
return
26332679
}
2680+
key := string(cid)
26342681

2635-
// EntryRef.Index is *uint64 in your generated code — pass a pointer.
2636-
idx := m.Index // local variable so &idx is stable
2637-
ref := &pb.EntryRef{Index: &idx, ContentId: cid}
2682+
// ✅ Deduplicate: one vote per (index, contentId)
2683+
if prev, ok := r.lastFastVote[idx]; ok && prev == key {
2684+
return // already voted for this candidate at this index
2685+
}
2686+
r.lastFastVote[idx] = key
26382687

2639-
// LocalCommit is *uint64 in your generated code — pass a pointer.
2688+
// Build the vote
26402689
lc := r.raftLog.committed
2690+
// NB: in your proto EntryRef.Index is *uint64; use a local var for a stable pointer.
2691+
i := idx
2692+
ref := &pb.EntryRef{
2693+
Index: &i,
2694+
ContentId: cid,
2695+
Origin: pb.EntryOriginSelf.Enum(), // optional but nice
2696+
}
26412697

26422698
r.send(pb.Message{
26432699
To: r.lead,

0 commit comments

Comments
 (0)