Skip to content

Commit 0da60fc

Browse files
authored
Merge pull request #731 from gzliudan/trust-peer
rpc: add admin_addTrustedPeer and admin_removeTrustedPeer
2 parents e8a9807 + f9f172a commit 0da60fc

File tree

5 files changed

+208
-9
lines changed

5 files changed

+208
-9
lines changed

internal/web3ext/web3ext.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,16 @@ web3._extend({
197197
call: 'admin_removePeer',
198198
params: 1
199199
}),
200+
new web3._extend.Method({
201+
name: 'addTrustedPeer',
202+
call: 'admin_addTrustedPeer',
203+
params: 1
204+
}),
205+
new web3._extend.Method({
206+
name: 'removeTrustedPeer',
207+
call: 'admin_removeTrustedPeer',
208+
params: 1
209+
}),
200210
new web3._extend.Method({
201211
name: 'exportChain',
202212
call: 'admin_exportChain',

node/api.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (api *PrivateAdminAPI) AddPeer(url string) (bool, error) {
6060
return true, nil
6161
}
6262

63-
// RemovePeer disconnects from a a remote node if the connection exists
63+
// RemovePeer disconnects from a remote node if the connection exists
6464
func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
6565
// Make sure the server is running, fail otherwise
6666
server := api.node.Server()
@@ -76,6 +76,37 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
7676
return true, nil
7777
}
7878

79+
// AddTrustedPeer allows a remote node to always connect, even if slots are full
80+
func (api *PrivateAdminAPI) AddTrustedPeer(url string) (bool, error) {
81+
// Make sure the server is running, fail otherwise
82+
server := api.node.Server()
83+
if server == nil {
84+
return false, ErrNodeStopped
85+
}
86+
node, err := discover.ParseNode(url)
87+
if err != nil {
88+
return false, fmt.Errorf("invalid enode: %v", err)
89+
}
90+
server.AddTrustedPeer(node)
91+
return true, nil
92+
}
93+
94+
// RemoveTrustedPeer removes a remote node from the trusted peer set, but it
95+
// does not disconnect it automatically.
96+
func (api *PrivateAdminAPI) RemoveTrustedPeer(url string) (bool, error) {
97+
// Make sure the server is running, fail otherwise
98+
server := api.node.Server()
99+
if server == nil {
100+
return false, ErrNodeStopped
101+
}
102+
node, err := discover.ParseNode(url)
103+
if err != nil {
104+
return false, fmt.Errorf("invalid enode: %v", err)
105+
}
106+
server.RemoveTrustedPeer(node)
107+
return true, nil
108+
}
109+
79110
// PeerEvents creates an RPC subscription which receives peer events from the
80111
// node's p2p.Server
81112
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {

p2p/peer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (p *Peer) String() string {
165165

166166
// Inbound returns true if the peer is an inbound connection
167167
func (p *Peer) Inbound() bool {
168-
return p.rw.flags&inboundConn != 0
168+
return p.rw.is(inboundConn)
169169
}
170170

171171
func newPeer(conn *conn, protocols []Protocol) *Peer {

p2p/server.go

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"net"
2424
"sync"
25+
"sync/atomic"
2526
"time"
2627

2728
"github.com/XinFinOrg/XDPoSChain/common"
@@ -168,6 +169,8 @@ type Server struct {
168169
quit chan struct{}
169170
addstatic chan *discover.Node
170171
removestatic chan *discover.Node
172+
addtrusted chan *discover.Node
173+
removetrusted chan *discover.Node
171174
posthandshake chan *conn
172175
addpeer chan *conn
173176
delpeer chan peerDrop
@@ -184,7 +187,7 @@ type peerDrop struct {
184187
requested bool // true if signaled by the peer
185188
}
186189

187-
type connFlag int
190+
type connFlag int32
188191

189192
const (
190193
dynDialedConn connFlag = 1 << iota
@@ -249,7 +252,18 @@ func (f connFlag) String() string {
249252
}
250253

251254
func (c *conn) is(f connFlag) bool {
252-
return c.flags&f != 0
255+
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
256+
return flags&f != 0
257+
}
258+
259+
func (c *conn) set(f connFlag, val bool) {
260+
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
261+
if val {
262+
flags |= f
263+
} else {
264+
flags &= ^f
265+
}
266+
atomic.StoreInt32((*int32)(&c.flags), int32(flags))
253267
}
254268

255269
// Peers returns all connected peers.
@@ -300,6 +314,23 @@ func (srv *Server) RemovePeer(node *discover.Node) {
300314
}
301315
}
302316

317+
// AddTrustedPeer adds the given node to a reserved whitelist which allows the
318+
// node to always connect, even if the slot are full.
319+
func (srv *Server) AddTrustedPeer(node *discover.Node) {
320+
select {
321+
case srv.addtrusted <- node:
322+
case <-srv.quit:
323+
}
324+
}
325+
326+
// RemoveTrustedPeer removes the given node from the trusted peer set.
327+
func (srv *Server) RemoveTrustedPeer(node *discover.Node) {
328+
select {
329+
case srv.removetrusted <- node:
330+
case <-srv.quit:
331+
}
332+
}
333+
303334
// SubscribePeers subscribes the given channel to peer events
304335
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
305336
return srv.peerFeed.Subscribe(ch)
@@ -410,6 +441,8 @@ func (srv *Server) Start() (err error) {
410441
srv.posthandshake = make(chan *conn)
411442
srv.addstatic = make(chan *discover.Node)
412443
srv.removestatic = make(chan *discover.Node)
444+
srv.addtrusted = make(chan *discover.Node)
445+
srv.removetrusted = make(chan *discover.Node)
413446
srv.peerOp = make(chan peerOpFunc)
414447
srv.peerOpDone = make(chan struct{})
415448

@@ -546,8 +579,7 @@ func (srv *Server) run(dialstate dialer) {
546579
queuedTasks []task // tasks that can't run yet
547580
)
548581
// Put trusted nodes into a map to speed up checks.
549-
// Trusted peers are loaded on startup and cannot be
550-
// modified while the server is running.
582+
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
551583
for _, n := range srv.TrustedNodes {
552584
trusted[n.ID] = true
553585
}
@@ -599,12 +631,32 @@ running:
599631
case n := <-srv.removestatic:
600632
// This channel is used by RemovePeer to send a
601633
// disconnect request to a peer and begin the
602-
// stop keeping the node connected
634+
// stop keeping the node connected.
603635
srv.log.Debug("Removing static node", "node", n)
604636
dialstate.removeStatic(n)
605637
if p, ok := peers[n.ID]; ok {
606638
p.Disconnect(DiscRequested)
607639
}
640+
case n := <-srv.addtrusted:
641+
// This channel is used by AddTrustedPeer to add an enode
642+
// to the trusted node set.
643+
srv.log.Trace("Adding trusted node", "node", n)
644+
trusted[n.ID] = true
645+
// Mark any already-connected peer as trusted
646+
if p, ok := peers[n.ID]; ok {
647+
p.rw.set(trustedConn, true)
648+
}
649+
case n := <-srv.removetrusted:
650+
// This channel is used by RemoveTrustedPeer to remove an enode
651+
// from the trusted node set.
652+
srv.log.Trace("Removing trusted node", "node", n)
653+
if _, ok := trusted[n.ID]; ok {
654+
delete(trusted, n.ID)
655+
}
656+
// Unmark any already-connected peer as trusted
657+
if p, ok := peers[n.ID]; ok {
658+
p.rw.set(trustedConn, false)
659+
}
608660
case op := <-srv.peerOp:
609661
// This channel is used by Peers and PeerCount.
610662
op(peers)

p2p/server_test.go

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) {
148148

149149
// tell the server to connect
150150
tcpAddr := listener.Addr().(*net.TCPAddr)
151-
srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)})
151+
node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
152+
srv.AddPeer(node)
152153

153154
select {
154155
case conn := <-accepted:
@@ -169,6 +170,29 @@ func TestServerDial(t *testing.T) {
169170
if !reflect.DeepEqual(peers, []*Peer{peer}) {
170171
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
171172
}
173+
174+
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
175+
// Particularly for race conditions on changing the flag state.
176+
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
177+
t.Errorf("peer is trusted prematurely: %v", peer)
178+
}
179+
done := make(chan bool)
180+
go func() {
181+
srv.AddTrustedPeer(node)
182+
if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
183+
t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
184+
}
185+
srv.RemoveTrustedPeer(node)
186+
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
187+
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
188+
}
189+
done <- true
190+
}()
191+
// Trigger potential race conditions
192+
peer = srv.Peers()[0]
193+
_ = peer.Inbound()
194+
_ = peer.Info()
195+
<-done
172196
case <-time.After(1 * time.Second):
173197
t.Error("server did not launch peer within one second")
174198
}
@@ -365,7 +389,8 @@ func TestServerAtCap(t *testing.T) {
365389
}
366390
}
367391
// Try inserting a non-trusted connection.
368-
c := newconn(randomID())
392+
anotherID := randomID()
393+
c := newconn(anotherID)
369394
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
370395
t.Error("wrong error for insert:", err)
371396
}
@@ -378,6 +403,87 @@ func TestServerAtCap(t *testing.T) {
378403
t.Error("Server did not set trusted flag")
379404
}
380405

406+
// Remove from trusted set and try again
407+
srv.RemoveTrustedPeer(&discover.Node{ID: trustedID})
408+
c = newconn(trustedID)
409+
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
410+
t.Error("wrong error for insert:", err)
411+
}
412+
413+
// Add anotherID to trusted set and try again
414+
srv.AddTrustedPeer(&discover.Node{ID: anotherID})
415+
c = newconn(anotherID)
416+
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
417+
t.Error("unexpected error for trusted conn @posthandshake:", err)
418+
}
419+
if !c.is(trustedConn) {
420+
t.Error("Server did not set trusted flag")
421+
}
422+
}
423+
424+
func TestServerPeerLimits(t *testing.T) {
425+
srvkey := newkey()
426+
427+
clientid := randomID()
428+
clientnode := &discover.Node{ID: clientid}
429+
430+
var tp *setupTransport = &setupTransport{
431+
id: clientid,
432+
phs: &protoHandshake{
433+
ID: clientid,
434+
// Force "DiscUselessPeer" due to unmatching caps
435+
// Caps: []Cap{discard.cap()},
436+
},
437+
}
438+
var flags connFlag = dynDialedConn
439+
var dialDest *discover.Node = &discover.Node{ID: clientid}
440+
441+
srv := &Server{
442+
Config: Config{
443+
PrivateKey: srvkey,
444+
MaxPeers: 0,
445+
NoDial: true,
446+
Protocols: []Protocol{discard},
447+
},
448+
newTransport: func(fd net.Conn) transport { return tp },
449+
log: log.New(),
450+
}
451+
if err := srv.Start(); err != nil {
452+
t.Fatalf("couldn't start server: %v", err)
453+
}
454+
defer srv.Stop()
455+
456+
// Check that server is full (MaxPeers=0)
457+
conn, _ := net.Pipe()
458+
srv.SetupConn(conn, flags, dialDest)
459+
if tp.closeErr != DiscTooManyPeers {
460+
t.Errorf("unexpected close error: %q", tp.closeErr)
461+
}
462+
conn.Close()
463+
464+
srv.AddTrustedPeer(clientnode)
465+
466+
// Check that server allows a trusted peer despite being full.
467+
conn, _ = net.Pipe()
468+
srv.SetupConn(conn, flags, dialDest)
469+
if tp.closeErr == DiscTooManyPeers {
470+
t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
471+
}
472+
473+
if tp.closeErr != DiscUselessPeer {
474+
t.Errorf("unexpected close error: %q", tp.closeErr)
475+
}
476+
conn.Close()
477+
478+
srv.RemoveTrustedPeer(clientnode)
479+
480+
// Check that server is full again.
481+
conn, _ = net.Pipe()
482+
srv.SetupConn(conn, flags, dialDest)
483+
if tp.closeErr != DiscTooManyPeers {
484+
t.Errorf("unexpected close error: %q", tp.closeErr)
485+
}
486+
conn.Close()
381487
}
382488

383489
func TestServerSetupConn(t *testing.T) {

0 commit comments

Comments
 (0)