Skip to content

Commit ab74d22

Browse files
authored
fix: move protocol cleanup routines to Start() (#531)
Fixes #529
1 parent cb6c052 commit ab74d22

File tree

7 files changed

+93
-50
lines changed

7 files changed

+93
-50
lines changed

protocol/blockfetch/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Client struct {
3232
startBatchResultChan chan error
3333
busyMutex sync.Mutex
3434
blockUseCallback bool
35+
onceStart sync.Once
3536
onceStop sync.Once
3637
}
3738

@@ -69,15 +70,21 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
6970
InitialState: StateIdle,
7071
}
7172
c.Protocol = protocol.New(protoConfig)
72-
// Start goroutine to cleanup resources on protocol shutdown
73-
go func() {
74-
<-c.Protocol.DoneChan()
75-
close(c.blockChan)
76-
close(c.startBatchResultChan)
77-
}()
7873
return c
7974
}
8075

76+
func (c *Client) Start() {
77+
c.onceStart.Do(func() {
78+
c.Protocol.Start()
79+
// Start goroutine to cleanup resources on protocol shutdown
80+
go func() {
81+
<-c.Protocol.DoneChan()
82+
close(c.blockChan)
83+
close(c.startBatchResultChan)
84+
}()
85+
})
86+
}
87+
8188
func (c *Client) Stop() error {
8289
var err error
8390
c.onceStop.Do(func() {

protocol/chainsync/client.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Client struct {
3737
firstBlockChan chan common.Point
3838
wantIntersectPoint bool
3939
intersectPointChan chan common.Point
40+
onceStart sync.Once
4041
onceStop sync.Once
4142
}
4243

@@ -88,18 +89,24 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
8889
InitialState: stateIdle,
8990
}
9091
c.Protocol = protocol.New(protoConfig)
91-
// Start goroutine to cleanup resources on protocol shutdown
92-
go func() {
93-
<-c.Protocol.DoneChan()
94-
close(c.intersectResultChan)
95-
close(c.readyForNextBlockChan)
96-
close(c.currentTipChan)
97-
close(c.firstBlockChan)
98-
close(c.intersectPointChan)
99-
}()
10092
return c
10193
}
10294

95+
func (c *Client) Start() {
96+
c.onceStart.Do(func() {
97+
c.Protocol.Start()
98+
// Start goroutine to cleanup resources on protocol shutdown
99+
go func() {
100+
<-c.Protocol.DoneChan()
101+
close(c.intersectResultChan)
102+
close(c.readyForNextBlockChan)
103+
close(c.currentTipChan)
104+
close(c.firstBlockChan)
105+
close(c.intersectPointChan)
106+
}()
107+
})
108+
}
109+
103110
func (c *Client) messageHandler(msg protocol.Message) error {
104111
var err error
105112
switch msg.Type() {

protocol/keepalive/client.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,22 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
5858
InitialState: StateClient,
5959
}
6060
c.Protocol = protocol.New(protoConfig)
61-
// Start goroutine to cleanup resources on protocol shutdown
62-
go func() {
63-
<-c.Protocol.DoneChan()
64-
// Stop any existing timer
65-
c.timerMutex.Lock()
66-
if c.timer != nil {
67-
c.timer.Stop()
68-
}
69-
c.timerMutex.Unlock()
70-
}()
7161
return c
7262
}
7363

7464
func (c *Client) Start() {
7565
c.onceStart.Do(func() {
7666
c.Protocol.Start()
67+
// Start goroutine to cleanup resources on protocol shutdown
68+
go func() {
69+
<-c.Protocol.DoneChan()
70+
// Stop any existing timer
71+
c.timerMutex.Lock()
72+
if c.timer != nil {
73+
c.timer.Stop()
74+
}
75+
c.timerMutex.Unlock()
76+
}()
7777
c.sendKeepAlive()
7878
})
7979
}

protocol/localstatequery/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Client struct {
3535
queryResultChan chan []byte
3636
acquireResultChan chan error
3737
currentEra int
38+
onceStart sync.Once
3839
}
3940

4041
// NewClient returns a new LocalStateQuery client object
@@ -82,15 +83,21 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
8283
c.enableGetRewardInfoPoolsBlock = true
8384
}
8485
c.Protocol = protocol.New(protoConfig)
85-
// Start goroutine to cleanup resources on protocol shutdown
86-
go func() {
87-
<-c.Protocol.DoneChan()
88-
close(c.queryResultChan)
89-
close(c.acquireResultChan)
90-
}()
9186
return c
9287
}
9388

89+
func (c *Client) Start() {
90+
c.onceStart.Do(func() {
91+
c.Protocol.Start()
92+
// Start goroutine to cleanup resources on protocol shutdown
93+
go func() {
94+
<-c.Protocol.DoneChan()
95+
close(c.queryResultChan)
96+
close(c.acquireResultChan)
97+
}()
98+
})
99+
}
100+
94101
func (c *Client) messageHandler(msg protocol.Message) error {
95102
var err error
96103
switch msg.Type() {

protocol/localtxmonitor/client.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Client struct {
3232
hasTxResultChan chan bool
3333
nextTxResultChan chan []byte
3434
getSizesResultChan chan MsgReplyGetSizesResult
35+
onceStart sync.Once
3536
onceStop sync.Once
3637
}
3738

@@ -72,17 +73,23 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
7273
InitialState: stateIdle,
7374
}
7475
c.Protocol = protocol.New(protoConfig)
75-
// Start goroutine to cleanup resources on protocol shutdown
76-
go func() {
77-
<-c.Protocol.DoneChan()
78-
close(c.acquireResultChan)
79-
close(c.hasTxResultChan)
80-
close(c.nextTxResultChan)
81-
close(c.getSizesResultChan)
82-
}()
8376
return c
8477
}
8578

79+
func (c *Client) Start() {
80+
c.onceStart.Do(func() {
81+
c.Protocol.Start()
82+
// Start goroutine to cleanup resources on protocol shutdown
83+
go func() {
84+
<-c.Protocol.DoneChan()
85+
close(c.acquireResultChan)
86+
close(c.hasTxResultChan)
87+
close(c.nextTxResultChan)
88+
close(c.getSizesResultChan)
89+
}()
90+
})
91+
}
92+
8693
func (c *Client) messageHandler(msg protocol.Message) error {
8794
var err error
8895
switch msg.Type() {

protocol/localtxsubmission/client.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Client struct {
2828
config *Config
2929
busyMutex sync.Mutex
3030
submitResultChan chan error
31+
onceStart sync.Once
3132
onceStop sync.Once
3233
}
3334

@@ -61,14 +62,20 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
6162
InitialState: stateIdle,
6263
}
6364
c.Protocol = protocol.New(protoConfig)
64-
// Start goroutine to cleanup resources on protocol shutdown
65-
go func() {
66-
<-c.Protocol.DoneChan()
67-
close(c.submitResultChan)
68-
}()
6965
return c
7066
}
7167

68+
func (c *Client) Start() {
69+
c.onceStart.Do(func() {
70+
c.Protocol.Start()
71+
// Start goroutine to cleanup resources on protocol shutdown
72+
go func() {
73+
<-c.Protocol.DoneChan()
74+
close(c.submitResultChan)
75+
}()
76+
})
77+
}
78+
7279
func (c *Client) messageHandler(msg protocol.Message) error {
7380
var err error
7481
switch msg.Type() {

protocol/txsubmission/server.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package txsubmission
1616

1717
import (
1818
"fmt"
19+
"sync"
1920

2021
"github.com/blinklabs-io/gouroboros/protocol"
2122
)
@@ -27,6 +28,7 @@ type Server struct {
2728
stateDone bool
2829
requestTxIdsResultChan chan []TxIdAndSize
2930
requestTxsResultChan chan []TxBody
31+
onceStart sync.Once
3032
}
3133

3234
func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
@@ -48,15 +50,21 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
4850
InitialState: stateInit,
4951
}
5052
s.Protocol = protocol.New(protoConfig)
51-
// Start goroutine to cleanup resources on protocol shutdown
52-
go func() {
53-
<-s.Protocol.DoneChan()
54-
close(s.requestTxIdsResultChan)
55-
close(s.requestTxsResultChan)
56-
}()
5753
return s
5854
}
5955

56+
func (s *Server) Start() {
57+
s.onceStart.Do(func() {
58+
s.Protocol.Start()
59+
// Start goroutine to cleanup resources on protocol shutdown
60+
go func() {
61+
<-s.Protocol.DoneChan()
62+
close(s.requestTxIdsResultChan)
63+
close(s.requestTxsResultChan)
64+
}()
65+
})
66+
}
67+
6068
func (s *Server) messageHandler(msg protocol.Message) error {
6169
var err error
6270
switch msg.Type() {

0 commit comments

Comments
 (0)