Skip to content

Commit 1da19f7

Browse files
authored
Merge pull request #123 from cloudstruct/feature/blockfetch-client-server-split
feat: split blockfetch protocol into client and server
2 parents 8f479a2 + 5bcaf93 commit 1da19f7

File tree

4 files changed

+150
-95
lines changed

4 files changed

+150
-95
lines changed

cmd/go-ouroboros-network/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func testChainSync(f *globalFlags) {
132132
os.Exit(1)
133133
}
134134
o.ChainSync.Start()
135-
o.BlockFetch.Start()
135+
o.BlockFetch.Client.Start()
136136

137137
syncState.oConn = o
138138
syncState.readyForNextBlockChan = make(chan bool)
@@ -209,7 +209,7 @@ func chainSyncRollForwardHandler(blockType uint, blockData interface{}) error {
209209
blockSlot = h.Body.Slot
210210
blockHash, _ = hex.DecodeString(h.Id())
211211
}
212-
if err := syncState.oConn.BlockFetch.RequestRange([]interface{}{blockSlot, blockHash}, []interface{}{blockSlot, blockHash}); err != nil {
212+
if err := syncState.oConn.BlockFetch.Client.RequestRange([]interface{}{blockSlot, blockHash}, []interface{}{blockSlot, blockHash}); err != nil {
213213
fmt.Printf("error calling RequestRange: %s\n", err)
214214
return err
215215
}

protocol/blockfetch/blockfetch.go

Lines changed: 5 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package blockfetch
22

33
import (
4-
"fmt"
5-
"github.com/cloudstruct/go-cardano-ledger"
64
"github.com/cloudstruct/go-ouroboros-network/protocol"
7-
"github.com/cloudstruct/go-ouroboros-network/utils"
85
)
96

107
const (
@@ -65,8 +62,8 @@ var StateMap = protocol.StateMap{
6562
}
6663

6764
type BlockFetch struct {
68-
*protocol.Protocol
69-
config *Config
65+
Client *Client
66+
Server *Server
7067
}
7168

7269
type Config struct {
@@ -82,95 +79,10 @@ type NoBlocksFunc func() error
8279
type BlockFunc func(uint, interface{}) error
8380
type BatchDoneFunc func() error
8481

85-
func New(options protocol.ProtocolOptions, cfg *Config) *BlockFetch {
82+
func New(protoOptions protocol.ProtocolOptions, cfg *Config) *BlockFetch {
8683
b := &BlockFetch{
87-
config: cfg,
84+
Client: NewClient(protoOptions, cfg),
85+
Server: NewServer(protoOptions, cfg),
8886
}
89-
protoConfig := protocol.ProtocolConfig{
90-
Name: PROTOCOL_NAME,
91-
ProtocolId: PROTOCOL_ID,
92-
Muxer: options.Muxer,
93-
ErrorChan: options.ErrorChan,
94-
Mode: options.Mode,
95-
Role: options.Role,
96-
MessageHandlerFunc: b.messageHandler,
97-
MessageFromCborFunc: NewMsgFromCbor,
98-
StateMap: StateMap,
99-
InitialState: STATE_IDLE,
100-
}
101-
b.Protocol = protocol.New(protoConfig)
10287
return b
10388
}
104-
105-
func (b *BlockFetch) Start() {
106-
b.Protocol.Start()
107-
}
108-
109-
func (b *BlockFetch) messageHandler(msg protocol.Message, isResponse bool) error {
110-
var err error
111-
switch msg.Type() {
112-
case MESSAGE_TYPE_START_BATCH:
113-
err = b.handleStartBatch()
114-
case MESSAGE_TYPE_NO_BLOCKS:
115-
err = b.handleNoBlocks()
116-
case MESSAGE_TYPE_BLOCK:
117-
err = b.handleBlock(msg)
118-
case MESSAGE_TYPE_BATCH_DONE:
119-
err = b.handleBatchDone()
120-
default:
121-
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
122-
}
123-
return err
124-
}
125-
126-
func (b *BlockFetch) RequestRange(start []interface{}, end []interface{}) error {
127-
msg := NewMsgRequestRange(start, end)
128-
return b.SendMessage(msg)
129-
}
130-
131-
func (b *BlockFetch) ClientDone() error {
132-
msg := NewMsgClientDone()
133-
return b.SendMessage(msg)
134-
}
135-
136-
func (b *BlockFetch) handleStartBatch() error {
137-
if b.config.StartBatchFunc == nil {
138-
return fmt.Errorf("received block-fetch StartBatch message but no callback function is defined")
139-
}
140-
// Call the user callback function
141-
return b.config.StartBatchFunc()
142-
}
143-
144-
func (b *BlockFetch) handleNoBlocks() error {
145-
if b.config.NoBlocksFunc == nil {
146-
return fmt.Errorf("received block-fetch NoBlocks message but no callback function is defined")
147-
}
148-
// Call the user callback function
149-
return b.config.NoBlocksFunc()
150-
}
151-
152-
func (b *BlockFetch) handleBlock(msgGeneric protocol.Message) error {
153-
if b.config.BlockFunc == nil {
154-
return fmt.Errorf("received block-fetch Block message but no callback function is defined")
155-
}
156-
msg := msgGeneric.(*MsgBlock)
157-
// Decode only enough to get the block type value
158-
var wrappedBlock WrappedBlock
159-
if _, err := utils.CborDecode(msg.WrappedBlock, &wrappedBlock); err != nil {
160-
return fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err)
161-
}
162-
blk, err := ledger.NewBlockFromCbor(wrappedBlock.Type, wrappedBlock.RawBlock)
163-
if err != nil {
164-
return err
165-
}
166-
// Call the user callback function
167-
return b.config.BlockFunc(wrappedBlock.Type, blk)
168-
}
169-
170-
func (b *BlockFetch) handleBatchDone() error {
171-
if b.config.BatchDoneFunc == nil {
172-
return fmt.Errorf("received block-fetch BatchDone message but no callback function is defined")
173-
}
174-
// Call the user callback function
175-
return b.config.BatchDoneFunc()
176-
}

protocol/blockfetch/client.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package blockfetch
2+
3+
import (
4+
"fmt"
5+
"github.com/cloudstruct/go-cardano-ledger"
6+
"github.com/cloudstruct/go-ouroboros-network/protocol"
7+
"github.com/cloudstruct/go-ouroboros-network/utils"
8+
)
9+
10+
type Client struct {
11+
*protocol.Protocol
12+
config *Config
13+
}
14+
15+
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
16+
c := &Client{
17+
config: cfg,
18+
}
19+
protoConfig := protocol.ProtocolConfig{
20+
Name: PROTOCOL_NAME,
21+
ProtocolId: PROTOCOL_ID,
22+
Muxer: protoOptions.Muxer,
23+
ErrorChan: protoOptions.ErrorChan,
24+
Mode: protoOptions.Mode,
25+
Role: protocol.ProtocolRoleClient,
26+
MessageHandlerFunc: c.messageHandler,
27+
MessageFromCborFunc: NewMsgFromCbor,
28+
StateMap: StateMap,
29+
InitialState: STATE_IDLE,
30+
}
31+
c.Protocol = protocol.New(protoConfig)
32+
return c
33+
}
34+
35+
func (c *Client) RequestRange(start []interface{}, end []interface{}) error {
36+
msg := NewMsgRequestRange(start, end)
37+
return c.SendMessage(msg)
38+
}
39+
40+
func (c *Client) ClientDone() error {
41+
msg := NewMsgClientDone()
42+
return c.SendMessage(msg)
43+
}
44+
45+
func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error {
46+
var err error
47+
switch msg.Type() {
48+
case MESSAGE_TYPE_START_BATCH:
49+
err = c.handleStartBatch()
50+
case MESSAGE_TYPE_NO_BLOCKS:
51+
err = c.handleNoBlocks()
52+
case MESSAGE_TYPE_BLOCK:
53+
err = c.handleBlock(msg)
54+
case MESSAGE_TYPE_BATCH_DONE:
55+
err = c.handleBatchDone()
56+
default:
57+
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
58+
}
59+
return err
60+
}
61+
62+
func (c *Client) handleStartBatch() error {
63+
if c.config.StartBatchFunc == nil {
64+
return fmt.Errorf("received block-fetch StartBatch message but no callback function is defined")
65+
}
66+
// Call the user callback function
67+
return c.config.StartBatchFunc()
68+
}
69+
70+
func (c *Client) handleNoBlocks() error {
71+
if c.config.NoBlocksFunc == nil {
72+
return fmt.Errorf("received block-fetch NoBlocks message but no callback function is defined")
73+
}
74+
// Call the user callback function
75+
return c.config.NoBlocksFunc()
76+
}
77+
78+
func (c *Client) handleBlock(msgGeneric protocol.Message) error {
79+
if c.config.BlockFunc == nil {
80+
return fmt.Errorf("received block-fetch Block message but no callback function is defined")
81+
}
82+
msg := msgGeneric.(*MsgBlock)
83+
// Decode only enough to get the block type value
84+
var wrappedBlock WrappedBlock
85+
if _, err := utils.CborDecode(msg.WrappedBlock, &wrappedBlock); err != nil {
86+
return fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err)
87+
}
88+
blk, err := ledger.NewBlockFromCbor(wrappedBlock.Type, wrappedBlock.RawBlock)
89+
if err != nil {
90+
return err
91+
}
92+
// Call the user callback function
93+
return c.config.BlockFunc(wrappedBlock.Type, blk)
94+
}
95+
96+
func (c *Client) handleBatchDone() error {
97+
if c.config.BatchDoneFunc == nil {
98+
return fmt.Errorf("received block-fetch BatchDone message but no callback function is defined")
99+
}
100+
// Call the user callback function
101+
return c.config.BatchDoneFunc()
102+
}

protocol/blockfetch/server.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package blockfetch
2+
3+
import (
4+
"fmt"
5+
"github.com/cloudstruct/go-ouroboros-network/protocol"
6+
)
7+
8+
type Server struct {
9+
*protocol.Protocol
10+
config *Config
11+
}
12+
13+
func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
14+
s := &Server{
15+
config: cfg,
16+
}
17+
protoConfig := protocol.ProtocolConfig{
18+
Name: PROTOCOL_NAME,
19+
ProtocolId: PROTOCOL_ID,
20+
Muxer: protoOptions.Muxer,
21+
ErrorChan: protoOptions.ErrorChan,
22+
Mode: protoOptions.Mode,
23+
Role: protocol.ProtocolRoleServer,
24+
MessageHandlerFunc: s.messageHandler,
25+
MessageFromCborFunc: NewMsgFromCbor,
26+
StateMap: StateMap,
27+
InitialState: STATE_IDLE,
28+
}
29+
s.Protocol = protocol.New(protoConfig)
30+
return s
31+
}
32+
33+
func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error {
34+
var err error
35+
// TODO: add cases for messages from client
36+
switch msg.Type() {
37+
default:
38+
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
39+
}
40+
return err
41+
}

0 commit comments

Comments
 (0)