Skip to content

Commit 662f679

Browse files
authored
Merge pull request #21 from cloudstruct/feature/keep-alive
Implement keep-alive protocol
2 parents 512dd82 + 07d02ff commit 662f679

File tree

6 files changed

+227
-2
lines changed

6 files changed

+227
-2
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ but the node-to-node protocols will also be implemented in time.
2222
| TxSubmission | Not Implemented |
2323
| Local TxSubmission | Not Implemented |
2424
| Local State Query | Not Implemented |
25-
| Keep-Alive | Not Implemented |
25+
| Keep-Alive | Implemented |
2626

2727
## Testing
2828

cmd/go-ouroboros-network/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func main() {
7979
NetworkMagic: uint32(f.networkMagic),
8080
ErrorChan: errorChan,
8181
UseNodeToNodeProtocol: f.ntnProto,
82+
SendKeepAlives: true,
8283
ChainSyncCallbackConfig: buildChainSyncCallbackConfig(),
8384
BlockFetchCallbackConfig: buildBlockFetchCallbackConfig(),
8485
}

ouroboros.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/cloudstruct/go-ouroboros-network/protocol/blockfetch"
66
"github.com/cloudstruct/go-ouroboros-network/protocol/chainsync"
77
"github.com/cloudstruct/go-ouroboros-network/protocol/handshake"
8+
"github.com/cloudstruct/go-ouroboros-network/protocol/keepalive"
89
"io"
910
"net"
1011
)
@@ -17,12 +18,15 @@ type Ouroboros struct {
1718
handshakeComplete bool
1819
muxer *muxer.Muxer
1920
ErrorChan chan error
21+
sendKeepAlives bool
2022
// Mini-protocols
2123
Handshake *handshake.Handshake
2224
ChainSync *chainsync.ChainSync
2325
chainSyncCallbackConfig *chainsync.ChainSyncCallbackConfig
2426
BlockFetch *blockfetch.BlockFetch
2527
blockFetchCallbackConfig *blockfetch.BlockFetchCallbackConfig
28+
KeepAlive *keepalive.KeepAlive
29+
keepAliveCallbackConfig *keepalive.KeepAliveCallbackConfig
2630
}
2731

2832
type OuroborosOptions struct {
@@ -33,8 +37,10 @@ type OuroborosOptions struct {
3337
// for servers
3438
WaitForHandshake bool
3539
UseNodeToNodeProtocol bool
40+
SendKeepAlives bool
3641
ChainSyncCallbackConfig *chainsync.ChainSyncCallbackConfig
3742
BlockFetchCallbackConfig *blockfetch.BlockFetchCallbackConfig
43+
KeepAliveCallbackConfig *keepalive.KeepAliveCallbackConfig
3844
}
3945

4046
func New(options *OuroborosOptions) (*Ouroboros, error) {
@@ -45,7 +51,9 @@ func New(options *OuroborosOptions) (*Ouroboros, error) {
4551
useNodeToNodeProto: options.UseNodeToNodeProtocol,
4652
chainSyncCallbackConfig: options.ChainSyncCallbackConfig,
4753
blockFetchCallbackConfig: options.BlockFetchCallbackConfig,
54+
keepAliveCallbackConfig: options.KeepAliveCallbackConfig,
4855
ErrorChan: options.ErrorChan,
56+
sendKeepAlives: options.SendKeepAlives,
4957
}
5058
if o.ErrorChan == nil {
5159
o.ErrorChan = make(chan error, 10)
@@ -97,9 +105,15 @@ func (o *Ouroboros) setupConnection() error {
97105
o.handshakeComplete = <-o.Handshake.Finished
98106
// TODO: register additional mini-protocols
99107
if o.useNodeToNodeProto {
100-
//versionNtN := GetProtocolVersionNtN(o.Handshake.Version)
108+
versionNtN := GetProtocolVersionNtN(o.Handshake.Version)
101109
o.ChainSync = chainsync.New(o.muxer, o.ErrorChan, o.useNodeToNodeProto, o.chainSyncCallbackConfig)
102110
o.BlockFetch = blockfetch.New(o.muxer, o.ErrorChan, o.blockFetchCallbackConfig)
111+
if versionNtN.EnableKeepAliveProtocol {
112+
o.KeepAlive = keepalive.New(o.muxer, o.ErrorChan, o.keepAliveCallbackConfig)
113+
if o.sendKeepAlives {
114+
o.KeepAlive.Start()
115+
}
116+
}
103117
} else {
104118
//versionNtC := GetProtocolVersionNtC(o.Handshake.Version)
105119
o.ChainSync = chainsync.New(o.muxer, o.ErrorChan, o.useNodeToNodeProto, o.chainSyncCallbackConfig)

protocol/keepalive/keepalive.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package keepalive
2+
3+
import (
4+
"fmt"
5+
"github.com/cloudstruct/go-ouroboros-network/muxer"
6+
"github.com/cloudstruct/go-ouroboros-network/protocol"
7+
"time"
8+
)
9+
10+
const (
11+
PROTOCOL_NAME = "keep-alive"
12+
PROTOCOL_ID uint16 = 8
13+
14+
// Time between keep-alive probes, in seconds
15+
KEEP_ALIVE_PERIOD = 60
16+
)
17+
18+
var (
19+
STATE_CLIENT = protocol.NewState(1, "Client")
20+
STATE_SERVER = protocol.NewState(2, "Server")
21+
STATE_DONE = protocol.NewState(3, "Done")
22+
)
23+
24+
type KeepAlive struct {
25+
proto *protocol.Protocol
26+
callbackConfig *KeepAliveCallbackConfig
27+
timer *time.Timer
28+
}
29+
30+
type KeepAliveCallbackConfig struct {
31+
KeepAliveFunc KeepAliveFunc
32+
KeepAliveResponseFunc KeepAliveResponseFunc
33+
DoneFunc DoneFunc
34+
}
35+
36+
// Callback function types
37+
type KeepAliveFunc func(uint16) error
38+
type KeepAliveResponseFunc func(uint16) error
39+
type DoneFunc func() error
40+
41+
func New(m *muxer.Muxer, errorChan chan error, callbackConfig *KeepAliveCallbackConfig) *KeepAlive {
42+
k := &KeepAlive{
43+
callbackConfig: callbackConfig,
44+
}
45+
k.proto = protocol.New(PROTOCOL_NAME, PROTOCOL_ID, m, errorChan, k.messageHandler, NewMsgFromCbor)
46+
// Set initial state
47+
k.proto.SetState(STATE_CLIENT)
48+
return k
49+
}
50+
51+
func (k *KeepAlive) messageHandler(msg protocol.Message) error {
52+
var err error
53+
switch msg.Type() {
54+
case MESSAGE_TYPE_KEEP_ALIVE:
55+
err = k.handleKeepAlive(msg)
56+
case MESSAGE_TYPE_KEEP_ALIVE_RESPONSE:
57+
err = k.handleKeepAliveResponse(msg)
58+
case MESSAGE_TYPE_DONE:
59+
err = k.handleDone()
60+
default:
61+
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
62+
}
63+
return err
64+
}
65+
66+
func (k *KeepAlive) Start() {
67+
k.timer = time.AfterFunc(KEEP_ALIVE_PERIOD*time.Second, func() {
68+
if err := k.KeepAlive(0); err != nil {
69+
k.proto.SendError(err)
70+
}
71+
})
72+
}
73+
74+
func (k *KeepAlive) Stop() {
75+
if k.timer != nil {
76+
k.timer.Stop()
77+
}
78+
// Remove timer, since we check for its presence elsewhere
79+
k.timer = nil
80+
}
81+
82+
func (k *KeepAlive) KeepAlive(cookie uint16) error {
83+
if err := k.proto.LockState([]protocol.State{STATE_CLIENT}); err != nil {
84+
return fmt.Errorf("%s: KeepAlive: protocol not in expected state", PROTOCOL_NAME)
85+
}
86+
msg := newMsgKeepAlive(cookie)
87+
// Unlock and change state when we're done
88+
defer k.proto.UnlockState(STATE_SERVER)
89+
// Send request
90+
return k.proto.SendMessage(msg, false)
91+
}
92+
93+
func (k *KeepAlive) handleKeepAlive(msgGeneric protocol.Message) error {
94+
if err := k.proto.LockState([]protocol.State{STATE_CLIENT}); err != nil {
95+
return fmt.Errorf("received keep-alive KeepAlive message when protocol not in expected state")
96+
}
97+
msg := msgGeneric.(*msgKeepAlive)
98+
// Unlock and change state when we're done
99+
defer k.proto.UnlockState(STATE_CLIENT)
100+
if k.callbackConfig != nil && k.callbackConfig.KeepAliveFunc != nil {
101+
// Call the user callback function
102+
return k.callbackConfig.KeepAliveFunc(msg.Cookie)
103+
} else {
104+
// Send the keep-alive response
105+
resp := newMsgKeepAliveResponse(msg.Cookie)
106+
return k.proto.SendMessage(resp, true)
107+
}
108+
}
109+
110+
func (k *KeepAlive) handleKeepAliveResponse(msgGeneric protocol.Message) error {
111+
if err := k.proto.LockState([]protocol.State{STATE_SERVER}); err != nil {
112+
return fmt.Errorf("received keep-alive KeepAliveResponse message when protocol not in expected state")
113+
}
114+
msg := msgGeneric.(*msgKeepAliveResponse)
115+
// Unlock and change state when we're done
116+
defer k.proto.UnlockState(STATE_CLIENT)
117+
// Start the timer again if we had one previously
118+
if k.timer != nil {
119+
defer k.Start()
120+
}
121+
if k.callbackConfig != nil && k.callbackConfig.KeepAliveResponseFunc != nil {
122+
// Call the user callback function
123+
return k.callbackConfig.KeepAliveResponseFunc(msg.Cookie)
124+
}
125+
return nil
126+
}
127+
128+
func (k *KeepAlive) handleDone() error {
129+
if err := k.proto.LockState([]protocol.State{STATE_CLIENT}); err != nil {
130+
return fmt.Errorf("received keep-alive Done message when protocol not in expected state")
131+
}
132+
// Unlock and change state when we're done
133+
defer k.proto.UnlockState(STATE_DONE)
134+
if k.callbackConfig != nil && k.callbackConfig.DoneFunc != nil {
135+
// Call the user callback function
136+
return k.callbackConfig.DoneFunc()
137+
}
138+
return nil
139+
}

protocol/keepalive/messages.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package keepalive
2+
3+
import (
4+
"fmt"
5+
"github.com/cloudstruct/go-ouroboros-network/protocol"
6+
"github.com/cloudstruct/go-ouroboros-network/utils"
7+
)
8+
9+
const (
10+
MESSAGE_TYPE_KEEP_ALIVE = 0
11+
MESSAGE_TYPE_KEEP_ALIVE_RESPONSE = 1
12+
MESSAGE_TYPE_DONE = 2
13+
)
14+
15+
func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) {
16+
var ret protocol.Message
17+
switch msgType {
18+
case MESSAGE_TYPE_KEEP_ALIVE:
19+
ret = &msgKeepAlive{}
20+
case MESSAGE_TYPE_KEEP_ALIVE_RESPONSE:
21+
ret = &msgKeepAliveResponse{}
22+
case MESSAGE_TYPE_DONE:
23+
ret = &msgDone{}
24+
}
25+
if _, err := utils.CborDecode(data, ret); err != nil {
26+
return nil, fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err)
27+
}
28+
if ret != nil {
29+
// Store the raw message CBOR
30+
ret.SetCbor(data)
31+
}
32+
return ret, nil
33+
}
34+
35+
type msgKeepAlive struct {
36+
protocol.MessageBase
37+
Cookie uint16
38+
}
39+
40+
func newMsgKeepAlive(cookie uint16) *msgKeepAlive {
41+
msg := &msgKeepAlive{
42+
MessageBase: protocol.MessageBase{
43+
MessageType: MESSAGE_TYPE_KEEP_ALIVE,
44+
},
45+
Cookie: cookie,
46+
}
47+
return msg
48+
}
49+
50+
type msgKeepAliveResponse struct {
51+
protocol.MessageBase
52+
Cookie uint16
53+
}
54+
55+
func newMsgKeepAliveResponse(cookie uint16) *msgKeepAliveResponse {
56+
msg := &msgKeepAliveResponse{
57+
MessageBase: protocol.MessageBase{
58+
MessageType: MESSAGE_TYPE_KEEP_ALIVE_RESPONSE,
59+
},
60+
Cookie: cookie,
61+
}
62+
return msg
63+
}
64+
65+
type msgDone struct {
66+
protocol.MessageBase
67+
}

protocol/protocol.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ func (p *Protocol) SendMessage(msg interface{}, isResponse bool) error {
9797
return nil
9898
}
9999

100+
func (p *Protocol) SendError(err error) {
101+
p.errorChan <- err
102+
}
103+
100104
func (p *Protocol) recvLoop() {
101105
leftoverData := false
102106
for {

0 commit comments

Comments
 (0)