@@ -74,14 +74,14 @@ type ProtocolManager struct {
74
74
minedBlockSub event.Subscription
75
75
76
76
// channels for fetcher, syncer, txsyncLoop
77
- newPeerCh chan * peer
78
- txsyncCh chan * txsync
79
- quitSync chan struct {}
77
+ newPeerCh chan * peer
78
+ txsyncCh chan * txsync
79
+ quitSync chan struct {}
80
+ noMorePeers chan struct {}
80
81
81
82
// wait group is used for graceful shutdowns during downloading
82
83
// and processing
83
- wg sync.WaitGroup
84
- quit bool
84
+ wg sync.WaitGroup
85
85
}
86
86
87
87
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
94
94
}
95
95
// Create the protocol manager with the base fields
96
96
manager := & ProtocolManager {
97
- networkId : networkId ,
98
- fastSync : fastSync ,
99
- eventMux : mux ,
100
- txpool : txpool ,
101
- blockchain : blockchain ,
102
- chaindb : chaindb ,
103
- peers : newPeerSet (),
104
- newPeerCh : make (chan * peer , 1 ),
105
- txsyncCh : make (chan * txsync ),
106
- quitSync : make (chan struct {}),
97
+ networkId : networkId ,
98
+ fastSync : fastSync ,
99
+ eventMux : mux ,
100
+ txpool : txpool ,
101
+ blockchain : blockchain ,
102
+ chaindb : chaindb ,
103
+ peers : newPeerSet (),
104
+ newPeerCh : make (chan * peer ),
105
+ noMorePeers : make (chan struct {}),
106
+ txsyncCh : make (chan * txsync ),
107
+ quitSync : make (chan struct {}),
107
108
}
108
109
// Initiate a sub-protocol for every implemented version we can handle
109
110
manager .SubProtocols = make ([]p2p.Protocol , 0 , len (ProtocolVersions ))
@@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
120
121
Length : ProtocolLengths [i ],
121
122
Run : func (p * p2p.Peer , rw p2p.MsgReadWriter ) error {
122
123
peer := manager .newPeer (int (version ), p , rw )
123
- manager .newPeerCh <- peer
124
- return manager .handle (peer )
124
+ select {
125
+ case manager .newPeerCh <- peer :
126
+ manager .wg .Add (1 )
127
+ defer manager .wg .Done ()
128
+ return manager .handle (peer )
129
+ case <- manager .quitSync :
130
+ return p2p .DiscQuitting
131
+ }
125
132
},
126
133
NodeInfo : func () interface {} {
127
134
return manager .NodeInfo ()
@@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() {
187
194
}
188
195
189
196
func (pm * ProtocolManager ) Stop () {
190
- // Showing a log message. During download / process this could actually
191
- // take between 5 to 10 seconds and therefor feedback is required.
192
197
glog .V (logger .Info ).Infoln ("Stopping ethereum protocol handler..." )
193
198
194
- pm .quit = true
195
199
pm .txSub .Unsubscribe () // quits txBroadcastLoop
196
200
pm .minedBlockSub .Unsubscribe () // quits blockBroadcastLoop
197
- close (pm .quitSync ) // quits syncer, fetcher, txsyncLoop
198
201
199
- // Wait for any process action
202
+ // Quit the sync loop.
203
+ // After this send has completed, no new peers will be accepted.
204
+ pm .noMorePeers <- struct {}{}
205
+
206
+ // Quit fetcher, txsyncLoop.
207
+ close (pm .quitSync )
208
+
209
+ // Disconnect existing sessions.
210
+ // This also closes the gate for any new registrations on the peer set.
211
+ // sessions which are already established but not added to pm.peers yet
212
+ // will exit when they try to register.
213
+ pm .peers .Close ()
214
+
215
+ // Wait for all peer handler goroutines and the loops to come down.
200
216
pm .wg .Wait ()
201
217
202
218
glog .V (logger .Info ).Infoln ("Ethereum protocol handler stopped" )
0 commit comments