Skip to content

Commit d024f6b

Browse files
committed
feat!: require explicitly starting each protocol
This decentralizes the protocol configuration and reduces resource usage when not using all protocols. Fixes #87 BREAKING CHANGE: protocols must now be explicitly started before they can be used
1 parent 3b54dac commit d024f6b

File tree

11 files changed

+108
-105
lines changed

11 files changed

+108
-105
lines changed

cmd/go-ouroboros-network/chainsync.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,11 @@ func testChainSync(f *globalFlags) {
9797
conn := createClientConnection(f)
9898
errorChan := make(chan error)
9999
oOpts := &ouroboros.OuroborosOptions{
100-
Conn: conn,
101-
NetworkMagic: uint32(f.networkMagic),
102-
ErrorChan: errorChan,
103-
UseNodeToNodeProtocol: f.ntnProto,
104-
SendKeepAlives: true,
105-
ChainSyncCallbackConfig: buildChainSyncCallbackConfig(),
106-
BlockFetchCallbackConfig: buildBlockFetchCallbackConfig(),
100+
Conn: conn,
101+
NetworkMagic: uint32(f.networkMagic),
102+
ErrorChan: errorChan,
103+
UseNodeToNodeProtocol: f.ntnProto,
104+
SendKeepAlives: true,
107105
}
108106
go func() {
109107
for {
@@ -117,6 +115,8 @@ func testChainSync(f *globalFlags) {
117115
fmt.Printf("ERROR: %s\n", err)
118116
os.Exit(1)
119117
}
118+
o.ChainSync.Start(buildChainSyncCallbackConfig())
119+
o.BlockFetch.Start(buildBlockFetchCallbackConfig())
120120

121121
syncState.oConn = o
122122
syncState.readyForNextBlockChan = make(chan bool)

cmd/go-ouroboros-network/localtxsubmission.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,11 @@ func testLocalTxSubmission(f *globalFlags) {
5151
conn := createClientConnection(f)
5252
errorChan := make(chan error)
5353
oOpts := &ouroboros.OuroborosOptions{
54-
Conn: conn,
55-
NetworkMagic: uint32(f.networkMagic),
56-
ErrorChan: errorChan,
57-
UseNodeToNodeProtocol: f.ntnProto,
58-
SendKeepAlives: true,
59-
LocalTxSubmissionCallbackConfig: buildLocalTxSubmissionCallbackConfig(),
54+
Conn: conn,
55+
NetworkMagic: uint32(f.networkMagic),
56+
ErrorChan: errorChan,
57+
UseNodeToNodeProtocol: f.ntnProto,
58+
SendKeepAlives: true,
6059
}
6160
go func() {
6261
for {
@@ -70,6 +69,7 @@ func testLocalTxSubmission(f *globalFlags) {
7069
fmt.Printf("ERROR: %s\n", err)
7170
os.Exit(1)
7271
}
72+
o.LocalTxSubmission.Start(buildLocalTxSubmissionCallbackConfig())
7373

7474
txData, err := ioutil.ReadFile(localTxSubmissionFlags.txFile)
7575
if err != nil {

ouroboros.go

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,51 +26,35 @@ type Ouroboros struct {
2626
sendKeepAlives bool
2727
delayMuxerStart bool
2828
// Mini-protocols
29-
Handshake *handshake.Handshake
30-
ChainSync *chainsync.ChainSync
31-
chainSyncCallbackConfig *chainsync.ChainSyncCallbackConfig
32-
BlockFetch *blockfetch.BlockFetch
33-
blockFetchCallbackConfig *blockfetch.BlockFetchCallbackConfig
34-
KeepAlive *keepalive.KeepAlive
35-
keepAliveCallbackConfig *keepalive.KeepAliveCallbackConfig
36-
LocalTxSubmission *localtxsubmission.LocalTxSubmission
37-
localTxSubmissionCallbackConfig *localtxsubmission.CallbackConfig
38-
LocalStateQuery *localstatequery.LocalStateQuery
39-
localStateQueryCallbackConfig *localstatequery.CallbackConfig
40-
TxSubmission *txsubmission.TxSubmission
41-
txSubmissionCallbackConfig *txsubmission.CallbackConfig
29+
Handshake *handshake.Handshake
30+
ChainSync *chainsync.ChainSync
31+
BlockFetch *blockfetch.BlockFetch
32+
KeepAlive *keepalive.KeepAlive
33+
LocalTxSubmission *localtxsubmission.LocalTxSubmission
34+
LocalStateQuery *localstatequery.LocalStateQuery
35+
TxSubmission *txsubmission.TxSubmission
4236
}
4337

4438
type OuroborosOptions struct {
45-
Conn net.Conn
46-
NetworkMagic uint32
47-
ErrorChan chan error
48-
Server bool
49-
UseNodeToNodeProtocol bool
50-
SendKeepAlives bool
51-
DelayMuxerStart bool
52-
ChainSyncCallbackConfig *chainsync.ChainSyncCallbackConfig
53-
BlockFetchCallbackConfig *blockfetch.BlockFetchCallbackConfig
54-
KeepAliveCallbackConfig *keepalive.KeepAliveCallbackConfig
55-
LocalTxSubmissionCallbackConfig *localtxsubmission.CallbackConfig
56-
LocalStateQueryCallbackConfig *localstatequery.CallbackConfig
39+
Conn net.Conn
40+
NetworkMagic uint32
41+
ErrorChan chan error
42+
Server bool
43+
UseNodeToNodeProtocol bool
44+
SendKeepAlives bool
45+
DelayMuxerStart bool
5746
}
5847

5948
func New(options *OuroborosOptions) (*Ouroboros, error) {
6049
o := &Ouroboros{
61-
conn: options.Conn,
62-
networkMagic: options.NetworkMagic,
63-
server: options.Server,
64-
useNodeToNodeProto: options.UseNodeToNodeProtocol,
65-
chainSyncCallbackConfig: options.ChainSyncCallbackConfig,
66-
blockFetchCallbackConfig: options.BlockFetchCallbackConfig,
67-
keepAliveCallbackConfig: options.KeepAliveCallbackConfig,
68-
localTxSubmissionCallbackConfig: options.LocalTxSubmissionCallbackConfig,
69-
localStateQueryCallbackConfig: options.LocalStateQueryCallbackConfig,
70-
ErrorChan: options.ErrorChan,
71-
sendKeepAlives: options.SendKeepAlives,
72-
delayMuxerStart: options.DelayMuxerStart,
73-
protoErrorChan: make(chan error, 10),
50+
conn: options.Conn,
51+
networkMagic: options.NetworkMagic,
52+
server: options.Server,
53+
useNodeToNodeProto: options.UseNodeToNodeProtocol,
54+
ErrorChan: options.ErrorChan,
55+
sendKeepAlives: options.SendKeepAlives,
56+
delayMuxerStart: options.DelayMuxerStart,
57+
protoErrorChan: make(chan error, 10),
7458
}
7559
if o.ErrorChan == nil {
7660
o.ErrorChan = make(chan error, 10)
@@ -147,6 +131,7 @@ func (o *Ouroboros) setupConnection() error {
147131
}
148132
// Perform handshake
149133
o.Handshake = handshake.New(protoOptions, protoVersions)
134+
o.Handshake.Start()
150135
// TODO: figure out better way to signify automatic handshaking and returning the chosen version
151136
if !o.server {
152137
err := o.Handshake.ProposeVersions(protoVersions, o.networkMagic)
@@ -178,22 +163,22 @@ func (o *Ouroboros) setupConnection() error {
178163
if o.useNodeToNodeProto {
179164
versionNtN := GetProtocolVersionNtN(o.Handshake.Version)
180165
protoOptions.Mode = protocol.ProtocolModeNodeToNode
181-
o.ChainSync = chainsync.New(protoOptions, o.chainSyncCallbackConfig)
182-
o.BlockFetch = blockfetch.New(protoOptions, o.blockFetchCallbackConfig)
183-
o.TxSubmission = txsubmission.New(protoOptions, o.txSubmissionCallbackConfig)
166+
o.ChainSync = chainsync.New(protoOptions)
167+
o.BlockFetch = blockfetch.New(protoOptions)
168+
o.TxSubmission = txsubmission.New(protoOptions)
184169
if versionNtN.EnableKeepAliveProtocol {
185-
o.KeepAlive = keepalive.New(protoOptions, o.keepAliveCallbackConfig)
170+
o.KeepAlive = keepalive.New(protoOptions)
186171
if o.sendKeepAlives {
187-
o.KeepAlive.Start()
172+
o.KeepAlive.Start(nil)
188173
}
189174
}
190175
} else {
191176
versionNtC := GetProtocolVersionNtC(o.Handshake.Version)
192177
protoOptions.Mode = protocol.ProtocolModeNodeToClient
193-
o.ChainSync = chainsync.New(protoOptions, o.chainSyncCallbackConfig)
194-
o.LocalTxSubmission = localtxsubmission.New(protoOptions, o.localTxSubmissionCallbackConfig)
178+
o.ChainSync = chainsync.New(protoOptions)
179+
o.LocalTxSubmission = localtxsubmission.New(protoOptions)
195180
if versionNtC.EnableLocalQueryProtocol {
196-
o.LocalStateQuery = localstatequery.New(protoOptions, o.localStateQueryCallbackConfig)
181+
o.LocalStateQuery = localstatequery.New(protoOptions)
197182
}
198183
}
199184
// Start muxer

protocol/blockfetch/blockfetch.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,8 @@ type BlockFetchNoBlocksFunc func() error
8282
type BlockFetchBlockFunc func(uint, interface{}) error
8383
type BlockFetchBatchDoneFunc func() error
8484

85-
func New(options protocol.ProtocolOptions, callbackConfig *BlockFetchCallbackConfig) *BlockFetch {
86-
b := &BlockFetch{
87-
callbackConfig: callbackConfig,
88-
}
85+
func New(options protocol.ProtocolOptions) *BlockFetch {
86+
b := &BlockFetch{}
8987
protoConfig := protocol.ProtocolConfig{
9088
Name: PROTOCOL_NAME,
9189
ProtocolId: PROTOCOL_ID,
@@ -102,6 +100,11 @@ func New(options protocol.ProtocolOptions, callbackConfig *BlockFetchCallbackCon
102100
return b
103101
}
104102

103+
func (b *BlockFetch) Start(callbackConfig *BlockFetchCallbackConfig) {
104+
b.callbackConfig = callbackConfig
105+
b.Protocol.Start()
106+
}
107+
105108
func (b *BlockFetch) messageHandler(msg protocol.Message, isResponse bool) error {
106109
var err error
107110
switch msg.Type() {

protocol/chainsync/chainsync.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ type ChainSyncIntersectFoundFunc func(interface{}, interface{}) error
108108
type ChainSyncIntersectNotFoundFunc func(interface{}) error
109109
type ChainSyncDoneFunc func() error
110110

111-
func New(options protocol.ProtocolOptions, callbackConfig *ChainSyncCallbackConfig) *ChainSync {
111+
func New(options protocol.ProtocolOptions) *ChainSync {
112112
// Use node-to-client protocol ID
113113
protocolId := PROTOCOL_ID_NTC
114114
msgFromCborFunc := NewMsgFromCborNtC
@@ -117,9 +117,7 @@ func New(options protocol.ProtocolOptions, callbackConfig *ChainSyncCallbackConf
117117
protocolId = PROTOCOL_ID_NTN
118118
msgFromCborFunc = NewMsgFromCborNtN
119119
}
120-
c := &ChainSync{
121-
callbackConfig: callbackConfig,
122-
}
120+
c := &ChainSync{}
123121
protoConfig := protocol.ProtocolConfig{
124122
Name: PROTOCOL_NAME,
125123
ProtocolId: protocolId,
@@ -136,6 +134,11 @@ func New(options protocol.ProtocolOptions, callbackConfig *ChainSyncCallbackConf
136134
return c
137135
}
138136

137+
func (c *ChainSync) Start(callbackConfig *ChainSyncCallbackConfig) {
138+
c.callbackConfig = callbackConfig
139+
c.Protocol.Start()
140+
}
141+
139142
func (c *ChainSync) messageHandler(msg protocol.Message, isResponse bool) error {
140143
var err error
141144
switch msg.Type() {

protocol/handshake/handshake.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ func New(options protocol.ProtocolOptions, allowedVersions []uint16) *Handshake
7575
return h
7676
}
7777

78+
func (h *Handshake) Start() {
79+
h.Protocol.Start()
80+
}
81+
7882
func (h *Handshake) handleMessage(msg protocol.Message, isResponse bool) error {
7983
var err error
8084
switch msg.Type() {

protocol/keepalive/keepalive.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,8 @@ type KeepAliveFunc func(uint16) error
6565
type KeepAliveResponseFunc func(uint16) error
6666
type DoneFunc func() error
6767

68-
func New(options protocol.ProtocolOptions, callbackConfig *KeepAliveCallbackConfig) *KeepAlive {
69-
k := &KeepAlive{
70-
callbackConfig: callbackConfig,
71-
}
68+
func New(options protocol.ProtocolOptions) *KeepAlive {
69+
k := &KeepAlive{}
7270
protoConfig := protocol.ProtocolConfig{
7371
Name: PROTOCOL_NAME,
7472
ProtocolId: PROTOCOL_ID,
@@ -85,6 +83,12 @@ func New(options protocol.ProtocolOptions, callbackConfig *KeepAliveCallbackConf
8583
return k
8684
}
8785

86+
func (k *KeepAlive) Start(callbackConfig *KeepAliveCallbackConfig) {
87+
k.callbackConfig = callbackConfig
88+
k.Protocol.Start()
89+
k.startTimer()
90+
}
91+
8892
func (k *KeepAlive) messageHandler(msg protocol.Message, isResponse bool) error {
8993
var err error
9094
switch msg.Type() {
@@ -100,22 +104,14 @@ func (k *KeepAlive) messageHandler(msg protocol.Message, isResponse bool) error
100104
return err
101105
}
102106

103-
func (k *KeepAlive) Start() {
107+
func (k *KeepAlive) startTimer() {
104108
k.timer = time.AfterFunc(KEEP_ALIVE_PERIOD*time.Second, func() {
105109
if err := k.KeepAlive(0); err != nil {
106110
k.SendError(err)
107111
}
108112
})
109113
}
110114

111-
func (k *KeepAlive) Stop() {
112-
if k.timer != nil {
113-
k.timer.Stop()
114-
}
115-
// Remove timer, since we check for its presence elsewhere
116-
k.timer = nil
117-
}
118-
119115
func (k *KeepAlive) KeepAlive(cookie uint16) error {
120116
msg := NewMsgKeepAlive(cookie)
121117
return k.SendMessage(msg)
@@ -137,7 +133,7 @@ func (k *KeepAlive) handleKeepAliveResponse(msgGeneric protocol.Message) error {
137133
msg := msgGeneric.(*MsgKeepAliveResponse)
138134
// Start the timer again if we had one previously
139135
if k.timer != nil {
140-
defer k.Start()
136+
defer k.startTimer()
141137
}
142138
if k.callbackConfig != nil && k.callbackConfig.KeepAliveResponseFunc != nil {
143139
// Call the user callback function

protocol/localstatequery/localstatequery.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,8 @@ type ReleaseFunc func() error
114114
type ReAcquireFunc func(interface{}) error
115115
type DoneFunc func() error
116116

117-
func New(options protocol.ProtocolOptions, callbackConfig *CallbackConfig) *LocalStateQuery {
118-
l := &LocalStateQuery{
119-
callbackConfig: callbackConfig,
120-
}
117+
func New(options protocol.ProtocolOptions) *LocalStateQuery {
118+
l := &LocalStateQuery{}
121119
protoConfig := protocol.ProtocolConfig{
122120
Name: PROTOCOL_NAME,
123121
ProtocolId: PROTOCOL_ID,
@@ -142,6 +140,11 @@ func New(options protocol.ProtocolOptions, callbackConfig *CallbackConfig) *Loca
142140
return l
143141
}
144142

143+
func (l *LocalStateQuery) Start(callbackConfig *CallbackConfig) {
144+
l.callbackConfig = callbackConfig
145+
l.Protocol.Start()
146+
}
147+
145148
func (l *LocalStateQuery) messageHandler(msg protocol.Message, isResponse bool) error {
146149
var err error
147150
switch msg.Type() {

protocol/localtxsubmission/localtxsubmission.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,8 @@ type AcceptTxFunc func() error
6262
type RejectTxFunc func(interface{}) error
6363
type DoneFunc func() error
6464

65-
func New(options protocol.ProtocolOptions, callbackConfig *CallbackConfig) *LocalTxSubmission {
66-
l := &LocalTxSubmission{
67-
callbackConfig: callbackConfig,
68-
}
65+
func New(options protocol.ProtocolOptions) *LocalTxSubmission {
66+
l := &LocalTxSubmission{}
6967
protoConfig := protocol.ProtocolConfig{
7068
Name: PROTOCOL_NAME,
7169
ProtocolId: PROTOCOL_ID,
@@ -82,6 +80,11 @@ func New(options protocol.ProtocolOptions, callbackConfig *CallbackConfig) *Loca
8280
return l
8381
}
8482

83+
func (l *LocalTxSubmission) Start(callbackConfig *CallbackConfig) {
84+
l.callbackConfig = callbackConfig
85+
l.Protocol.Start()
86+
}
87+
8588
func (l *LocalTxSubmission) messageHandler(msg protocol.Message, isResponse bool) error {
8689
var err error
8790
switch msg.Type() {

protocol/protocol.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,27 @@ type MessageHandlerFunc func(Message, bool) error
7070
type MessageFromCborFunc func(uint, []byte) (Message, error)
7171

7272
func New(config ProtocolConfig) *Protocol {
73-
muxerSendChan, muxerRecvChan := config.Muxer.RegisterProtocol(config.ProtocolId)
7473
p := &Protocol{
75-
config: config,
76-
muxerSendChan: muxerSendChan,
77-
muxerRecvChan: muxerRecvChan,
78-
recvBuffer: bytes.NewBuffer(nil),
79-
sendQueueChan: make(chan Message, 50),
80-
sendStateQueueChan: make(chan Message, 50),
81-
recvReadyChan: make(chan bool, 1),
82-
sendReadyChan: make(chan bool, 1),
83-
doneChan: make(chan bool),
74+
config: config,
8475
}
76+
return p
77+
}
78+
79+
func (p *Protocol) Start() {
80+
// Register protocol with muxer
81+
p.muxerSendChan, p.muxerRecvChan = p.config.Muxer.RegisterProtocol(p.config.ProtocolId)
82+
// Create buffers and channels
83+
p.recvBuffer = bytes.NewBuffer(nil)
84+
p.sendQueueChan = make(chan Message, 50)
85+
p.sendStateQueueChan = make(chan Message, 50)
86+
p.recvReadyChan = make(chan bool, 1)
87+
p.sendReadyChan = make(chan bool, 1)
88+
p.doneChan = make(chan bool)
8589
// Set initial state
86-
p.setState(config.InitialState)
90+
p.setState(p.config.InitialState)
8791
// Start our send and receive Goroutines
8892
go p.recvLoop()
8993
go p.sendLoop()
90-
return p
9194
}
9295

9396
func (p *Protocol) Mode() ProtocolMode {

0 commit comments

Comments
 (0)