Skip to content

Commit 36845d7

Browse files
authored
feat: initial LeoisFetch protocol (#1209)
Signed-off-by: Aurora Gaffney <[email protected]>
1 parent d9fff7d commit 36845d7

File tree

6 files changed

+881
-2
lines changed

6 files changed

+881
-2
lines changed

connection.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/blinklabs-io/gouroboros/protocol/chainsync"
4040
"github.com/blinklabs-io/gouroboros/protocol/handshake"
4141
"github.com/blinklabs-io/gouroboros/protocol/keepalive"
42+
"github.com/blinklabs-io/gouroboros/protocol/leiosfetch"
4243
"github.com/blinklabs-io/gouroboros/protocol/leiosnotify"
4344
"github.com/blinklabs-io/gouroboros/protocol/localstatequery"
4445
"github.com/blinklabs-io/gouroboros/protocol/localtxmonitor"
@@ -85,6 +86,8 @@ type Connection struct {
8586
handshake *handshake.Handshake
8687
keepAlive *keepalive.KeepAlive
8788
keepAliveConfig *keepalive.Config
89+
leiosFetch *leiosfetch.LeiosFetch
90+
leiosFetchConfig *leiosfetch.Config
8891
leiosNotify *leiosnotify.LeiosNotify
8992
leiosNotifyConfig *leiosnotify.Config
9093
localStateQuery *localstatequery.LocalStateQuery
@@ -209,6 +212,11 @@ func (c *Connection) KeepAlive() *keepalive.KeepAlive {
209212
return c.keepAlive
210213
}
211214

215+
// LeiosFetch returns the leios-fetch protocol handler
216+
func (c *Connection) LeiosFetch() *leiosfetch.LeiosFetch {
217+
return c.leiosFetch
218+
}
219+
212220
// LeiosNotify returns the leios-notify protocol handler
213221
func (c *Connection) LeiosNotify() *leiosnotify.LeiosNotify {
214222
return c.leiosNotify
@@ -405,6 +413,7 @@ func (c *Connection) setupConnection() error {
405413
c.peerSharing = peersharing.New(protoOptions, c.peerSharingConfig)
406414
}
407415
c.leiosNotify = leiosnotify.New(protoOptions, c.leiosNotifyConfig)
416+
c.leiosFetch = leiosfetch.New(protoOptions, c.leiosFetchConfig)
408417
// Start protocols
409418
if !c.delayProtocolStart {
410419
if (c.fullDuplex && handshakeFullDuplex) || !c.server {
@@ -418,6 +427,7 @@ func (c *Connection) setupConnection() error {
418427
c.peerSharing.Client.Start()
419428
}
420429
c.leiosNotify.Client.Start()
430+
c.leiosFetch.Client.Start()
421431
}
422432
if (c.fullDuplex && handshakeFullDuplex) || c.server {
423433
c.blockFetch.Server.Start()
@@ -430,6 +440,7 @@ func (c *Connection) setupConnection() error {
430440
c.peerSharing.Server.Start()
431441
}
432442
c.leiosNotify.Server.Start()
443+
c.leiosFetch.Server.Start()
433444
}
434445
}
435446
} else {

protocol/leiosfetch/client.go

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
// Copyright 2025 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package leiosfetch
16+
17+
import (
18+
"fmt"
19+
"sync"
20+
21+
"github.com/blinklabs-io/gouroboros/protocol"
22+
pcommon "github.com/blinklabs-io/gouroboros/protocol/common"
23+
)
24+
25+
type Client struct {
26+
*protocol.Protocol
27+
config *Config
28+
callbackContext CallbackContext
29+
onceStart sync.Once
30+
onceStop sync.Once
31+
blockResultChan chan protocol.Message
32+
blockTxsResultChan chan protocol.Message
33+
votesResultChan chan protocol.Message
34+
blockRangeResultChan chan protocol.Message
35+
}
36+
37+
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
38+
if cfg == nil {
39+
tmpCfg := NewConfig()
40+
cfg = &tmpCfg
41+
}
42+
c := &Client{
43+
config: cfg,
44+
blockResultChan: make(chan protocol.Message),
45+
blockTxsResultChan: make(chan protocol.Message),
46+
votesResultChan: make(chan protocol.Message),
47+
blockRangeResultChan: make(chan protocol.Message),
48+
}
49+
c.callbackContext = CallbackContext{
50+
Client: c,
51+
ConnectionId: protoOptions.ConnectionId,
52+
}
53+
// Update state map with timeout
54+
stateMap := StateMap.Copy()
55+
if entry, ok := stateMap[StateBlock]; ok {
56+
entry.Timeout = c.config.Timeout
57+
stateMap[StateBlock] = entry
58+
}
59+
if entry, ok := stateMap[StateBlockTxs]; ok {
60+
entry.Timeout = c.config.Timeout
61+
stateMap[StateBlockTxs] = entry
62+
}
63+
if entry, ok := stateMap[StateVotes]; ok {
64+
entry.Timeout = c.config.Timeout
65+
stateMap[StateVotes] = entry
66+
}
67+
if entry, ok := stateMap[StateBlockRange]; ok {
68+
entry.Timeout = c.config.Timeout
69+
stateMap[StateBlockRange] = entry
70+
}
71+
// Configure underlying Protocol
72+
protoConfig := protocol.ProtocolConfig{
73+
Name: ProtocolName,
74+
ProtocolId: ProtocolId,
75+
Muxer: protoOptions.Muxer,
76+
Logger: protoOptions.Logger,
77+
ErrorChan: protoOptions.ErrorChan,
78+
Mode: protoOptions.Mode,
79+
Role: protocol.ProtocolRoleClient,
80+
MessageHandlerFunc: c.messageHandler,
81+
MessageFromCborFunc: NewMsgFromCbor,
82+
StateMap: stateMap,
83+
InitialState: StateIdle,
84+
}
85+
c.Protocol = protocol.New(protoConfig)
86+
return c
87+
}
88+
89+
func (c *Client) Start() {
90+
c.onceStart.Do(func() {
91+
c.Protocol.Logger().
92+
Debug("starting client protocol",
93+
"component", "network",
94+
"protocol", ProtocolName,
95+
"connection_id", c.callbackContext.ConnectionId.String(),
96+
)
97+
c.Protocol.Start()
98+
// Start goroutine to cleanup resources on protocol shutdown
99+
go func() {
100+
<-c.DoneChan()
101+
close(c.blockResultChan)
102+
close(c.blockTxsResultChan)
103+
close(c.votesResultChan)
104+
close(c.blockRangeResultChan)
105+
}()
106+
})
107+
}
108+
109+
func (c *Client) Stop() error {
110+
var err error
111+
c.onceStop.Do(func() {
112+
c.Protocol.Logger().
113+
Debug("stopping client protocol",
114+
"component", "network",
115+
"protocol", ProtocolName,
116+
"connection_id", c.callbackContext.ConnectionId.String(),
117+
)
118+
msg := NewMsgDone()
119+
err = c.SendMessage(msg)
120+
})
121+
return err
122+
}
123+
124+
// BlockRequest fetches the requested EB identified by the slot and Leios hash
125+
func (c *Client) BlockRequest(slot uint64, hash []byte) (protocol.Message, error) {
126+
msg := NewMsgBlockRequest(slot, hash)
127+
if err := c.SendMessage(msg); err != nil {
128+
return nil, err
129+
}
130+
resp, ok := <-c.blockResultChan
131+
if !ok {
132+
return nil, protocol.ErrProtocolShuttingDown
133+
}
134+
return resp, nil
135+
}
136+
137+
// BlockTxsRequest fetches the requested TXs identified by the slot, Leios hash, and TX bitmap
138+
func (c *Client) BlockTxsRequest(slot uint64, hash []byte, txBitmap [8]byte) (protocol.Message, error) {
139+
msg := NewMsgBlockTxsRequest(slot, hash, txBitmap)
140+
if err := c.SendMessage(msg); err != nil {
141+
return nil, err
142+
}
143+
resp, ok := <-c.blockTxsResultChan
144+
if !ok {
145+
return nil, protocol.ErrProtocolShuttingDown
146+
}
147+
return resp, nil
148+
}
149+
150+
// VotesRequest fetches the requested votes
151+
func (c *Client) VotesRequest(voteIds []MsgVotesRequestVoteId) (protocol.Message, error) {
152+
msg := NewMsgVotesRequest(voteIds)
153+
if err := c.SendMessage(msg); err != nil {
154+
return nil, err
155+
}
156+
resp, ok := <-c.votesResultChan
157+
if !ok {
158+
return nil, protocol.ErrProtocolShuttingDown
159+
}
160+
return resp, nil
161+
}
162+
163+
// BlockRangeRequest fetches a range of EBs and their TXs that are certified by RBs within the provided range.
164+
// This function will block until all EBs and TXs in the requested range have been received
165+
func (c *Client) BlockRangeRequest(start pcommon.Point, end pcommon.Point) ([]protocol.Message, error) {
166+
msg := NewMsgBlockRangeRequest(start, end)
167+
if err := c.SendMessage(msg); err != nil {
168+
return nil, err
169+
}
170+
ret := make([]protocol.Message, 0, 20)
171+
for {
172+
resp, ok := <-c.blockRangeResultChan
173+
if !ok {
174+
return nil, protocol.ErrProtocolShuttingDown
175+
}
176+
ret = append(ret, resp)
177+
if _, ok := resp.(*MsgLastBlockAndTxsInRange); ok {
178+
break
179+
}
180+
}
181+
return ret, nil
182+
}
183+
184+
func (c *Client) messageHandler(msg protocol.Message) error {
185+
var err error
186+
switch msg.Type() {
187+
case MessageTypeBlock:
188+
err = c.handleBlock(msg)
189+
case MessageTypeBlockTxs:
190+
err = c.handleBlockTxs(msg)
191+
case MessageTypeVotes:
192+
err = c.handleVotes(msg)
193+
case MessageTypeNextBlockAndTxsInRange:
194+
err = c.handleNextBlockAndTxsInRange(msg)
195+
case MessageTypeLastBlockAndTxsInRange:
196+
err = c.handleLastBlockAndTxsInRange(msg)
197+
default:
198+
err = fmt.Errorf(
199+
"%s: received unexpected message type %d",
200+
ProtocolName,
201+
msg.Type(),
202+
)
203+
}
204+
return err
205+
}
206+
207+
func (c *Client) handleBlock(msg protocol.Message) error {
208+
c.blockResultChan <- msg
209+
return nil
210+
}
211+
212+
func (c *Client) handleBlockTxs(msg protocol.Message) error {
213+
c.blockTxsResultChan <- msg
214+
return nil
215+
}
216+
217+
func (c *Client) handleVotes(msg protocol.Message) error {
218+
c.votesResultChan <- msg
219+
return nil
220+
}
221+
222+
func (c *Client) handleNextBlockAndTxsInRange(msg protocol.Message) error {
223+
c.blockRangeResultChan <- msg
224+
return nil
225+
}
226+
227+
func (c *Client) handleLastBlockAndTxsInRange(msg protocol.Message) error {
228+
c.blockRangeResultChan <- msg
229+
return nil
230+
}

0 commit comments

Comments
 (0)