Skip to content

Commit 2281362

Browse files
authored
Merge pull request #251 from blinklabs-io/fix/shutdown-sync-waitgroup
fix: use sync.WaitGroup to avoid random panics on shutdown
2 parents 54674b9 + 33d28e8 commit 2281362

File tree

3 files changed

+23
-0
lines changed

3 files changed

+23
-0
lines changed

muxer/muxer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Muxer struct {
3535
sendMutex sync.Mutex
3636
startChan chan bool
3737
doneChan chan bool
38+
waitGroup sync.WaitGroup
3839
stopMutex sync.Mutex
3940
protocolSenders map[uint16]chan *Segment
4041
protocolReceivers map[uint16]chan *Segment
@@ -51,6 +52,7 @@ func New(conn net.Conn) *Muxer {
5152
protocolSenders: make(map[uint16]chan *Segment),
5253
protocolReceivers: make(map[uint16]chan *Segment),
5354
}
55+
m.waitGroup.Add(1)
5456
go m.readLoop()
5557
return m
5658
}
@@ -78,6 +80,8 @@ func (m *Muxer) Stop() {
7880
}
7981
// Close doneChan to signify that we're shutting down
8082
close(m.doneChan)
83+
// Wait for other goroutines to shutdown
84+
m.waitGroup.Wait()
8185
// Close protocol receive channels
8286
// We rely on the individual mini-protocols to close the sender channel
8387
for _, recvChan := range m.protocolReceivers {
@@ -116,7 +120,9 @@ func (m *Muxer) RegisterProtocol(protocolId uint16) (chan *Segment, chan *Segmen
116120
m.protocolSenders[protocolId] = senderChan
117121
m.protocolReceivers[protocolId] = receiverChan
118122
// Start Goroutine to handle outbound messages
123+
m.waitGroup.Add(1)
119124
go func() {
125+
defer m.waitGroup.Done()
120126
for {
121127
select {
122128
case _, ok := <-m.doneChan:
@@ -157,6 +163,7 @@ func (m *Muxer) Send(msg *Segment) error {
157163
// readLoop waits for incoming data on the connection, parses the segment, and passes it to the appropriate
158164
// protocol
159165
func (m *Muxer) readLoop() {
166+
defer m.waitGroup.Done()
160167
started := false
161168
for {
162169
// Break out of read loop if we're shutting down

ouroboros.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type Ouroboros struct {
3939
protoErrorChan chan error
4040
handshakeFinishedChan chan interface{}
4141
doneChan chan interface{}
42+
waitGroup sync.WaitGroup
4243
closeMutex sync.Mutex
4344
sendKeepAlives bool
4445
delayMuxerStart bool
@@ -137,6 +138,8 @@ func (o *Ouroboros) Close() error {
137138
return err
138139
}
139140
}
141+
// Wait for other goroutines to finish
142+
o.waitGroup.Wait()
140143
// Close channels
141144
close(o.errorChan)
142145
close(o.protoErrorChan)
@@ -196,7 +199,9 @@ func (o *Ouroboros) TxSubmission() *txsubmission.TxSubmission {
196199
func (o *Ouroboros) setupConnection() error {
197200
o.muxer = muxer.New(o.conn)
198201
// Start Goroutine to pass along errors from the muxer
202+
o.waitGroup.Add(1)
199203
go func() {
204+
defer o.waitGroup.Done()
200205
select {
201206
case <-o.doneChan:
202207
return
@@ -274,7 +279,9 @@ func (o *Ouroboros) setupConnection() error {
274279
protoOptions.Version = protoOptions.Version - protocolVersionNtCFlag
275280
}
276281
// Start Goroutine to pass along errors from the mini-protocols
282+
o.waitGroup.Add(1)
277283
go func() {
284+
defer o.waitGroup.Done()
278285
err, ok := <-o.protoErrorChan
279286
// The channel is closed, which means we're already shutting down
280287
if !ok {

protocol/protocol.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Protocol struct {
3030
recvReadyChan chan bool
3131
sendReadyChan chan bool
3232
doneChan chan bool
33+
waitGroup sync.WaitGroup
3334
stateTransitionTimer *time.Timer
3435
}
3536

@@ -102,11 +103,16 @@ func (p *Protocol) Start() {
102103
p.sendReadyChan = make(chan bool, 1)
103104
// Start goroutine to cleanup when shutting down
104105
go func() {
106+
// Wait for doneChan to be closed
105107
<-p.doneChan
108+
// Wait for all other goroutines to finish
109+
p.waitGroup.Wait()
110+
// Close channels
106111
close(p.sendQueueChan)
107112
close(p.sendStateQueueChan)
108113
close(p.recvReadyChan)
109114
close(p.sendReadyChan)
115+
// Cancel any timer
110116
if p.stateTransitionTimer != nil {
111117
// Stop timer and drain channel
112118
if !p.stateTransitionTimer.Stop() {
@@ -118,6 +124,7 @@ func (p *Protocol) Start() {
118124
// Set initial state
119125
p.setState(p.config.InitialState)
120126
// Start our send and receive Goroutines
127+
p.waitGroup.Add(2)
121128
go p.recvLoop()
122129
go p.sendLoop()
123130
}
@@ -149,6 +156,7 @@ func (p *Protocol) SendError(err error) {
149156
}
150157

151158
func (p *Protocol) sendLoop() {
159+
defer p.waitGroup.Done()
152160
var setNewState bool
153161
var newState State
154162
var err error
@@ -263,6 +271,7 @@ func (p *Protocol) sendLoop() {
263271
}
264272

265273
func (p *Protocol) recvLoop() {
274+
defer p.waitGroup.Done()
266275
leftoverData := false
267276
isResponse := false
268277
for {

0 commit comments

Comments
 (0)