Skip to content

Commit 2295fdf

Browse files
committed
fix: pre-cache leader payload
1 parent 6e89d6c commit 2295fdf

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

fastpath_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,3 +320,92 @@ func TestLeaderFastPropFallbackInstallsLeaderPayloadAndClearsCache(t *testing.T)
320320
t.Fatalf("log at k=%d not leader-approved payload: %+v", k, ents)
321321
}
322322
}
323+
324+
func TestLeaderCachesAtLeaderK_IgnoresMsgIndex(t *testing.T) {
325+
cfg := baseConfigFast(1) // or your helper
326+
rl := newRaft(cfg)
327+
328+
primeSingleVoter(rl, rl.id)
329+
rl.becomeCandidate()
330+
rl.becomeLeader()
331+
mustAppendCommitted(rl, 3) // committed = 3
332+
333+
// Send a fast-prop from a non-leader sender; leader caches but does not fallback.
334+
msg := pb.Message{
335+
Type: pb.MsgFastProp,
336+
From: 2, // not the leader
337+
Index: 999, // ignored by leader
338+
Entries: []pb.Entry{{
339+
Type: pb.EntryNormal,
340+
Data: []byte("payload-A"),
341+
ContentId: []byte("cid-A"),
342+
}},
343+
}
344+
if err := rl.Step(msg); err != nil {
345+
t.Fatalf("leader step: %v", err)
346+
}
347+
348+
k := rl.raftLog.committed + 1 // leader buckets at current k
349+
bucket := rl.proposalCache[k]
350+
if bucket == nil {
351+
t.Fatalf("cache bucket missing at k=%d", k)
352+
}
353+
e, ok := bucket["cid-A"]
354+
if !ok || string(e.Data) != "payload-A" {
355+
t.Fatalf("cache miss or wrong payload: ok=%v data=%q", ok, e.Data)
356+
}
357+
}
358+
359+
func TestLeaderFallback_InstallsLeaderPayload_AndClearsCache(t *testing.T) {
360+
cfg := baseConfigFast(1)
361+
rl := newRaft(cfg)
362+
363+
primeSingleVoter(rl, rl.id)
364+
rl.becomeCandidate()
365+
rl.becomeLeader()
366+
mustAppendCommitted(rl, 5) // committed = 5, so k=6
367+
368+
// Pre-arrival non-leader proposal to simulate concurrent traffic
369+
nonLeader := pb.Message{
370+
Type: pb.MsgFastProp,
371+
From: 2, // follower
372+
Index: 0, // ignored by leader now
373+
Entries: []pb.Entry{{
374+
Type: pb.EntryNormal,
375+
Data: []byte("follower-payload"),
376+
ContentId: []byte("cid-X"),
377+
}},
378+
}
379+
if err := rl.Step(nonLeader); err != nil {
380+
t.Fatalf("leader step (non-leader): %v", err)
381+
}
382+
383+
// Pre-cache leader payload (what etcd does immediately after ProposeFast).
384+
rl.CacheLeaderFastPayload([]byte("leader-payload"), []byte("cid-X"))
385+
386+
// Now send leader’s own fast-prop; fallback should install at k and clear cache.
387+
self := pb.Message{
388+
Type: pb.MsgFastProp,
389+
From: None, // local self-prop; normalized to r.id
390+
Index: 0,
391+
Entries: []pb.Entry{{
392+
Type: pb.EntryNormal,
393+
Data: []byte("leader-payload"), // same logical content
394+
ContentId: []byte("cid-X"),
395+
}},
396+
}
397+
if err := rl.Step(self); err != nil {
398+
t.Fatalf("leader step (self): %v", err)
399+
}
400+
401+
k := rl.raftLog.lastIndex() // single-node: append advances lastIndex == k
402+
// Cache should be cleared after installing leader-approved decision
403+
if rl.proposalCache[k] != nil {
404+
t.Fatalf("expected proposalCache[%d] to be cleared", k)
405+
}
406+
// Log[k] must be the leader payload
407+
ents, _ := rl.raftLog.slice(k, k+1, noLimit)
408+
if len(ents) != 1 || string(ents[0].Data) != "leader-payload" || getOrigin(&ents[0]) != pb.EntryOriginLeader {
409+
t.Fatalf("log at k=%d not leader-approved payload: %+v", k, ents)
410+
}
411+
}

raft.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,29 @@ func setOrigin(e *pb.Entry, o pb.EntryOrigin) {
552552
e.Origin = &o
553553
}
554554

555+
// CacheLeaderFastPayload pre-loads the leader's payload for the current k into proposalCache.
556+
// It’s safe to call only when this node is the leader and fast path is enabled.
557+
func (r *raft) CacheLeaderFastPayload(data, cid []byte) {
558+
if !r.enableFastPath || r.state != StateLeader {
559+
return
560+
}
561+
k := r.raftLog.committed + 1 // leader-stamped k
562+
if r.proposalCache == nil {
563+
r.proposalCache = make(map[uint64]map[string]pb.Entry)
564+
}
565+
bucket := r.proposalCache[k]
566+
if bucket == nil {
567+
bucket = make(map[string]pb.Entry)
568+
r.proposalCache[k] = bucket
569+
}
570+
bucket[string(cid)] = pb.Entry{
571+
Type: pb.EntryNormal,
572+
Data: data,
573+
ContentId: cid,
574+
Origin: pb.EntryOriginLeader.Enum(),
575+
}
576+
}
577+
555578
func (r *raft) recomputeLastLeaderIndex() {
556579
fi := r.raftLog.firstIndex()
557580
li := r.raftLog.lastIndex()

0 commit comments

Comments
 (0)