Skip to content

Commit a43b0a6

Browse files
committed
add connect/disconnect to test network
1 parent 8f68520 commit a43b0a6

File tree

2 files changed

+113
-5
lines changed

2 files changed

+113
-5
lines changed

epoch_multinode_test.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,8 @@ func defaultTestNodeEpochConfig(t *testing.T, nodeID NodeID, net *inMemNetwork,
8989
l := testutil.MakeLogger(t, int(nodeID[0]))
9090
storage := newInMemStorage()
9191
conf := EpochConfig{
92-
MaxProposalWait: DefaultMaxProposalWaitTime,
93-
Comm: &testComm{
94-
from: nodeID,
95-
net: net,
96-
},
92+
MaxProposalWait: DefaultMaxProposalWaitTime,
93+
Comm: newTestComm(nodeID, net),
9794
Logger: l,
9895
ID: nodeID,
9996
Signer: &testSigner{},
@@ -213,11 +210,23 @@ type testComm struct {
213210
net *inMemNetwork
214211
}
215212

213+
func newTestComm(from NodeID, net *inMemNetwork) *testComm {
214+
return &testComm{
215+
from: from,
216+
net: net,
217+
}
218+
}
219+
216220
func (c *testComm) ListNodes() []NodeID {
217221
return c.net.nodes
218222
}
219223

220224
func (c *testComm) SendMessage(msg *Message, destination NodeID) {
225+
// cannot send if either [from] or [destination] is not connected
226+
if !c.net.IsConnected(destination) || !c.net.IsConnected(c.from) {
227+
return
228+
}
229+
221230
for _, instance := range c.net.instances {
222231
if bytes.Equal(instance.e.ID, destination) {
223232
instance.ingress <- struct {
@@ -230,11 +239,19 @@ func (c *testComm) SendMessage(msg *Message, destination NodeID) {
230239
}
231240

232241
func (c *testComm) Broadcast(msg *Message) {
242+
if !c.net.IsConnected(c.from) {
243+
return
244+
}
245+
233246
for _, instance := range c.net.instances {
234247
// Skip sending the message to yourself
235248
if bytes.Equal(c.from, instance.e.ID) {
236249
continue
237250
}
251+
if !c.net.IsConnected(instance.e.ID) {
252+
continue
253+
}
254+
238255
instance.ingress <- struct {
239256
msg *Message
240257
from NodeID
@@ -246,15 +263,22 @@ type inMemNetwork struct {
246263
t *testing.T
247264
nodes []NodeID
248265
instances []*testNode
266+
connected map[string]bool
249267
}
250268

251269
// newInMemNetwork creates an in-memory network. Node IDs must be provided before
252270
// adding instances, as nodes require prior knowledge of all participants.
253271
func newInMemNetwork(t *testing.T, nodes []NodeID) *inMemNetwork {
272+
connected := make(map[string]bool)
273+
for _, node := range nodes {
274+
connected[string(node)] = true
275+
}
276+
254277
net := &inMemNetwork{
255278
t: t,
256279
nodes: nodes,
257280
instances: make([]*testNode, 0),
281+
connected: connected,
258282
}
259283
return net
260284
}
@@ -271,6 +295,23 @@ func (n *inMemNetwork) addNode(node *testNode) {
271295
n.instances = append(n.instances, node)
272296
}
273297

298+
func (n *inMemNetwork) IsConnected(node NodeID) bool {
299+
for _, instance := range n.instances {
300+
if bytes.Equal(instance.e.ID, node) {
301+
return n.connected[string(node)]
302+
}
303+
}
304+
return false
305+
}
306+
307+
func (n *inMemNetwork) Connect(node NodeID) {
308+
n.connected[string(node)] = true
309+
}
310+
311+
func (n *inMemNetwork) Disconnect(node NodeID) {
312+
n.connected[string(node)] = false
313+
}
314+
274315
// startInstances starts all instances in the network.
275316
// The first one is typically the leader, so we make sure to start it last.
276317
func (n *inMemNetwork) startInstances() {

replication_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package simplex_test
55

66
import (
7+
"bytes"
78
"context"
89
"simplex"
910
"simplex/testutil"
@@ -172,6 +173,72 @@ func TestReplicationStartsBeforeCurrentRound(t *testing.T) {
172173
}
173174
}
174175

176+
// TestReplicationAfterNodeDisconnects tests the replication process of a node that
177+
// disconnects from the network and reconnects after the rest of the network has made progress.
178+
func TestReplicationAfterNodeDisconnects(t *testing.T) {
179+
bb := newTestControlledBlockBuilder(t)
180+
nodes := []simplex.NodeID{{1}, {2}, {3}, []byte("lagging")}
181+
net := newInMemNetwork(t, nodes)
182+
startDisconnect := uint64(5)
183+
endDisconnect := uint64(18)
184+
185+
normalNode1 := newSimplexNode(t, nodes[0], net, bb, true)
186+
normalNode2 := newSimplexNode(t, nodes[1], net, bb, true)
187+
normalNode3 := newSimplexNode(t, nodes[2], net, bb, true)
188+
laggingNode := newSimplexNode(t, nodes[3], net, bb, true)
189+
190+
require.Equal(t, uint64(0), normalNode1.storage.Height())
191+
require.Equal(t, uint64(0), normalNode2.storage.Height())
192+
require.Equal(t, uint64(0), normalNode3.storage.Height())
193+
require.Equal(t, uint64(0), laggingNode.storage.Height())
194+
195+
bb.triggerNewBlock()
196+
197+
net.startInstances()
198+
199+
for i := uint64(0); i < startDisconnect; i++ {
200+
for _, n := range net.instances {
201+
n.storage.waitForBlockCommit(i)
202+
}
203+
204+
bb.triggerNewBlock()
205+
}
206+
207+
require.Equal(t, startDisconnect, normalNode1.storage.Height())
208+
require.Equal(t, startDisconnect, normalNode2.storage.Height())
209+
require.Equal(t, startDisconnect, normalNode3.storage.Height())
210+
require.Equal(t, startDisconnect, laggingNode.storage.Height())
211+
212+
// lagging node disconnects
213+
net.Disconnect(nodes[3])
214+
215+
// normal nodes continue to make progress
216+
for i := startDisconnect; i < endDisconnect; i++ {
217+
if bytes.Equal(simplex.LeaderForRound(nodes, i), nodes[3]) {
218+
// TODO: build empty block
219+
}
220+
for _, n := range net.instances[:3] {
221+
n.storage.waitForBlockCommit(i)
222+
}
223+
bb.triggerNewBlock()
224+
}
225+
226+
require.Equal(t, endDisconnect, normalNode1.storage.Height())
227+
require.Equal(t, endDisconnect, normalNode2.storage.Height())
228+
require.Equal(t, endDisconnect, normalNode3.storage.Height())
229+
require.Equal(t, startDisconnect, laggingNode.storage.Height())
230+
231+
// lagging node reconnects
232+
net.Connect(nodes[3])
233+
234+
// lagging node catches up
235+
bb.triggerNewBlock()
236+
require.Equal(t, endDisconnect+1, laggingNode.storage.Height())
237+
require.Equal(t, endDisconnect+1, normalNode1.storage.Height())
238+
require.Equal(t, endDisconnect+1, normalNode2.storage.Height())
239+
require.Equal(t, endDisconnect+1, normalNode3.storage.Height())
240+
}
241+
175242
func createBlocks(t *testing.T, nodes []simplex.NodeID, bb simplex.BlockBuilder, seqCount uint64) []simplex.FinalizedBlock {
176243
logger := testutil.MakeLogger(t, int(0))
177244
ctx := context.Background()

0 commit comments

Comments
 (0)