Skip to content

Commit 8f68520

Browse files
authored
Add replication support for lagging nodes (#70)
* added replication * separate out replication state * all tests working --------- Signed-off-by: Sam Liokumovich <65994425+samliok@users.noreply.github.com>
1 parent 0136c8e commit 8f68520

File tree

9 files changed

+743
-126
lines changed

9 files changed

+743
-126
lines changed

epoch.go

Lines changed: 279 additions & 67 deletions
Large diffs are not rendered by default.

epoch_multinode_test.go

Lines changed: 95 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,74 @@ import (
2020
func TestSimplexMultiNodeSimple(t *testing.T) {
2121
bb := newTestControlledBlockBuilder(t)
2222

23-
var net inMemNetwork
24-
net.nodes = []NodeID{{1}, {2}, {3}, {4}}
25-
26-
n1 := newSimplexNode(t, 1, &net, bb)
27-
n2 := newSimplexNode(t, 2, &net, bb)
28-
n3 := newSimplexNode(t, 3, &net, bb)
29-
n4 := newSimplexNode(t, 4, &net, bb)
23+
nodes := []NodeID{{1}, {2}, {3}, {4}}
24+
net := newInMemNetwork(t, nodes)
25+
newSimplexNode(t, nodes[0], net, bb, false)
26+
newSimplexNode(t, nodes[1], net, bb, false)
27+
newSimplexNode(t, nodes[2], net, bb, false)
28+
newSimplexNode(t, nodes[3], net, bb, false)
3029

3130
bb.triggerNewBlock()
3231

33-
instances := []*testInstance{n4, n3, n2, n1}
34-
35-
for _, n := range instances {
36-
n.start()
37-
}
32+
net.startInstances()
3833

3934
for seq := 0; seq < 10; seq++ {
40-
for _, n := range instances {
41-
n.ledger.waitForBlockCommit(uint64(seq))
35+
for _, n := range net.instances {
36+
n.storage.waitForBlockCommit(uint64(seq))
4237
}
4338
bb.triggerNewBlock()
4439
}
4540
}
4641

47-
func (t *testInstance) start() {
42+
func (t *testNode) start() {
4843
go t.handleMessages()
4944
require.NoError(t.t, t.e.Start())
5045
}
5146

52-
func newSimplexNode(t *testing.T, id uint8, net *inMemNetwork, bb BlockBuilder) *testInstance {
53-
l := testutil.MakeLogger(t, int(id))
54-
storage := newInMemStorage()
47+
func newSimplexNodeWithStorage(t *testing.T, nodeID NodeID, net *inMemNetwork, bb BlockBuilder, storage []FinalizedBlock) *testNode {
48+
wal := newTestWAL(t)
49+
conf := defaultTestNodeEpochConfig(t, nodeID, net, wal, bb, true)
50+
for _, data := range storage {
51+
conf.Storage.Index(data.Block, data.FCert)
52+
}
53+
e, err := NewEpoch(conf)
54+
require.NoError(t, err)
55+
ti := &testNode{
56+
wal: wal,
57+
e: e,
58+
t: t,
59+
storage: conf.Storage.(*InMemStorage),
60+
ingress: make(chan struct {
61+
msg *Message
62+
from NodeID
63+
}, 100)}
5564

56-
nodeID := NodeID{id}
65+
net.addNode(ti)
66+
return ti
67+
}
5768

69+
// newSimplexNode creates a testNode with a fresh WAL and default epoch
// configuration, then registers it with the in-memory network.
// replicationEnabled is passed through to the node's EpochConfig.
func newSimplexNode(t *testing.T, nodeID NodeID, net *inMemNetwork, bb BlockBuilder, replicationEnabled bool) *testNode {
	wal := newTestWAL(t)
	conf := defaultTestNodeEpochConfig(t, nodeID, net, wal, bb, replicationEnabled)
	e, err := NewEpoch(conf)
	require.NoError(t, err)
	ti := &testNode{
		wal:     wal,
		e:       e,
		t:       t,
		storage: conf.Storage.(*InMemStorage),
		// buffered so test senders don't block while the node is busy
		ingress: make(chan struct {
			msg  *Message
			from NodeID
		}, 100)}

	// addNode asserts the node ID was declared in the network up front.
	net.addNode(ti)
	return ti
}
5987

88+
func defaultTestNodeEpochConfig(t *testing.T, nodeID NodeID, net *inMemNetwork, wal WriteAheadLog, bb BlockBuilder, replicationEnabled bool) EpochConfig {
89+
l := testutil.MakeLogger(t, int(nodeID[0]))
90+
storage := newInMemStorage()
6091
conf := EpochConfig{
6192
MaxProposalWait: DefaultMaxProposalWaitTime,
6293
Comm: &testComm{
@@ -71,29 +102,16 @@ func newSimplexNode(t *testing.T, id uint8, net *inMemNetwork, bb BlockBuilder)
71102
Storage: storage,
72103
BlockBuilder: bb,
73104
SignatureAggregator: &testSignatureAggregator{},
105+
BlockDeserializer: &blockDeserializer{},
106+
QCDeserializer: &testQCDeserializer{t: t},
107+
ReplicationEnabled: replicationEnabled,
74108
}
75-
76-
e, err := NewEpoch(conf)
77-
require.NoError(t, err)
78-
79-
ti := &testInstance{
80-
wal: wal,
81-
e: e,
82-
t: t,
83-
ledger: storage,
84-
ingress: make(chan struct {
85-
msg *Message
86-
from NodeID
87-
}, 100)}
88-
89-
net.instances = append(net.instances, ti)
90-
91-
return ti
109+
return conf
92110
}
93111

94-
type testInstance struct {
112+
type testNode struct {
95113
wal *testWAL
96-
ledger *InMemStorage
114+
storage *InMemStorage
97115
e *Epoch
98116
ingress chan struct {
99117
msg *Message
@@ -102,13 +120,13 @@ type testInstance struct {
102120
t *testing.T
103121
}
104122

105-
func (t *testInstance) HandleMessage(msg *Message, from NodeID) error {
123+
func (t *testNode) HandleMessage(msg *Message, from NodeID) error {
106124
err := t.e.HandleMessage(msg, from)
107125
require.NoError(t.t, err)
108126
return err
109127
}
110128

111-
func (t *testInstance) handleMessages() {
129+
func (t *testNode) handleMessages() {
112130
for msg := range t.ingress {
113131
err := t.HandleMessage(msg.msg, msg.from)
114132
require.NoError(t.t, err)
@@ -225,10 +243,46 @@ func (c *testComm) Broadcast(msg *Message) {
225243
}
226244

227245
type inMemNetwork struct {
246+
t *testing.T
228247
nodes []NodeID
229-
instances []*testInstance
248+
instances []*testNode
249+
}
250+
251+
// newInMemNetwork creates an in-memory network. Node IDs must be provided before
252+
// adding instances, as nodes require prior knowledge of all participants.
253+
func newInMemNetwork(t *testing.T, nodes []NodeID) *inMemNetwork {
254+
net := &inMemNetwork{
255+
t: t,
256+
nodes: nodes,
257+
instances: make([]*testNode, 0),
258+
}
259+
return net
260+
}
261+
262+
func (n *inMemNetwork) addNode(node *testNode) {
263+
allowed := false
264+
for _, id := range n.nodes {
265+
if bytes.Equal(id, node.e.ID) {
266+
allowed = true
267+
break
268+
}
269+
}
270+
require.True(node.t, allowed, "node must be declared before adding")
271+
n.instances = append(n.instances, node)
272+
}
273+
274+
// startInstances starts all instances in the network.
275+
// The first one is typically the leader, so we make sure to start it last.
276+
func (n *inMemNetwork) startInstances() {
277+
require.Equal(n.t, len(n.nodes), len(n.instances))
278+
279+
for i := len(n.nodes) - 1; i >= 0; i-- {
280+
n.instances[i].start()
281+
}
230282
}
231283

284+
// testControlledBlockBuilder is a test block builder that blocks
285+
// block building until a trigger is received
232286
type testControlledBlockBuilder struct {
233287
t *testing.T
234288
control chan struct{}

msg.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ type Message struct {
1717
Notarization *Notarization
1818
Finalization *Finalization
1919
FinalizationCertificate *FinalizationCertificate
20+
ReplicationResponse *ReplicationResponse
21+
ReplicationRequest *ReplicationRequest
2022
}
2123

2224
type ToBeSignedEmptyVote struct {
@@ -206,3 +208,25 @@ type QuorumCertificate interface {
206208
// Bytes returns a raw representation of the given QuorumCertificate.
207209
Bytes() []byte
208210
}
211+
212+
// ReplicationRequest asks a peer for state this node is missing.
type ReplicationRequest struct {
	FinalizationCertificateRequest *FinalizationCertificateRequest
}

// ReplicationResponse carries the data answering a ReplicationRequest.
type ReplicationResponse struct {
	FinalizationCertificateResponse *FinalizationCertificateResponse
}

// FinalizationCertificateRequest requests a finalization certificate for each
// of the given sequence numbers.
type FinalizationCertificateRequest struct {
	Sequences []uint64
}

// FinalizedBlock pairs a block with the finalization certificate that
// finalizes it.
type FinalizedBlock struct {
	Block Block
	FCert FinalizationCertificate
}

// FinalizationCertificateResponse carries the finalized blocks answering a
// FinalizationCertificateRequest.
type FinalizationCertificateResponse struct {
	Data []FinalizedBlock
}

record_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,28 @@ import (
1111
"github.com/stretchr/testify/require"
1212
)
1313

14-
func newNotarizationRecord(logger simplex.Logger, signatureAggregator simplex.SignatureAggregator, block simplex.Block, ids []simplex.NodeID) ([]byte, error) {
14+
func newNotarization(logger simplex.Logger, signatureAggregator simplex.SignatureAggregator, block simplex.Block, ids []simplex.NodeID) (simplex.Notarization, error) {
1515
votesForCurrentRound := make(map[string]*simplex.Vote)
1616
for _, id := range ids {
1717
vote, err := newTestVote(block, id)
1818
if err != nil {
19-
return nil, err
19+
return simplex.Notarization{}, err
2020
}
21+
2122
votesForCurrentRound[string(id)] = vote
2223
}
2324

2425
notarization, err := simplex.NewNotarization(logger, signatureAggregator, votesForCurrentRound, block.BlockHeader())
26+
return notarization, err
27+
}
28+
29+
func newNotarizationRecord(logger simplex.Logger, signatureAggregator simplex.SignatureAggregator, block simplex.Block, ids []simplex.NodeID) ([]byte, error) {
30+
notarization, err := newNotarization(logger, signatureAggregator, block, ids)
2531
if err != nil {
2632
return nil, err
2733
}
2834

2935
record := simplex.NewQuorumRecord(notarization.QC.Bytes(), notarization.Vote.Bytes(), record.NotarizationRecordType)
30-
3136
return record, nil
3237
}
3338

replication.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package simplex
5+
6+
import (
7+
"bytes"
8+
"fmt"
9+
"math"
10+
11+
"go.uber.org/zap"
12+
)
13+
14+
type ReplicationState struct {
15+
logger Logger
16+
enabled bool
17+
maxRoundWindow uint64
18+
comm Communication
19+
id NodeID
20+
21+
// latest seq requested
22+
lastSequenceRequested uint64
23+
24+
// highest sequence we have received a finalization certificate for
25+
highestFCertReceived *FinalizationCertificate
26+
27+
// received
28+
receivedFinalizationCertificates map[uint64]FinalizedBlock
29+
}
30+
31+
func NewReplicationState(logger Logger, comm Communication, id NodeID, maxRoundWindow uint64, enabled bool) *ReplicationState {
32+
return &ReplicationState{
33+
logger: logger,
34+
enabled: enabled,
35+
comm: comm,
36+
id: id,
37+
maxRoundWindow: maxRoundWindow,
38+
receivedFinalizationCertificates: make(map[uint64]FinalizedBlock),
39+
}
40+
}
41+
42+
// collectFutureFinalizationCertificates requests the finalization certificates
// this node is missing, from nextSeqToCommit up to the fCert's round, capped at
// maxRoundWindow past the current round. No-op when replication is disabled or
// when everything up to the cap has already been requested.
func (r *ReplicationState) collectFutureFinalizationCertificates(fCert *FinalizationCertificate, currentRound uint64, nextSeqToCommit uint64) {
	if !r.enabled {
		return
	}
	fCertRound := fCert.Finalization.Round
	// Don't exceed the max round window
	// NOTE(review): float64 min/max on uint64 values loses precision above
	// 2^53 — consider integer comparisons instead.
	endSeq := math.Min(float64(fCertRound), float64(r.maxRoundWindow+currentRound))
	// Remember the highest fCert seen so far.
	// NOTE(review): a round (fCertRound) is compared against a sequence
	// (Finalization.Seq) here — confirm rounds and seqs are interchangeable.
	if r.highestFCertReceived == nil || fCertRound > r.highestFCertReceived.Finalization.Seq {
		r.highestFCertReceived = fCert
	}
	// Node is behind, but we've already sent messages to collect future fCerts
	if r.lastSequenceRequested >= uint64(endSeq) {
		return
	}

	// Start from whichever is further along: the next seq to commit or the
	// last seq already requested.
	startSeq := math.Max(float64(nextSeqToCommit), float64(r.lastSequenceRequested))
	r.logger.Debug("Node is behind, requesting missing finalization certificates", zap.Uint64("round", fCertRound), zap.Uint64("startSeq", uint64(startSeq)), zap.Uint64("endSeq", uint64(endSeq)))
	r.sendFutureCertficatesRequests(uint64(startSeq), uint64(endSeq))
}
61+
62+
// sendFutureCertficatesRequests sends requests for future finalization certificates for the
63+
// range of sequences [start, end] <- inclusive
64+
func (r *ReplicationState) sendFutureCertficatesRequests(start uint64, end uint64) {
65+
seqs := make([]uint64, (end+1)-start)
66+
for i := start; i <= end; i++ {
67+
seqs[i-start] = i
68+
}
69+
70+
roundRequest := &ReplicationRequest{
71+
FinalizationCertificateRequest: &FinalizationCertificateRequest{
72+
Sequences: seqs,
73+
},
74+
}
75+
msg := &Message{ReplicationRequest: roundRequest}
76+
77+
requestFrom := r.requestFrom()
78+
79+
r.lastSequenceRequested = end
80+
r.comm.SendMessage(msg, requestFrom)
81+
}
82+
83+
// requestFrom returns a node to send a message request to
84+
// this is used to ensure that we are not sending a message to ourselves
85+
func (r *ReplicationState) requestFrom() NodeID {
86+
nodes := r.comm.ListNodes()
87+
for _, node := range nodes {
88+
if !node.Equals(r.id) {
89+
return node
90+
}
91+
}
92+
return NodeID{}
93+
}
94+
95+
// maybeCollectFutureFinalizationCertificates attempts to collect future finalization certificates if
// there are more fCerts to be collected and the round has caught up.
func (r *ReplicationState) maybeCollectFutureFinalizationCertificates(round uint64, nextSequenceToCommit uint64) {
	// Nothing to replicate until a future fCert has been observed.
	if r.highestFCertReceived == nil {
		return
	}

	// Everything up to the highest fCert seen has already been requested.
	// NOTE(review): a sequence (lastSequenceRequested) is compared against a
	// round (Finalization.Round) — confirm this is intentional.
	if r.lastSequenceRequested >= r.highestFCertReceived.Finalization.Round {
		return
	}
	// we send out more request once our round has caught up to 1/2 of the maxRoundWindow
	if round+r.maxRoundWindow/2 > r.lastSequenceRequested {
		r.collectFutureFinalizationCertificates(r.highestFCertReceived, round, nextSequenceToCommit)
	}
}
110+
111+
func (r *ReplicationState) StoreFinalizedBlock(data FinalizedBlock) error {
112+
// ensure the finalization certificate we get relates to the block
113+
blockDigest := data.Block.BlockHeader().Digest
114+
if !bytes.Equal(blockDigest[:], data.FCert.Finalization.Digest[:]) {
115+
return fmt.Errorf("finalization certificate does not match the block")
116+
}
117+
118+
// don't store the same finalization certificate twice
119+
if _, ok := r.receivedFinalizationCertificates[data.FCert.Finalization.Seq]; ok {
120+
return nil
121+
}
122+
123+
r.receivedFinalizationCertificates[data.FCert.Finalization.Seq] = data
124+
return nil
125+
}

0 commit comments

Comments
 (0)