@@ -18,8 +18,9 @@ import (
18
18
)
19
19
20
20
const (
21
- defaultDialTimeout = 10 * time .Second
22
- refreshPeersInterval = 30 * time .Second
21
+ defaultDialTimeout = 10 * time .Second
22
+ refreshPeersInterval = 30 * time .Second
23
+ staticPeerCheckInterval = 15 * time .Second
23
24
24
25
// This is the maximum number of inbound connection
25
26
// that are allowed to linger between 'accepted' and
@@ -59,6 +60,14 @@ type Server struct {
59
60
// with the rest of the network.
60
61
BootstrapNodes []* discover.Node
61
62
63
+ // Static nodes are used as pre-configured connections which are always
64
+ // maintained and re-connected on disconnects.
65
+ StaticNodes []* discover.Node
66
+
67
+ // Trusted nodes are used as pre-configured connections which are always
68
+ // allowed to connect, even above the peer limit.
69
+ TrustedNodes []* discover.Node
70
+
62
71
// NodeDatabase is the path to the database containing the previously seen
63
72
// live nodes in the network.
64
73
NodeDatabase string
@@ -95,20 +104,23 @@ type Server struct {
95
104
96
105
ourHandshake * protoHandshake
97
106
98
- lock sync.RWMutex // protects running and peers
99
- running bool
100
- peers map [discover.NodeID ]* Peer
107
+ lock sync.RWMutex // protects running, peers and the trust fields
108
+ running bool
109
+ peers map [discover.NodeID ]* Peer
110
+ staticNodes map [discover.NodeID ]* discover.Node // Map of currently maintained static remote nodes
111
+ staticDial chan * discover.Node // Dial request channel reserved for the static nodes
112
+ staticCycle time.Duration // Overrides staticPeerCheckInterval, used for testing
113
+ trustedNodes map [discover.NodeID ]bool // Set of currently trusted remote nodes
101
114
102
115
ntab * discover.Table
103
116
listener net.Listener
104
117
105
- quit chan struct {}
106
- loopWG sync.WaitGroup // {dial,listen,nat}Loop
107
- peerWG sync.WaitGroup // active peer goroutines
108
- peerConnect chan * discover.Node
118
+ quit chan struct {}
119
+ loopWG sync.WaitGroup // {dial,listen,nat}Loop
120
+ peerWG sync.WaitGroup // active peer goroutines
109
121
}
110
122
111
- type setupFunc func (net.Conn , * ecdsa.PrivateKey , * protoHandshake , * discover.Node , bool ) (* conn , error )
123
+ type setupFunc func (net.Conn , * ecdsa.PrivateKey , * protoHandshake , * discover.Node , bool , map [ discover . NodeID ] bool ) (* conn , error )
112
124
type newPeerHook func (* Peer )
113
125
114
126
// Peers returns all connected peers.
@@ -131,10 +143,14 @@ func (srv *Server) PeerCount() int {
131
143
return n
132
144
}
133
145
134
- // SuggestPeer creates a connection to the given Node if it
135
- // is not already connected.
136
- func (srv * Server ) SuggestPeer (n * discover.Node ) {
137
- srv .peerConnect <- n
146
+ // AddPeer connects to the given node and maintains the connection until the
147
+ // server is shut down. If the connection fails for any reason, the server will
148
+ // attempt to reconnect the peer.
149
+ func (srv * Server ) AddPeer (node * discover.Node ) {
150
+ srv .lock .Lock ()
151
+ defer srv .lock .Unlock ()
152
+
153
+ srv .staticNodes [node .ID ] = node
138
154
}
139
155
140
156
// Broadcast sends an RLP-encoded message to all connected peers.
@@ -195,7 +211,18 @@ func (srv *Server) Start() (err error) {
195
211
}
196
212
srv .quit = make (chan struct {})
197
213
srv .peers = make (map [discover.NodeID ]* Peer )
198
- srv .peerConnect = make (chan * discover.Node )
214
+
215
+ // Create the current trust maps, and the associated dialing channel
216
+ srv .trustedNodes = make (map [discover.NodeID ]bool )
217
+ for _ , node := range srv .TrustedNodes {
218
+ srv .trustedNodes [node .ID ] = true
219
+ }
220
+ srv .staticNodes = make (map [discover.NodeID ]* discover.Node )
221
+ for _ , node := range srv .StaticNodes {
222
+ srv .staticNodes [node .ID ] = node
223
+ }
224
+ srv .staticDial = make (chan * discover.Node )
225
+
199
226
if srv .setupFunc == nil {
200
227
srv .setupFunc = setupConn
201
228
}
@@ -229,6 +256,8 @@ func (srv *Server) Start() (err error) {
229
256
if srv .NoDial && srv .ListenAddr == "" {
230
257
glog .V (logger .Warn ).Infoln ("I will be kind-of useless, neither dialing nor listening." )
231
258
}
259
+ // maintain the static peers
260
+ go srv .staticNodesLoop ()
232
261
233
262
srv .running = true
234
263
return nil
@@ -323,6 +352,45 @@ func (srv *Server) listenLoop() {
323
352
}
324
353
}
325
354
355
+ // staticNodesLoop is responsible for periodically checking that static
356
+ // connections are actually live, and requests dialing if not.
357
+ func (srv * Server ) staticNodesLoop () {
358
+ // Create a default maintenance ticker, but override it requested
359
+ cycle := staticPeerCheckInterval
360
+ if srv .staticCycle != 0 {
361
+ cycle = srv .staticCycle
362
+ }
363
+ tick := time .NewTicker (cycle )
364
+
365
+ for {
366
+ select {
367
+ case <- srv .quit :
368
+ return
369
+
370
+ case <- tick .C :
371
+ // Collect all the non-connected static nodes
372
+ needed := []* discover.Node {}
373
+ srv .lock .RLock ()
374
+ for id , node := range srv .staticNodes {
375
+ if _ , ok := srv .peers [id ]; ! ok {
376
+ needed = append (needed , node )
377
+ }
378
+ }
379
+ srv .lock .RUnlock ()
380
+
381
+ // Try to dial each of them (don't hang if server terminates)
382
+ for _ , node := range needed {
383
+ glog .V (logger .Debug ).Infof ("Dialing static peer %v" , node )
384
+ select {
385
+ case srv .staticDial <- node :
386
+ case <- srv .quit :
387
+ return
388
+ }
389
+ }
390
+ }
391
+ }
392
+ }
393
+
326
394
func (srv * Server ) dialLoop () {
327
395
var (
328
396
dialed = make (chan * discover.Node )
@@ -373,7 +441,7 @@ func (srv *Server) dialLoop() {
373
441
// below MaxPeers.
374
442
refresh .Reset (refreshPeersInterval )
375
443
}
376
- case dest := <- srv .peerConnect :
444
+ case dest := <- srv .staticDial :
377
445
dial (dest )
378
446
case dests := <- findresults :
379
447
for _ , dest := range dests {
@@ -416,10 +484,18 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
416
484
// returns during that exchange need to call peerWG.Done because
417
485
// the callers of startPeer added the peer to the wait group already.
418
486
fd .SetDeadline (time .Now ().Add (handshakeTimeout ))
487
+
488
+ // Check capacity, but override for static nodes
419
489
srv .lock .RLock ()
420
490
atcap := len (srv .peers ) == srv .MaxPeers
491
+ if dest != nil {
492
+ if _ , ok := srv .staticNodes [dest .ID ]; ok {
493
+ atcap = false
494
+ }
495
+ }
421
496
srv .lock .RUnlock ()
422
- conn , err := srv .setupFunc (fd , srv .PrivateKey , srv .ourHandshake , dest , atcap )
497
+
498
+ conn , err := srv .setupFunc (fd , srv .PrivateKey , srv .ourHandshake , dest , atcap , srv .trustedNodes )
423
499
if err != nil {
424
500
fd .Close ()
425
501
glog .V (logger .Debug ).Infof ("Handshake with %v failed: %v" , fd .RemoteAddr (), err )
@@ -472,11 +548,18 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
472
548
return true , 0
473
549
}
474
550
551
+ // checkPeer verifies whether a peer looks promising and should be allowed/kept
552
+ // in the pool, or if it's of no use.
475
553
func (srv * Server ) checkPeer (id discover.NodeID ) (bool , DiscReason ) {
554
+ // First up, figure out if the peer is static or trusted
555
+ _ , static := srv .staticNodes [id ]
556
+ trusted := srv .trustedNodes [id ]
557
+
558
+ // Make sure the peer passes all required checks
476
559
switch {
477
560
case ! srv .running :
478
561
return false , DiscQuitting
479
- case len (srv .peers ) >= srv .MaxPeers :
562
+ case ! static && ! trusted && len (srv .peers ) >= srv .MaxPeers :
480
563
return false , DiscTooManyPeers
481
564
case srv .peers [id ] != nil :
482
565
return false , DiscAlreadyConnected
0 commit comments