Skip to content

Commit 1bc1cd9

Browse files
committed
feat: finish basic tx-submission client implementation
Fixes #54
1 parent f46e609 commit 1bc1cd9

File tree

7 files changed

+185
-74
lines changed

7 files changed

+185
-74
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
/tx-monitor
99
/block-fetch
1010
/peer-sharing
11+
/tx-submission
1112

1213
# Test binary, built with `go test -c`
1314
*.test

cmd/tx-submission/main.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"fmt"
19+
"os"
20+
21+
ouroboros "github.com/blinklabs-io/gouroboros"
22+
"github.com/blinklabs-io/gouroboros/cmd/common"
23+
"github.com/blinklabs-io/gouroboros/protocol/txsubmission"
24+
)
25+
26+
type txSubmissionFlags struct {
27+
*common.GlobalFlags
28+
}
29+
30+
func main() {
31+
// Parse commandline
32+
f := txSubmissionFlags{
33+
GlobalFlags: common.NewGlobalFlags(),
34+
}
35+
f.Parse()
36+
// Create connection
37+
conn := common.CreateClientConnection(f.GlobalFlags)
38+
errorChan := make(chan error)
39+
go func() {
40+
for {
41+
err := <-errorChan
42+
fmt.Printf("ERROR(async): %s\n", err)
43+
os.Exit(1)
44+
}
45+
}()
46+
o, err := ouroboros.New(
47+
ouroboros.WithConnection(conn),
48+
ouroboros.WithNetworkMagic(uint32(f.NetworkMagic)),
49+
ouroboros.WithErrorChan(errorChan),
50+
ouroboros.WithNodeToNode(f.NtnProto),
51+
ouroboros.WithKeepAlive(true),
52+
ouroboros.WithTxSubmissionConfig(
53+
txsubmission.NewConfig(
54+
txsubmission.WithRequestTxIdsFunc(
55+
// TODO: do something more useful
56+
func(blocking bool, ack uint16, req uint16) ([]txsubmission.TxIdAndSize, error) {
57+
return []txsubmission.TxIdAndSize{}, nil
58+
},
59+
),
60+
txsubmission.WithRequestTxsFunc(
61+
// TODO: do something more useful
62+
func(txIds []txsubmission.TxId) ([]txsubmission.TxBody, error) {
63+
return []txsubmission.TxBody{}, nil
64+
},
65+
),
66+
),
67+
),
68+
)
69+
if err != nil {
70+
fmt.Printf("ERROR: %s\n", err)
71+
os.Exit(1)
72+
}
73+
74+
// Start the TxSubmission activity loop
75+
o.TxSubmission().Client.Init()
76+
77+
// Wait forever
78+
select {}
79+
}

protocol/txsubmission/client.go

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ package txsubmission
1616

1717
import (
1818
"fmt"
19+
"sync"
20+
1921
"github.com/blinklabs-io/gouroboros/protocol"
2022
)
2123

2224
type Client struct {
2325
*protocol.Protocol
24-
config *Config
26+
config *Config
27+
onceInit sync.Once
2528
}
2629

2730
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
@@ -34,36 +37,45 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
3437
}
3538
// Update state map with timeout
3639
stateMap := StateMap.Copy()
37-
if entry, ok := stateMap[STATE_IDLE]; ok {
40+
if entry, ok := stateMap[stateIdle]; ok {
3841
entry.Timeout = c.config.IdleTimeout
39-
stateMap[STATE_IDLE] = entry
42+
stateMap[stateIdle] = entry
4043
}
4144
// Configure underlying Protocol
4245
protoConfig := protocol.ProtocolConfig{
43-
Name: PROTOCOL_NAME,
44-
ProtocolId: PROTOCOL_ID,
46+
Name: ProtocolName,
47+
ProtocolId: ProtocolId,
4548
Muxer: protoOptions.Muxer,
4649
ErrorChan: protoOptions.ErrorChan,
4750
Mode: protoOptions.Mode,
4851
Role: protocol.ProtocolRoleClient,
4952
MessageHandlerFunc: c.messageHandler,
5053
MessageFromCborFunc: NewMsgFromCbor,
5154
StateMap: stateMap,
52-
InitialState: STATE_INIT,
55+
InitialState: stateInit,
5356
}
5457
c.Protocol = protocol.New(protoConfig)
5558
return c
5659
}
5760

61+
// Init tells the server to begin asking us for transactions
62+
func (c *Client) Init() {
63+
c.onceInit.Do(func() {
64+
// Send our Init message
65+
msg := NewMsgInit()
66+
_ = c.SendMessage(msg)
67+
})
68+
}
69+
5870
func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error {
5971
var err error
6072
switch msg.Type() {
61-
case MESSAGE_TYPE_REQUEST_TX_IDS:
73+
case MessageTypeRequestTxIds:
6274
err = c.handleRequestTxIds(msg)
63-
case MESSAGE_TYPE_REQUEST_TXS:
75+
case MessageTypeRequestTxs:
6476
err = c.handleRequestTxs(msg)
6577
default:
66-
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
78+
err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type())
6779
}
6880
return err
6981
}
@@ -74,7 +86,15 @@ func (c *Client) handleRequestTxIds(msg protocol.Message) error {
7486
}
7587
msgRequestTxIds := msg.(*MsgRequestTxIds)
7688
// Call the user callback function
77-
return c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req)
89+
txIds, err := c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req)
90+
if err != nil {
91+
return err
92+
}
93+
resp := NewMsgReplyTxIds(txIds)
94+
if err := c.SendMessage(resp); err != nil {
95+
return err
96+
}
97+
return nil
7898
}
7999

80100
func (c *Client) handleRequestTxs(msg protocol.Message) error {
@@ -83,5 +103,13 @@ func (c *Client) handleRequestTxs(msg protocol.Message) error {
83103
}
84104
msgRequestTxs := msg.(*MsgRequestTxs)
85105
// Call the user callback function
86-
return c.config.RequestTxsFunc(msgRequestTxs.TxIds)
106+
txs, err := c.config.RequestTxsFunc(msgRequestTxs.TxIds)
107+
if err != nil {
108+
return err
109+
}
110+
resp := NewMsgReplyTxs(txs)
111+
if err := c.SendMessage(resp); err != nil {
112+
return err
113+
}
114+
return nil
87115
}

protocol/txsubmission/messages.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,38 @@ package txsubmission
1616

1717
import (
1818
"fmt"
19+
1920
"github.com/blinklabs-io/gouroboros/cbor"
2021
"github.com/blinklabs-io/gouroboros/protocol"
2122
)
2223

2324
const (
24-
MESSAGE_TYPE_REQUEST_TX_IDS = 0
25-
MESSAGE_TYPE_REPLY_TX_IDS = 1
26-
MESSAGE_TYPE_REQUEST_TXS = 2
27-
MESSAGE_TYPE_REPLY_TXS = 3
28-
MESSAGE_TYPE_DONE = 4
29-
MESSAGE_TYPE_INIT = 6
25+
MessageTypeRequestTxIds = 0
26+
MessageTypeReplyTxIds = 1
27+
MessageTypeRequestTxs = 2
28+
MessageTypeReplyTxs = 3
29+
MessageTypeDone = 4
30+
MessageTypeInit = 6
3031
)
3132

3233
func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) {
3334
var ret protocol.Message
3435
switch msgType {
35-
case MESSAGE_TYPE_REQUEST_TX_IDS:
36+
case MessageTypeRequestTxIds:
3637
ret = &MsgRequestTxIds{}
37-
case MESSAGE_TYPE_REPLY_TX_IDS:
38+
case MessageTypeReplyTxIds:
3839
ret = &MsgReplyTxIds{}
39-
case MESSAGE_TYPE_REQUEST_TXS:
40+
case MessageTypeRequestTxs:
4041
ret = &MsgRequestTxs{}
41-
case MESSAGE_TYPE_REPLY_TXS:
42+
case MessageTypeReplyTxs:
4243
ret = &MsgReplyTxs{}
43-
case MESSAGE_TYPE_DONE:
44+
case MessageTypeDone:
4445
ret = &MsgDone{}
45-
case MESSAGE_TYPE_INIT:
46+
case MessageTypeInit:
4647
ret = &MsgInit{}
4748
}
4849
if _, err := cbor.Decode(data, ret); err != nil {
49-
return nil, fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err)
50+
return nil, fmt.Errorf("%s: decode error: %s", ProtocolName, err)
5051
}
5152
if ret != nil {
5253
// Store the raw message CBOR
@@ -65,7 +66,7 @@ type MsgRequestTxIds struct {
6566
func NewMsgRequestTxIds(blocking bool, ack uint16, req uint16) *MsgRequestTxIds {
6667
m := &MsgRequestTxIds{
6768
MessageBase: protocol.MessageBase{
68-
MessageType: MESSAGE_TYPE_REQUEST_TX_IDS,
69+
MessageType: MessageTypeRequestTxIds,
6970
},
7071
Blocking: blocking,
7172
Ack: ack,
@@ -82,7 +83,7 @@ type MsgReplyTxIds struct {
8283
func NewMsgReplyTxIds(txIds []TxIdAndSize) *MsgReplyTxIds {
8384
m := &MsgReplyTxIds{
8485
MessageBase: protocol.MessageBase{
85-
MessageType: MESSAGE_TYPE_REPLY_TX_IDS,
86+
MessageType: MessageTypeReplyTxIds,
8687
},
8788
TxIds: txIds,
8889
}
@@ -97,7 +98,7 @@ type MsgRequestTxs struct {
9798
func NewMsgRequestTxs(txIds []TxId) *MsgRequestTxs {
9899
m := &MsgRequestTxs{
99100
MessageBase: protocol.MessageBase{
100-
MessageType: MESSAGE_TYPE_REQUEST_TXS,
101+
MessageType: MessageTypeRequestTxs,
101102
},
102103
TxIds: txIds,
103104
}
@@ -112,7 +113,7 @@ type MsgReplyTxs struct {
112113
func NewMsgReplyTxs(txs []TxBody) *MsgReplyTxs {
113114
m := &MsgReplyTxs{
114115
MessageBase: protocol.MessageBase{
115-
MessageType: MESSAGE_TYPE_REPLY_TXS,
116+
MessageType: MessageTypeReplyTxs,
116117
},
117118
Txs: txs,
118119
}
@@ -126,7 +127,7 @@ type MsgDone struct {
126127
func NewMsgDone() *MsgDone {
127128
m := &MsgDone{
128129
MessageBase: protocol.MessageBase{
129-
MessageType: MESSAGE_TYPE_DONE,
130+
MessageType: MessageTypeDone,
130131
},
131132
}
132133
return m
@@ -139,7 +140,7 @@ type MsgInit struct {
139140
func NewMsgInit() *MsgInit {
140141
m := &MsgInit{
141142
MessageBase: protocol.MessageBase{
142-
MessageType: MESSAGE_TYPE_INIT,
143+
MessageType: MessageTypeInit,
143144
},
144145
}
145146
return m

protocol/txsubmission/messages_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ package txsubmission
1616

1717
import (
1818
"encoding/hex"
19-
"github.com/blinklabs-io/gouroboros/cbor"
20-
"github.com/blinklabs-io/gouroboros/protocol"
2119
"reflect"
2220
"testing"
21+
22+
"github.com/blinklabs-io/gouroboros/cbor"
23+
"github.com/blinklabs-io/gouroboros/protocol"
2324
)
2425

2526
type testDefinition struct {
@@ -33,12 +34,12 @@ var tests = []testDefinition{
3334
{
3435
CborHex: "8104",
3536
Message: NewMsgDone(),
36-
MessageType: MESSAGE_TYPE_DONE,
37+
MessageType: MessageTypeDone,
3738
},
3839
{
3940
CborHex: "8106",
4041
Message: NewMsgInit(),
41-
MessageType: MESSAGE_TYPE_INIT,
42+
MessageType: MessageTypeInit,
4243
},
4344
}
4445

protocol/txsubmission/server.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package txsubmission
1616

1717
import (
1818
"fmt"
19+
1920
"github.com/blinklabs-io/gouroboros/protocol"
2021
)
2122

@@ -29,16 +30,16 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
2930
config: cfg,
3031
}
3132
protoConfig := protocol.ProtocolConfig{
32-
Name: PROTOCOL_NAME,
33-
ProtocolId: PROTOCOL_ID,
33+
Name: ProtocolName,
34+
ProtocolId: ProtocolId,
3435
Muxer: protoOptions.Muxer,
3536
ErrorChan: protoOptions.ErrorChan,
3637
Mode: protoOptions.Mode,
3738
Role: protocol.ProtocolRoleServer,
3839
MessageHandlerFunc: s.messageHandler,
3940
MessageFromCborFunc: NewMsgFromCbor,
4041
StateMap: StateMap,
41-
InitialState: STATE_INIT,
42+
InitialState: stateInit,
4243
}
4344
s.Protocol = protocol.New(protoConfig)
4445
return s
@@ -47,16 +48,16 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
4748
func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error {
4849
var err error
4950
switch msg.Type() {
50-
case MESSAGE_TYPE_REPLY_TX_IDS:
51+
case MessageTypeReplyTxIds:
5152
err = s.handleReplyTxIds(msg)
52-
case MESSAGE_TYPE_REPLY_TXS:
53+
case MessageTypeReplyTxs:
5354
err = s.handleReplyTxs(msg)
54-
case MESSAGE_TYPE_DONE:
55+
case MessageTypeDone:
5556
err = s.handleDone()
56-
case MESSAGE_TYPE_INIT:
57+
case MessageTypeInit:
5758
err = s.handleInit()
5859
default:
59-
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
60+
err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type())
6061
}
6162
return err
6263
}

0 commit comments

Comments
 (0)