Skip to content

Commit 88dba2d

Browse files
jmozahzelig
authored andcommitted
bzzeth: protocol runloop now quits on peer disconnect
1 parent 33e83fa commit 88dba2d

File tree

2 files changed

+111
-20
lines changed

2 files changed

+111
-20
lines changed

bzzeth/bzzeth.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package bzzeth
1818

1919
import (
2020
"context"
21+
"errors"
2122

2223
"github.com/ethereum/go-ethereum/node"
2324
"github.com/ethereum/go-ethereum/p2p"
@@ -26,6 +27,10 @@ import (
2627
"github.com/ethersphere/swarm/p2p/protocols"
2728
)
2829

30+
var (
31+
errRcvdMsgFromSwarmNode = errors.New("received message from Swarm node")
32+
)
33+
2934
// BzzEth implements node.Service
3035
var _ node.Service = &BzzEth{}
3136

@@ -61,14 +66,14 @@ func (b *BzzEth) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
6166
bp.serveHeaders = handshake.(*Handshake).ServeHeaders
6267
log.Debug("handshake", "hs", handshake, "peer", bp)
6368

69+
b.peers.add(bp)
70+
defer b.peers.remove(bp)
71+
6472
// This protocol is all about interaction between an Eth node and a Swarm Node.
6573
// If another swarm node tries to connect then the protocol goes into idle
6674
if isSwarmNodeFunc(bp) {
67-
<-b.quit
68-
return nil
75+
return peer.Run(b.handleMsgFromSwarmNode(bp))
6976
}
70-
b.peers.add(bp)
71-
defer b.peers.remove(bp)
7277

7378
return peer.Run(b.handleMsg(bp))
7479
}
@@ -85,6 +90,15 @@ func (b *BzzEth) handleMsg(p *Peer) func(context.Context, interface{}) error {
8590
}
8691
}
8792

93+
// handleMsgFromSwarmNode is used in the case if this node is connected to a Swarm node
94+
// If any message is received in this case, the peer needs to be dropped
95+
func (b *BzzEth) handleMsgFromSwarmNode(p *Peer) func(context.Context, interface{}) error {
96+
return func(ctx context.Context, msg interface{}) error {
97+
p.logger.Warn("bzzeth.handleMsgFromSwarmNode")
98+
return errRcvdMsgFromSwarmNode
99+
}
100+
}
101+
88102
// Protocols returns the p2p protocol
89103
func (b *BzzEth) Protocols() []p2p.Protocol {
90104
return []p2p.Protocol{

bzzeth/bzzeth_test.go

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,23 @@ func handshakeExchange(tester *p2ptest.ProtocolTester, peerID enode.ID, serveHea
7272
})
7373
}
7474

75+
// This message is exchanged between two Swarm nodes to check if the connection drops
76+
func dummyHandshakeMessage(tester *p2ptest.ProtocolTester, peerID enode.ID) error {
77+
return tester.TestExchanges(
78+
p2ptest.Exchange{
79+
Label: "Handshake",
80+
Triggers: []p2ptest.Trigger{
81+
{
82+
Code: 0,
83+
Msg: Handshake{
84+
ServeHeaders: true,
85+
},
86+
Peer: peerID,
87+
},
88+
},
89+
})
90+
}
91+
7592
// tests handshake between eth node and swarm node
7693
// on successful handshake the protocol does not go idle
7794
// peer added to the pool and serves headers is registered
@@ -89,18 +106,10 @@ func TestBzzEthHandshake(t *testing.T) {
89106
}
90107

91108
// after successful handshake, expect peer added to peer pool
92-
var p *Peer
93-
for i := 0; i < 10; i++ {
94-
p = b.peers.get(node.ID())
95-
if p != nil {
96-
break
97-
}
98-
time.Sleep(100 * time.Millisecond)
99-
}
109+
p := getPeerAfterConnection(node.ID(), b)
100110
if p == nil {
101111
t.Fatal("bzzeth peer not added")
102112
}
103-
104113
if !p.serveHeaders {
105114
t.Fatal("bzzeth peer serveHeaders not set")
106115
}
@@ -114,35 +123,103 @@ func TestBzzEthHandshake(t *testing.T) {
114123

115124
// TestBzzBzzHandshake tests that a handshake between two Swarm nodes
116125
func TestBzzBzzHandshake(t *testing.T) {
126+
// redefine isSwarmNodeFunc to force recognise remote peer as swarm node
127+
defer func(f func(*Peer) bool) {
128+
isSwarmNodeFunc = f
129+
}(isSwarmNodeFunc)
130+
isSwarmNodeFunc = func(_ *Peer) bool { return true }
131+
117132
tester, b, teardown, err := newBzzEthTester()
118133
if err != nil {
119134
t.Fatal(err)
120135
}
121136
defer teardown()
122137

138+
node := tester.Nodes[0]
139+
err = handshakeExchange(tester, node.ID(), false, true)
140+
if err != nil {
141+
t.Fatalf("expected no error, got %v", err)
142+
}
143+
144+
// after successful handshake, expect peer added to peer pool
145+
p := getPeerAfterConnection(node.ID(), b)
146+
if p == nil {
147+
t.Fatal("bzzeth peer not added")
148+
}
149+
150+
// after closing the protocol, expect disconnect
151+
close(b.quit)
152+
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("?")})
153+
if err == nil || err.Error() != "timed out waiting for peers to disconnect" {
154+
t.Fatal(err)
155+
}
156+
}
157+
158+
// TestBzzBzzHandshakeWithMessage tests that a handshake between two Swarm nodes and message exchange
159+
// disconnects the peer
160+
func TestBzzBzzHandshakeWithMessage(t *testing.T) {
123161
// redefine isSwarmNodeFunc to force recognise remote peer as swarm node
124162
defer func(f func(*Peer) bool) {
125163
isSwarmNodeFunc = f
126164
}(isSwarmNodeFunc)
127165
isSwarmNodeFunc = func(_ *Peer) bool { return true }
128166

167+
tester, b, teardown, err := newBzzEthTester()
168+
if err != nil {
169+
t.Fatal(err)
170+
}
171+
defer teardown()
172+
129173
node := tester.Nodes[0]
130174
err = handshakeExchange(tester, node.ID(), false, true)
131175
if err != nil {
132176
t.Fatalf("expected no error, got %v", err)
133177
}
134178

135-
// after handshake expect protocol to hang, peer not added to pool
136-
p := b.peers.get(node.ID())
137-
if p != nil {
138-
t.Fatal("bzzeth swarm peer incorrectly added")
179+
// after successful handshake, expect peer added to peer pool
180+
var p *Peer
181+
for i := 0; i < 10; i++ {
182+
p = b.peers.get(node.ID())
183+
if p != nil {
184+
break
185+
}
186+
time.Sleep(100 * time.Millisecond)
187+
}
188+
if p == nil {
189+
t.Fatal("bzzeth peer not added")
139190
}
140191

141-
// after closing the ptotocall, expect disconnect
142-
close(b.quit)
143-
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("protocol returned")})
192+
// Send a dummy handshake message, wait for sometime and check if peer is dropped
193+
err = dummyHandshakeMessage(tester, node.ID())
144194
if err != nil {
145195
t.Fatal(err)
146196
}
197+
// after successful handshake, expect peer added to peer pool
198+
p1 := isPeerDisconnected(node.ID(), b)
199+
if p1 != nil {
200+
t.Fatal("bzzeth peer still connected")
201+
}
202+
}
203+
204+
func getPeerAfterConnection(id enode.ID, b *BzzEth) (p *Peer) {
205+
for i := 0; i < 10; i++ {
206+
p = b.peers.get(id)
207+
if p != nil {
208+
break
209+
}
210+
time.Sleep(100 * time.Millisecond)
211+
}
212+
return
213+
}
147214

215+
func isPeerDisconnected(id enode.ID, b *BzzEth) (p *Peer) {
216+
var p1 *Peer
217+
for i := 0; i < 10; i++ {
218+
p1 = b.peers.get(id)
219+
if p1 == nil {
220+
break
221+
}
222+
time.Sleep(100 * time.Millisecond)
223+
}
224+
return
148225
}

0 commit comments

Comments
 (0)