Skip to content

Commit 756b629

Browse files
committed
Merge pull request #2523 from fjl/shutdown
core, eth, miner: improve shutdown synchronisation
2 parents dc7f202 + 56ed615 commit 756b629

File tree

7 files changed

+101
-70
lines changed

7 files changed

+101
-70
lines changed

core/tx_pool.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ type stateFn func() (*state.StateDB, error)
6060
// two states over time as they are received and processed.
6161
type TxPool struct {
6262
config *ChainConfig
63-
quit chan bool // Quitting channel
64-
currentState stateFn // The state function which will allow us to do some pre checks
63+
currentState stateFn // The state function which will allow us to do some pre checks
6564
pendingState *state.ManagedState
6665
gasLimit func() *big.Int // The current gas limit function callback
6766
minGasPrice *big.Int
@@ -72,6 +71,8 @@ type TxPool struct {
7271
pending map[common.Hash]*types.Transaction // processable transactions
7372
queue map[common.Address]map[common.Hash]*types.Transaction
7473

74+
wg sync.WaitGroup // for shutdown sync
75+
7576
homestead bool
7677
}
7778

@@ -80,7 +81,6 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
8081
config: config,
8182
pending: make(map[common.Hash]*types.Transaction),
8283
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
83-
quit: make(chan bool),
8484
eventMux: eventMux,
8585
currentState: currentStateFn,
8686
gasLimit: gasLimitFn,
@@ -90,12 +90,15 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
9090
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
9191
}
9292

93+
pool.wg.Add(1)
9394
go pool.eventLoop()
9495

9596
return pool
9697
}
9798

9899
func (pool *TxPool) eventLoop() {
100+
defer pool.wg.Done()
101+
99102
// Track chain events. When a chain event occurs (new chain canon block)
100103
// we need to know the new state. The new state will help us determine
101104
// the nonces in the managed state
@@ -155,8 +158,8 @@ func (pool *TxPool) resetState() {
155158
}
156159

157160
func (pool *TxPool) Stop() {
158-
close(pool.quit)
159161
pool.events.Unsubscribe()
162+
pool.wg.Wait()
160163
glog.V(logger.Info).Infoln("Transaction pool stopped")
161164
}
162165

eth/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ func (s *Ethereum) Stop() error {
416416
s.blockchain.Stop()
417417
s.protocolManager.Stop()
418418
s.txPool.Stop()
419+
s.miner.Stop()
419420
s.eventMux.Stop()
420421

421422
s.StopAutoDAG()

eth/handler.go

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ type ProtocolManager struct {
7474
minedBlockSub event.Subscription
7575

7676
// channels for fetcher, syncer, txsyncLoop
77-
newPeerCh chan *peer
78-
txsyncCh chan *txsync
79-
quitSync chan struct{}
77+
newPeerCh chan *peer
78+
txsyncCh chan *txsync
79+
quitSync chan struct{}
80+
noMorePeers chan struct{}
8081

8182
// wait group is used for graceful shutdowns during downloading
8283
// and processing
83-
wg sync.WaitGroup
84-
quit bool
84+
wg sync.WaitGroup
8585
}
8686

8787
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
9494
}
9595
// Create the protocol manager with the base fields
9696
manager := &ProtocolManager{
97-
networkId: networkId,
98-
fastSync: fastSync,
99-
eventMux: mux,
100-
txpool: txpool,
101-
blockchain: blockchain,
102-
chaindb: chaindb,
103-
peers: newPeerSet(),
104-
newPeerCh: make(chan *peer, 1),
105-
txsyncCh: make(chan *txsync),
106-
quitSync: make(chan struct{}),
97+
networkId: networkId,
98+
fastSync: fastSync,
99+
eventMux: mux,
100+
txpool: txpool,
101+
blockchain: blockchain,
102+
chaindb: chaindb,
103+
peers: newPeerSet(),
104+
newPeerCh: make(chan *peer),
105+
noMorePeers: make(chan struct{}),
106+
txsyncCh: make(chan *txsync),
107+
quitSync: make(chan struct{}),
107108
}
108109
// Initiate a sub-protocol for every implemented version we can handle
109110
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
@@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
120121
Length: ProtocolLengths[i],
121122
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
122123
peer := manager.newPeer(int(version), p, rw)
123-
manager.newPeerCh <- peer
124-
return manager.handle(peer)
124+
select {
125+
case manager.newPeerCh <- peer:
126+
manager.wg.Add(1)
127+
defer manager.wg.Done()
128+
return manager.handle(peer)
129+
case <-manager.quitSync:
130+
return p2p.DiscQuitting
131+
}
125132
},
126133
NodeInfo: func() interface{} {
127134
return manager.NodeInfo()
@@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() {
187194
}
188195

189196
func (pm *ProtocolManager) Stop() {
190-
// Showing a log message. During download / process this could actually
191-
// take between 5 and 10 seconds, and therefore feedback is required.
192197
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
193198

194-
pm.quit = true
195199
pm.txSub.Unsubscribe() // quits txBroadcastLoop
196200
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
197-
close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
198201

199-
// Wait for any process action
202+
// Quit the sync loop.
203+
// After this send has completed, no new peers will be accepted.
204+
pm.noMorePeers <- struct{}{}
205+
206+
// Quit fetcher, txsyncLoop.
207+
close(pm.quitSync)
208+
209+
// Disconnect existing sessions.
210+
// This also closes the gate for any new registrations on the peer set.
211+
// Sessions which are already established but not added to pm.peers yet
212+
// will exit when they try to register.
213+
pm.peers.Close()
214+
215+
// Wait for all peer handler goroutines and the loops to come down.
200216
pm.wg.Wait()
201217

202218
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")

eth/helper_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
140140
// Start the peer on a new thread
141141
errc := make(chan error, 1)
142142
go func() {
143-
pm.newPeerCh <- peer
144-
errc <- pm.handle(peer)
143+
select {
144+
case pm.newPeerCh <- peer:
145+
errc <- pm.handle(peer)
146+
case <-pm.quitSync:
147+
errc <- p2p.DiscQuitting
148+
}
145149
}()
146-
tp := &testPeer{
147-
app: app,
148-
net: net,
149-
peer: peer,
150-
}
150+
tp := &testPeer{app: app, net: net, peer: peer}
151151
// Execute any implicitly requested handshakes and return
152152
if shake {
153153
td, head, genesis := pm.blockchain.Status()

eth/peer.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
)
3535

3636
var (
37+
errClosed = errors.New("peer set is closed")
3738
errAlreadyRegistered = errors.New("peer is already registered")
3839
errNotRegistered = errors.New("peer is not registered")
3940
)
@@ -351,8 +352,9 @@ func (p *peer) String() string {
351352
// peerSet represents the collection of active peers currently participating in
352353
// the Ethereum sub-protocol.
353354
type peerSet struct {
354-
peers map[string]*peer
355-
lock sync.RWMutex
355+
peers map[string]*peer
356+
lock sync.RWMutex
357+
closed bool
356358
}
357359

358360
// newPeerSet creates a new peer set to track the active participants.
@@ -368,6 +370,9 @@ func (ps *peerSet) Register(p *peer) error {
368370
ps.lock.Lock()
369371
defer ps.lock.Unlock()
370372

373+
if ps.closed {
374+
return errClosed
375+
}
371376
if _, ok := ps.peers[p.id]; ok {
372377
return errAlreadyRegistered
373378
}
@@ -450,3 +455,15 @@ func (ps *peerSet) BestPeer() *peer {
450455
}
451456
return bestPeer
452457
}
458+
459+
// Close disconnects all peers.
460+
// No new peers can be registered after Close has returned.
461+
func (ps *peerSet) Close() {
462+
ps.lock.Lock()
463+
defer ps.lock.Unlock()
464+
465+
for _, p := range ps.peers {
466+
p.Disconnect(p2p.DiscQuitting)
467+
}
468+
ps.closed = true
469+
}

eth/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (pm *ProtocolManager) syncer() {
148148
// Force a sync even if not enough peers are present
149149
go pm.synchronise(pm.peers.BestPeer())
150150

151-
case <-pm.quitSync:
151+
case <-pm.noMorePeers:
152152
return
153153
}
154154
}

miner/worker.go

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,13 @@ type worker struct {
9494

9595
mu sync.Mutex
9696

97+
// update loop
98+
mux *event.TypeMux
99+
events event.Subscription
100+
wg sync.WaitGroup
101+
97102
agents map[Agent]struct{}
98103
recv chan *Result
99-
mux *event.TypeMux
100-
quit chan struct{}
101104
pow pow.PoW
102105

103106
eth core.Backend
@@ -138,13 +141,14 @@ func newWorker(config *core.ChainConfig, coinbase common.Address, eth core.Backe
138141
possibleUncles: make(map[common.Hash]*types.Block),
139142
coinbase: coinbase,
140143
txQueue: make(map[common.Hash]*types.Transaction),
141-
quit: make(chan struct{}),
142144
agents: make(map[Agent]struct{}),
143145
fullValidation: false,
144146
}
147+
worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
148+
worker.wg.Add(1)
145149
go worker.update()
146-
go worker.wait()
147150

151+
go worker.wait()
148152
worker.commitNewWork()
149153

150154
return worker
@@ -184,9 +188,12 @@ func (self *worker) start() {
184188
}
185189

186190
func (self *worker) stop() {
191+
// Quit update.
192+
self.events.Unsubscribe()
193+
self.wg.Wait()
194+
187195
self.mu.Lock()
188196
defer self.mu.Unlock()
189-
190197
if atomic.LoadInt32(&self.mining) == 1 {
191198
// Stop all agents.
192199
for agent := range self.agents {
@@ -217,36 +224,23 @@ func (self *worker) unregister(agent Agent) {
217224
}
218225

219226
func (self *worker) update() {
220-
eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
221-
defer eventSub.Unsubscribe()
222-
223-
eventCh := eventSub.Chan()
224-
for {
225-
select {
226-
case event, ok := <-eventCh:
227-
if !ok {
228-
// Event subscription closed, set the channel to nil to stop spinning
229-
eventCh = nil
230-
continue
231-
}
232-
// A real event arrived, process interesting content
233-
switch ev := event.Data.(type) {
234-
case core.ChainHeadEvent:
235-
self.commitNewWork()
236-
case core.ChainSideEvent:
237-
self.uncleMu.Lock()
238-
self.possibleUncles[ev.Block.Hash()] = ev.Block
239-
self.uncleMu.Unlock()
240-
case core.TxPreEvent:
241-
// Apply transaction to the pending state if we're not mining
242-
if atomic.LoadInt32(&self.mining) == 0 {
243-
self.currentMu.Lock()
244-
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
245-
self.currentMu.Unlock()
246-
}
227+
defer self.wg.Done()
228+
for event := range self.events.Chan() {
229+
// A real event arrived, process interesting content
230+
switch ev := event.Data.(type) {
231+
case core.ChainHeadEvent:
232+
self.commitNewWork()
233+
case core.ChainSideEvent:
234+
self.uncleMu.Lock()
235+
self.possibleUncles[ev.Block.Hash()] = ev.Block
236+
self.uncleMu.Unlock()
237+
case core.TxPreEvent:
238+
// Apply transaction to the pending state if we're not mining
239+
if atomic.LoadInt32(&self.mining) == 0 {
240+
self.currentMu.Lock()
241+
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
242+
self.currentMu.Unlock()
247243
}
248-
case <-self.quit:
249-
return
250244
}
251245
}
252246
}

0 commit comments

Comments
 (0)