Skip to content

Commit f566c0b

Browse files
authored
Merge branch 'main' into ev
2 parents a4d1b42 + 9e08074 commit f566c0b

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

epoch.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2541,6 +2541,7 @@ func (e *Epoch) handleReplicationResponse(resp *ReplicationResponse, from NodeID
25412541
continue
25422542
}
25432543

2544+
validRounds = append(validRounds, data)
25442545
e.replicationState.StoreQuorumRound(data)
25452546
validRounds = append(validRounds, data)
25462547
}

replication_timeout_test.go

Lines changed: 99 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -463,3 +463,102 @@ func TestReplicationRequestWithoutFinalization(t *testing.T) {
463463
laggingNode.e.AdvanceTime(laggingNode.e.StartTime.Add(simplex.DefaultReplicationRequestTimeout * 4))
464464
laggingNode.storage.waitForBlockCommit(endDisconnect - missedSeqs)
465465
}
466+
467+
// TestReplicationMalformedQuorumRound tests that a node resends a replication request when it receives a malformed quorum round message.
468+
func TestReplicationMalformedQuorumRound(t *testing.T) {
469+
nodes := []simplex.NodeID{{1}, {2}, {3}, []byte("lagging")}
470+
startSeq := uint64(8)
471+
472+
// node begins replication
473+
bb := newTestControlledBlockBuilder(t)
474+
net := newInMemNetwork(t, nodes)
475+
476+
storageData := createBlocks(t, nodes, &bb.testBlockBuilder, startSeq)
477+
478+
newNodeConfig := func(from simplex.NodeID) *testNodeConfig {
479+
comm := newTestComm(from, net, rejectReplicationRequests)
480+
return &testNodeConfig{
481+
initialStorage: storageData,
482+
comm: comm,
483+
replicationEnabled: true,
484+
}
485+
}
486+
487+
mf := &testTimeoutMessageFilter{
488+
t: t,
489+
replicationResponses: make(chan struct{}, 1),
490+
}
491+
492+
newSimplexNode(t, nodes[0], net, bb, newNodeConfig(nodes[0]))
493+
normalNode2 := newSimplexNode(t, nodes[1], net, bb, newNodeConfig(nodes[1]))
494+
normalNode2.e.Comm.(*testComm).setFilter(mf.receivedReplicationRequest)
495+
newSimplexNode(t, nodes[2], net, bb, newNodeConfig(nodes[2]))
496+
497+
recordedMessages := make(chan *simplex.Message, 1000)
498+
comm := newTestComm(nodes[3], net, allowAllMessages)
499+
500+
laggingNode := newSimplexNode(t, nodes[3], net, bb, &testNodeConfig{
501+
replicationEnabled: true,
502+
comm: &recordingComm{Communication: comm, SentMessages: recordedMessages},
503+
})
504+
505+
net.startInstances()
506+
bb.triggerNewBlock()
507+
508+
// typically the lagging node would catch up here, but since we block
509+
// replication requests, the lagging node will be forced to resend requests after a timeout
510+
for i := 0; i <= int(startSeq); i++ {
511+
for _, n := range net.instances {
512+
if n.e.ID.Equals(laggingNode.e.ID) {
513+
continue
514+
}
515+
n.storage.waitForBlockCommit(uint64(startSeq))
516+
}
517+
}
518+
519+
<-mf.replicationResponses
520+
521+
// assert the lagging node has not received any replication responses
522+
require.Equal(t, uint64(0), laggingNode.storage.Height())
523+
net.setAllNodesMessageFilter(
524+
func(msg *simplex.Message, _, _ simplex.NodeID) bool {
525+
if msg.VerifiedReplicationResponse != nil || msg.ReplicationResponse != nil {
526+
newData := make([]simplex.VerifiedQuorumRound, 0, len(msg.VerifiedReplicationResponse.Data))
527+
528+
for _, qr := range msg.VerifiedReplicationResponse.Data {
529+
qr.Notarization = nil // remove notarization
530+
qr.Finalization = nil // remove finalization
531+
newData = append(newData, qr)
532+
}
533+
msg.VerifiedReplicationResponse.Data = newData
534+
}
535+
return true
536+
},
537+
)
538+
539+
// after the timeout, only normalNode2 should respond, but with malformed data
540+
laggingNode.e.AdvanceTime(laggingNode.e.StartTime.Add(simplex.DefaultReplicationRequestTimeout / 2))
541+
require.Equal(t, uint64(0), laggingNode.storage.Height())
542+
543+
laggingNode.e.AdvanceTime(laggingNode.e.StartTime.Add(simplex.DefaultReplicationRequestTimeout))
544+
require.Equal(t, uint64(0), laggingNode.storage.Height())
545+
546+
require.Eventually(t, func() bool {
547+
msg, ok := <-recordedMessages
548+
if !ok {
549+
return false
550+
}
551+
552+
if msg.ReplicationRequest == nil {
553+
return false
554+
}
555+
556+
return reflect.DeepEqual(msg.ReplicationRequest.Seqs, []uint64{3, 4, 5})
557+
558+
}, 30*time.Second, 10*time.Millisecond)
559+
560+
net.setAllNodesMessageFilter(allowAllMessages)
561+
// timeout again, now all nodes will respond
562+
laggingNode.e.AdvanceTime(laggingNode.e.StartTime.Add(simplex.DefaultReplicationRequestTimeout * 2))
563+
laggingNode.storage.waitForBlockCommit(startSeq)
564+
}

0 commit comments

Comments
 (0)