Skip to content

Commit 3066190

Browse files
authored
Merge pull request #269 from blinklabs-io/fix/prevent-multiple-start-stop-close
fix: protect stop/start/close functions from being called again
2 parents 09e5b3b + f8938aa commit 3066190

File tree

9 files changed

+166
-141
lines changed

9 files changed

+166
-141
lines changed

muxer/muxer.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,11 @@ type Muxer struct {
5050
startChan chan bool
5151
doneChan chan bool
5252
waitGroup sync.WaitGroup
53-
stopMutex sync.Mutex
5453
protocolSenders map[uint16]chan *Segment
5554
protocolReceivers map[uint16]chan *Segment
5655
diffusionMode DiffusionMode
56+
onceStart sync.Once
57+
onceStop sync.Once
5758
}
5859

5960
// New creates a new Muxer object and starts the read loop
@@ -77,32 +78,26 @@ func (m *Muxer) ErrorChan() chan error {
7778

7879
// Start unblocks the read loop after the initial handshake to allow it to start processing messages
7980
func (m *Muxer) Start() {
80-
m.startChan <- true
81+
m.onceStart.Do(func() {
82+
m.startChan <- true
83+
})
8184
}
8285

8386
// Stop shuts down the muxer
8487
func (m *Muxer) Stop() {
85-
// We use a mutex to prevent this function from being called multiple times
86-
// concurrently, which would cause a race condition
87-
m.stopMutex.Lock()
88-
defer m.stopMutex.Unlock()
89-
// Immediately return if we're already shutting down
90-
select {
91-
case <-m.doneChan:
92-
return
93-
default:
94-
}
95-
// Close doneChan to signify that we're shutting down
96-
close(m.doneChan)
97-
// Wait for other goroutines to shutdown
98-
m.waitGroup.Wait()
99-
// Close protocol receive channels
100-
// We rely on the individual mini-protocols to close the sender channel
101-
for _, recvChan := range m.protocolReceivers {
102-
close(recvChan)
103-
}
104-
// Close ErrorChan to signify to consumer that we're shutting down
105-
close(m.errorChan)
88+
m.onceStop.Do(func() {
89+
// Close doneChan to signify that we're shutting down
90+
close(m.doneChan)
91+
// Wait for other goroutines to shutdown
92+
m.waitGroup.Wait()
93+
// Close protocol receive channels
94+
// We rely on the individual mini-protocols to close the sender channel
95+
for _, recvChan := range m.protocolReceivers {
96+
close(recvChan)
97+
}
98+
// Close ErrorChan to signify to consumer that we're shutting down
99+
close(m.errorChan)
100+
})
106101
}
107102

108103
// SetDiffusionMode sets the muxer diffusion mode after the handshake completes
@@ -158,6 +153,12 @@ func (m *Muxer) RegisterProtocol(protocolId uint16) (chan *Segment, chan *Segmen
158153
// Send takes a populated Segment and writes it to the connection. A mutex is used to prevent more than
159154
// one protocol from sending at once
160155
func (m *Muxer) Send(msg *Segment) error {
156+
// Immediately return if we're already shutting down
157+
select {
158+
case <-m.doneChan:
159+
return fmt.Errorf("shutting down")
160+
default:
161+
}
161162
// We use a mutex to make sure only one protocol can send at a time
162163
m.sendMutex.Lock()
163164
defer m.sendMutex.Unlock()

ouroboros.go

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type Ouroboros struct {
5454
handshakeFinishedChan chan interface{}
5555
doneChan chan interface{}
5656
waitGroup sync.WaitGroup
57-
closeMutex sync.Mutex
57+
onceClose sync.Once
5858
sendKeepAlives bool
5959
delayMuxerStart bool
6060
fullDuplex bool
@@ -130,47 +130,40 @@ func (o *Ouroboros) Dial(proto string, address string) error {
130130

131131
// Close will shutdown the Ouroboros connection
132132
func (o *Ouroboros) Close() error {
133-
// We use a mutex to prevent this function from being called multiple times
134-
// concurrently, which would cause a race condition
135-
o.closeMutex.Lock()
136-
defer o.closeMutex.Unlock()
137-
// Immediately return if we're already shutting down
138-
select {
139-
case <-o.doneChan:
140-
return nil
141-
default:
142-
}
143-
// Close doneChan to signify that we're shutting down
144-
close(o.doneChan)
145-
// Gracefully stop the muxer
146-
if o.muxer != nil {
147-
o.muxer.Stop()
148-
}
149-
// Close the underlying connection
150-
if o.conn != nil {
151-
if err := o.conn.Close(); err != nil {
152-
return err
133+
var err error
134+
o.onceClose.Do(func() {
135+
// Close doneChan to signify that we're shutting down
136+
close(o.doneChan)
137+
// Gracefully stop the muxer
138+
if o.muxer != nil {
139+
o.muxer.Stop()
153140
}
154-
}
155-
// Wait for other goroutines to finish
156-
o.waitGroup.Wait()
157-
// Close channels
158-
close(o.errorChan)
159-
close(o.protoErrorChan)
160-
// We can only close a channel once, so we have to jump through a few hoops
161-
select {
162-
// The channel is either closed or has an item pending
163-
case _, ok := <-o.handshakeFinishedChan:
164-
// We successfully retrieved an item
165-
// This will probably never happen, but it doesn't hurt to cover this case
166-
if ok {
141+
// Close the underlying connection
142+
if o.conn != nil {
143+
if err = o.conn.Close(); err != nil {
144+
return
145+
}
146+
}
147+
// Wait for other goroutines to finish
148+
o.waitGroup.Wait()
149+
// Close channels
150+
close(o.errorChan)
151+
close(o.protoErrorChan)
152+
// We can only close a channel once, so we have to jump through a few hoops
153+
select {
154+
// The channel is either closed or has an item pending
155+
case _, ok := <-o.handshakeFinishedChan:
156+
// We successfully retrieved an item
157+
// This will probably never happen, but it doesn't hurt to cover this case
158+
if ok {
159+
close(o.handshakeFinishedChan)
160+
}
161+
// The channel is open and has no pending items
162+
default:
167163
close(o.handshakeFinishedChan)
168164
}
169-
// The channel is open and has no pending items
170-
default:
171-
close(o.handshakeFinishedChan)
172-
}
173-
return nil
165+
})
166+
return err
174167
}
175168

176169
// ChainSync returns the chain-sync protocol handler

protocol/blockfetch/client.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Client struct {
3232
startBatchResultChan chan error
3333
busyMutex sync.Mutex
3434
blockUseCallback bool
35+
onceStop sync.Once
3536
}
3637

3738
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
@@ -77,8 +78,12 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
7778
}
7879

7980
func (c *Client) Stop() error {
80-
msg := NewMsgClientDone()
81-
return c.SendMessage(msg)
81+
var err error
82+
c.onceStop.Do(func() {
83+
msg := NewMsgClientDone()
84+
err = c.SendMessage(msg)
85+
})
86+
return err
8287
}
8388

8489
// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)

protocol/chainsync/client.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Client struct {
3535
currentTipChan chan Tip
3636
wantFirstBlock bool
3737
firstBlockChan chan common.Point
38+
onceStop sync.Once
3839
}
3940

4041
// NewClient returns a new ChainSync client object
@@ -116,13 +117,16 @@ func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error {
116117

117118
// Stop transitions the protocol to the Done state. No more protocol operations will be possible afterward
118119
func (c *Client) Stop() error {
119-
c.busyMutex.Lock()
120-
defer c.busyMutex.Unlock()
121-
msg := NewMsgDone()
122-
if err := c.SendMessage(msg); err != nil {
123-
return err
124-
}
125-
return nil
120+
var err error
121+
c.onceStop.Do(func() {
122+
c.busyMutex.Lock()
123+
defer c.busyMutex.Unlock()
124+
msg := NewMsgDone()
125+
if err = c.SendMessage(msg); err != nil {
126+
return
127+
}
128+
})
129+
return err
126130
}
127131

128132
// GetCurrentTip returns the current chain tip

protocol/handshake/client.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ package handshake
1616

1717
import (
1818
"fmt"
19+
"sync"
1920

2021
"github.com/blinklabs-io/gouroboros/protocol"
2122
)
2223

2324
// Client implements the Handshake client
2425
type Client struct {
2526
*protocol.Protocol
26-
config *Config
27+
config *Config
28+
onceStart sync.Once
2729
}
2830

2931
// NewClient returns a new Handshake client object
@@ -60,22 +62,24 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
6062

6163
// Start begins the handshake process
6264
func (c *Client) Start() {
63-
c.Protocol.Start()
64-
// Send our ProposeVersions message
65-
versionMap := make(map[uint16]interface{})
66-
diffusionMode := DiffusionModeInitiatorOnly
67-
if c.config.ClientFullDuplex {
68-
diffusionMode = DiffusionModeInitiatorAndResponder
69-
}
70-
for _, version := range c.config.ProtocolVersions {
71-
if c.Mode() == protocol.ProtocolModeNodeToNode {
72-
versionMap[version] = []interface{}{c.config.NetworkMagic, diffusionMode}
73-
} else {
74-
versionMap[version] = c.config.NetworkMagic
65+
c.onceStart.Do(func() {
66+
c.Protocol.Start()
67+
// Send our ProposeVersions message
68+
versionMap := make(map[uint16]interface{})
69+
diffusionMode := DiffusionModeInitiatorOnly
70+
if c.config.ClientFullDuplex {
71+
diffusionMode = DiffusionModeInitiatorAndResponder
7572
}
76-
}
77-
msg := NewMsgProposeVersions(versionMap)
78-
_ = c.SendMessage(msg)
73+
for _, version := range c.config.ProtocolVersions {
74+
if c.Mode() == protocol.ProtocolModeNodeToNode {
75+
versionMap[version] = []interface{}{c.config.NetworkMagic, diffusionMode}
76+
} else {
77+
versionMap[version] = c.config.NetworkMagic
78+
}
79+
}
80+
msg := NewMsgProposeVersions(versionMap)
81+
_ = c.SendMessage(msg)
82+
})
7983
}
8084

8185
func (c *Client) handleMessage(msg protocol.Message, isResponse bool) error {

protocol/keepalive/client.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ package keepalive
1616

1717
import (
1818
"fmt"
19-
"github.com/blinklabs-io/gouroboros/protocol"
19+
"sync"
2020
"time"
21+
22+
"github.com/blinklabs-io/gouroboros/protocol"
2123
)
2224

2325
type Client struct {
2426
*protocol.Protocol
25-
config *Config
26-
timer *time.Timer
27+
config *Config
28+
timer *time.Timer
29+
onceStart sync.Once
2730
}
2831

2932
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
@@ -68,8 +71,10 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
6871
}
6972

7073
func (c *Client) Start() {
71-
c.Protocol.Start()
72-
c.startTimer()
74+
c.onceStart.Do(func() {
75+
c.Protocol.Start()
76+
c.startTimer()
77+
})
7378
}
7479

7580
func (c *Client) startTimer() {

protocol/localtxmonitor/client.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ package localtxmonitor
1616

1717
import (
1818
"fmt"
19-
"github.com/blinklabs-io/gouroboros/protocol"
2019
"sync"
20+
21+
"github.com/blinklabs-io/gouroboros/protocol"
2122
)
2223

2324
// Client implements the LocalTxMonitor client
@@ -31,6 +32,7 @@ type Client struct {
3132
hasTxResultChan chan bool
3233
nextTxResultChan chan []byte
3334
getSizesResultChan chan MsgReplyGetSizesResult
35+
onceStop sync.Once
3436
}
3537

3638
// NewClient returns a new LocalTxMonitor client object
@@ -133,13 +135,16 @@ func (c *Client) Release() error {
133135

134136
// Stop transitions the protocol to the Done state. No more operations will be possible
135137
func (c *Client) Stop() error {
136-
c.busyMutex.Lock()
137-
defer c.busyMutex.Unlock()
138-
msg := NewMsgDone()
139-
if err := c.SendMessage(msg); err != nil {
140-
return err
141-
}
142-
return nil
138+
var err error
139+
c.onceStop.Do(func() {
140+
c.busyMutex.Lock()
141+
defer c.busyMutex.Unlock()
142+
msg := NewMsgDone()
143+
if err = c.SendMessage(msg); err != nil {
144+
return
145+
}
146+
})
147+
return err
143148
}
144149

145150
// HasTx returns whether or not the specified transaction ID exists in the mempool snapshot

protocol/localtxsubmission/client.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ package localtxsubmission
1616

1717
import (
1818
"fmt"
19+
"sync"
20+
1921
"github.com/blinklabs-io/gouroboros/ledger"
2022
"github.com/blinklabs-io/gouroboros/protocol"
21-
"sync"
2223
)
2324

2425
// Client implements the LocalTxSubmission client
@@ -27,6 +28,7 @@ type Client struct {
2728
config *Config
2829
busyMutex sync.Mutex
2930
submitResultChan chan error
31+
onceStop sync.Once
3032
}
3133

3234
// NewClient returns a new LocalTxSubmission client object
@@ -94,13 +96,16 @@ func (c *Client) SubmitTx(eraId uint16, tx []byte) error {
9496

9597
// Stop transitions the protocol to the Done state. No more operations will be possible
9698
func (c *Client) Stop() error {
97-
c.busyMutex.Lock()
98-
defer c.busyMutex.Unlock()
99-
msg := NewMsgDone()
100-
if err := c.SendMessage(msg); err != nil {
101-
return err
102-
}
103-
return nil
99+
var err error
100+
c.onceStop.Do(func() {
101+
c.busyMutex.Lock()
102+
defer c.busyMutex.Unlock()
103+
msg := NewMsgDone()
104+
if err = c.SendMessage(msg); err != nil {
105+
return
106+
}
107+
})
108+
return err
104109
}
105110

106111
func (c *Client) handleAcceptTx() error {

0 commit comments

Comments
 (0)