Skip to content

Commit 56ed615

Browse files
committed
core, eth, miner: improve shutdown synchronisation
Shutting down geth prints hundreds of annoying error messages in some cases. The errors appear because the Stop methods of eth.ProtocolManager, miner.Miner and core.TxPool are asynchronous. Leftover peer sessions generate events which are processed after Stop, even though the database has already been closed. The fix is to make Stop synchronous using sync.WaitGroup. For eth.ProtocolManager, to make the use of WaitGroup safe, we need a way to stop new peer sessions from being added while waiting on the WaitGroup. The eth protocol Run function now selects on a signaling channel and adds to the WaitGroup only if ProtocolManager is not shutting down. For miner.worker and core.TxPool the number of goroutines is static, so WaitGroup can be used in the usual way without additional synchronisation.
1 parent f821b01 commit 56ed615

File tree

7 files changed

+101
-70
lines changed

7 files changed

+101
-70
lines changed

core/tx_pool.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ type stateFn func() (*state.StateDB, error)
6060
// two states over time as they are received and processed.
6161
type TxPool struct {
6262
config *ChainConfig
63-
quit chan bool // Quitting channel
64-
currentState stateFn // The state function which will allow us to do some pre checks
63+
currentState stateFn // The state function which will allow us to do some pre checks
6564
pendingState *state.ManagedState
6665
gasLimit func() *big.Int // The current gas limit function callback
6766
minGasPrice *big.Int
@@ -72,6 +71,8 @@ type TxPool struct {
7271
pending map[common.Hash]*types.Transaction // processable transactions
7372
queue map[common.Address]map[common.Hash]*types.Transaction
7473

74+
wg sync.WaitGroup // for shutdown sync
75+
7576
homestead bool
7677
}
7778

@@ -80,7 +81,6 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
8081
config: config,
8182
pending: make(map[common.Hash]*types.Transaction),
8283
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
83-
quit: make(chan bool),
8484
eventMux: eventMux,
8585
currentState: currentStateFn,
8686
gasLimit: gasLimitFn,
@@ -90,12 +90,15 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
9090
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
9191
}
9292

93+
pool.wg.Add(1)
9394
go pool.eventLoop()
9495

9596
return pool
9697
}
9798

9899
func (pool *TxPool) eventLoop() {
100+
defer pool.wg.Done()
101+
99102
// Track chain events. When a chain event occurs (new chain canon block)
100103
// we need to know the new state. The new state will help us determine
101104
// the nonces in the managed state
@@ -155,8 +158,8 @@ func (pool *TxPool) resetState() {
155158
}
156159

157160
func (pool *TxPool) Stop() {
158-
close(pool.quit)
159161
pool.events.Unsubscribe()
162+
pool.wg.Wait()
160163
glog.V(logger.Info).Infoln("Transaction pool stopped")
161164
}
162165

eth/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ func (s *Ethereum) Stop() error {
416416
s.blockchain.Stop()
417417
s.protocolManager.Stop()
418418
s.txPool.Stop()
419+
s.miner.Stop()
419420
s.eventMux.Stop()
420421

421422
s.StopAutoDAG()

eth/handler.go

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ type ProtocolManager struct {
7474
minedBlockSub event.Subscription
7575

7676
// channels for fetcher, syncer, txsyncLoop
77-
newPeerCh chan *peer
78-
txsyncCh chan *txsync
79-
quitSync chan struct{}
77+
newPeerCh chan *peer
78+
txsyncCh chan *txsync
79+
quitSync chan struct{}
80+
noMorePeers chan struct{}
8081

8182
// wait group is used for graceful shutdowns during downloading
8283
// and processing
83-
wg sync.WaitGroup
84-
quit bool
84+
wg sync.WaitGroup
8585
}
8686

8787
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
9494
}
9595
// Create the protocol manager with the base fields
9696
manager := &ProtocolManager{
97-
networkId: networkId,
98-
fastSync: fastSync,
99-
eventMux: mux,
100-
txpool: txpool,
101-
blockchain: blockchain,
102-
chaindb: chaindb,
103-
peers: newPeerSet(),
104-
newPeerCh: make(chan *peer, 1),
105-
txsyncCh: make(chan *txsync),
106-
quitSync: make(chan struct{}),
97+
networkId: networkId,
98+
fastSync: fastSync,
99+
eventMux: mux,
100+
txpool: txpool,
101+
blockchain: blockchain,
102+
chaindb: chaindb,
103+
peers: newPeerSet(),
104+
newPeerCh: make(chan *peer),
105+
noMorePeers: make(chan struct{}),
106+
txsyncCh: make(chan *txsync),
107+
quitSync: make(chan struct{}),
107108
}
108109
// Initiate a sub-protocol for every implemented version we can handle
109110
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
@@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
120121
Length: ProtocolLengths[i],
121122
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
122123
peer := manager.newPeer(int(version), p, rw)
123-
manager.newPeerCh <- peer
124-
return manager.handle(peer)
124+
select {
125+
case manager.newPeerCh <- peer:
126+
manager.wg.Add(1)
127+
defer manager.wg.Done()
128+
return manager.handle(peer)
129+
case <-manager.quitSync:
130+
return p2p.DiscQuitting
131+
}
125132
},
126133
NodeInfo: func() interface{} {
127134
return manager.NodeInfo()
@@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() {
187194
}
188195

189196
func (pm *ProtocolManager) Stop() {
190-
// Showing a log message. During download / process this could actually
191-
// take between 5 to 10 seconds and therefor feedback is required.
192197
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
193198

194-
pm.quit = true
195199
pm.txSub.Unsubscribe() // quits txBroadcastLoop
196200
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
197-
close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
198201

199-
// Wait for any process action
202+
// Quit the sync loop.
203+
// After this send has completed, no new peers will be accepted.
204+
pm.noMorePeers <- struct{}{}
205+
206+
// Quit fetcher, txsyncLoop.
207+
close(pm.quitSync)
208+
209+
// Disconnect existing sessions.
210+
// This also closes the gate for any new registrations on the peer set.
211+
// Sessions which are already established but not added to pm.peers yet
212+
// will exit when they try to register.
213+
pm.peers.Close()
214+
215+
// Wait for all peer handler goroutines and the loops to come down.
200216
pm.wg.Wait()
201217

202218
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")

eth/helper_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
140140
// Start the peer on a new thread
141141
errc := make(chan error, 1)
142142
go func() {
143-
pm.newPeerCh <- peer
144-
errc <- pm.handle(peer)
143+
select {
144+
case pm.newPeerCh <- peer:
145+
errc <- pm.handle(peer)
146+
case <-pm.quitSync:
147+
errc <- p2p.DiscQuitting
148+
}
145149
}()
146-
tp := &testPeer{
147-
app: app,
148-
net: net,
149-
peer: peer,
150-
}
150+
tp := &testPeer{app: app, net: net, peer: peer}
151151
// Execute any implicitly requested handshakes and return
152152
if shake {
153153
td, head, genesis := pm.blockchain.Status()

eth/peer.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
)
3535

3636
var (
37+
errClosed = errors.New("peer set is closed")
3738
errAlreadyRegistered = errors.New("peer is already registered")
3839
errNotRegistered = errors.New("peer is not registered")
3940
)
@@ -351,8 +352,9 @@ func (p *peer) String() string {
351352
// peerSet represents the collection of active peers currently participating in
352353
// the Ethereum sub-protocol.
353354
type peerSet struct {
354-
peers map[string]*peer
355-
lock sync.RWMutex
355+
peers map[string]*peer
356+
lock sync.RWMutex
357+
closed bool
356358
}
357359

358360
// newPeerSet creates a new peer set to track the active participants.
@@ -368,6 +370,9 @@ func (ps *peerSet) Register(p *peer) error {
368370
ps.lock.Lock()
369371
defer ps.lock.Unlock()
370372

373+
if ps.closed {
374+
return errClosed
375+
}
371376
if _, ok := ps.peers[p.id]; ok {
372377
return errAlreadyRegistered
373378
}
@@ -450,3 +455,15 @@ func (ps *peerSet) BestPeer() *peer {
450455
}
451456
return bestPeer
452457
}
458+
459+
// Close disconnects all peers.
460+
// No new peers can be registered after Close has returned.
461+
func (ps *peerSet) Close() {
462+
ps.lock.Lock()
463+
defer ps.lock.Unlock()
464+
465+
for _, p := range ps.peers {
466+
p.Disconnect(p2p.DiscQuitting)
467+
}
468+
ps.closed = true
469+
}

eth/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (pm *ProtocolManager) syncer() {
148148
// Force a sync even if not enough peers are present
149149
go pm.synchronise(pm.peers.BestPeer())
150150

151-
case <-pm.quitSync:
151+
case <-pm.noMorePeers:
152152
return
153153
}
154154
}

miner/worker.go

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,13 @@ type worker struct {
9494

9595
mu sync.Mutex
9696

97+
// update loop
98+
mux *event.TypeMux
99+
events event.Subscription
100+
wg sync.WaitGroup
101+
97102
agents map[Agent]struct{}
98103
recv chan *Result
99-
mux *event.TypeMux
100-
quit chan struct{}
101104
pow pow.PoW
102105

103106
eth core.Backend
@@ -138,13 +141,14 @@ func newWorker(config *core.ChainConfig, coinbase common.Address, eth core.Backe
138141
possibleUncles: make(map[common.Hash]*types.Block),
139142
coinbase: coinbase,
140143
txQueue: make(map[common.Hash]*types.Transaction),
141-
quit: make(chan struct{}),
142144
agents: make(map[Agent]struct{}),
143145
fullValidation: false,
144146
}
147+
worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
148+
worker.wg.Add(1)
145149
go worker.update()
146-
go worker.wait()
147150

151+
go worker.wait()
148152
worker.commitNewWork()
149153

150154
return worker
@@ -184,9 +188,12 @@ func (self *worker) start() {
184188
}
185189

186190
func (self *worker) stop() {
191+
// Quit update.
192+
self.events.Unsubscribe()
193+
self.wg.Wait()
194+
187195
self.mu.Lock()
188196
defer self.mu.Unlock()
189-
190197
if atomic.LoadInt32(&self.mining) == 1 {
191198
// Stop all agents.
192199
for agent := range self.agents {
@@ -217,36 +224,23 @@ func (self *worker) unregister(agent Agent) {
217224
}
218225

219226
func (self *worker) update() {
220-
eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
221-
defer eventSub.Unsubscribe()
222-
223-
eventCh := eventSub.Chan()
224-
for {
225-
select {
226-
case event, ok := <-eventCh:
227-
if !ok {
228-
// Event subscription closed, set the channel to nil to stop spinning
229-
eventCh = nil
230-
continue
231-
}
232-
// A real event arrived, process interesting content
233-
switch ev := event.Data.(type) {
234-
case core.ChainHeadEvent:
235-
self.commitNewWork()
236-
case core.ChainSideEvent:
237-
self.uncleMu.Lock()
238-
self.possibleUncles[ev.Block.Hash()] = ev.Block
239-
self.uncleMu.Unlock()
240-
case core.TxPreEvent:
241-
// Apply transaction to the pending state if we're not mining
242-
if atomic.LoadInt32(&self.mining) == 0 {
243-
self.currentMu.Lock()
244-
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
245-
self.currentMu.Unlock()
246-
}
227+
defer self.wg.Done()
228+
for event := range self.events.Chan() {
229+
// A real event arrived, process interesting content
230+
switch ev := event.Data.(type) {
231+
case core.ChainHeadEvent:
232+
self.commitNewWork()
233+
case core.ChainSideEvent:
234+
self.uncleMu.Lock()
235+
self.possibleUncles[ev.Block.Hash()] = ev.Block
236+
self.uncleMu.Unlock()
237+
case core.TxPreEvent:
238+
// Apply transaction to the pending state if we're not mining
239+
if atomic.LoadInt32(&self.mining) == 0 {
240+
self.currentMu.Lock()
241+
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
242+
self.currentMu.Unlock()
247243
}
248-
case <-self.quit:
249-
return
250244
}
251245
}
252246
}

0 commit comments

Comments
 (0)