Skip to content

Commit 6421b3c

Browse files
committed
feat: initial LeoisNotify protocol
This is an initial implementation of the LeiosNotify protocol as described in CIP-0164. Many of the details are fudged since the protocol is not fully specified in the CIP. Signed-off-by: Aurora Gaffney <[email protected]>
1 parent 3e80bd1 commit 6421b3c

File tree

5 files changed

+564
-0
lines changed

5 files changed

+564
-0
lines changed

connection.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/blinklabs-io/gouroboros/protocol/chainsync"
4040
"github.com/blinklabs-io/gouroboros/protocol/handshake"
4141
"github.com/blinklabs-io/gouroboros/protocol/keepalive"
42+
"github.com/blinklabs-io/gouroboros/protocol/leiosnotify"
4243
"github.com/blinklabs-io/gouroboros/protocol/localstatequery"
4344
"github.com/blinklabs-io/gouroboros/protocol/localtxmonitor"
4445
"github.com/blinklabs-io/gouroboros/protocol/localtxsubmission"
@@ -84,6 +85,8 @@ type Connection struct {
8485
handshake *handshake.Handshake
8586
keepAlive *keepalive.KeepAlive
8687
keepAliveConfig *keepalive.Config
88+
leiosNotify *leiosnotify.LeiosNotify
89+
leiosNotifyConfig *leiosnotify.Config
8790
localStateQuery *localstatequery.LocalStateQuery
8891
localStateQueryConfig *localstatequery.Config
8992
localTxMonitor *localtxmonitor.LocalTxMonitor
@@ -206,6 +209,11 @@ func (c *Connection) KeepAlive() *keepalive.KeepAlive {
206209
return c.keepAlive
207210
}
208211

212+
// LeiosNotify returns the leios-notify protocol handler
213+
func (c *Connection) LeiosNotify() *leiosnotify.LeiosNotify {
214+
return c.leiosNotify
215+
}
216+
209217
// LocalStateQuery returns the local-state-query protocol handler
210218
func (c *Connection) LocalStateQuery() *localstatequery.LocalStateQuery {
211219
return c.localStateQuery
@@ -396,6 +404,7 @@ func (c *Connection) setupConnection() error {
396404
if versionNtN.EnablePeerSharingProtocol {
397405
c.peerSharing = peersharing.New(protoOptions, c.peerSharingConfig)
398406
}
407+
c.leiosNotify = leiosnotify.New(protoOptions, c.leiosNotifyConfig)
399408
// Start protocols
400409
if !c.delayProtocolStart {
401410
if (c.fullDuplex && handshakeFullDuplex) || !c.server {
@@ -408,6 +417,7 @@ func (c *Connection) setupConnection() error {
408417
if c.peerSharing != nil {
409418
c.peerSharing.Client.Start()
410419
}
420+
c.leiosNotify.Client.Start()
411421
}
412422
if (c.fullDuplex && handshakeFullDuplex) || c.server {
413423
c.blockFetch.Server.Start()
@@ -419,6 +429,7 @@ func (c *Connection) setupConnection() error {
419429
if c.peerSharing != nil {
420430
c.peerSharing.Server.Start()
421431
}
432+
c.leiosNotify.Server.Start()
422433
}
423434
}
424435
} else {

protocol/leiosnotify/client.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2025 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package leiosnotify
16+
17+
import (
18+
"fmt"
19+
"sync"
20+
21+
"github.com/blinklabs-io/gouroboros/protocol"
22+
)
23+
24+
type Client struct {
25+
*protocol.Protocol
26+
config *Config
27+
callbackContext CallbackContext
28+
requestNextChan chan protocol.Message
29+
onceStart sync.Once
30+
onceStop sync.Once
31+
}
32+
33+
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
34+
if cfg == nil {
35+
tmpCfg := NewConfig()
36+
cfg = &tmpCfg
37+
}
38+
c := &Client{
39+
config: cfg,
40+
requestNextChan: make(chan protocol.Message),
41+
}
42+
c.callbackContext = CallbackContext{
43+
Client: c,
44+
ConnectionId: protoOptions.ConnectionId,
45+
}
46+
// Update state map with timeout
47+
stateMap := StateMap.Copy()
48+
if entry, ok := stateMap[StateBusy]; ok {
49+
entry.Timeout = c.config.Timeout
50+
stateMap[StateBusy] = entry
51+
}
52+
// Configure underlying Protocol
53+
protoConfig := protocol.ProtocolConfig{
54+
Name: ProtocolName,
55+
ProtocolId: ProtocolId,
56+
Muxer: protoOptions.Muxer,
57+
Logger: protoOptions.Logger,
58+
ErrorChan: protoOptions.ErrorChan,
59+
Mode: protoOptions.Mode,
60+
Role: protocol.ProtocolRoleClient,
61+
MessageHandlerFunc: c.messageHandler,
62+
MessageFromCborFunc: NewMsgFromCbor,
63+
StateMap: stateMap,
64+
InitialState: StateIdle,
65+
}
66+
c.Protocol = protocol.New(protoConfig)
67+
return c
68+
}
69+
70+
func (c *Client) Start() {
71+
c.onceStart.Do(func() {
72+
c.Protocol.Logger().
73+
Debug("starting client protocol",
74+
"component", "network",
75+
"protocol", ProtocolName,
76+
"connection_id", c.callbackContext.ConnectionId.String(),
77+
)
78+
c.Protocol.Start()
79+
// Start goroutine to cleanup resources on protocol shutdown
80+
go func() {
81+
<-c.DoneChan()
82+
close(c.requestNextChan)
83+
}()
84+
})
85+
}
86+
87+
func (c *Client) Stop() error {
88+
var err error
89+
c.onceStop.Do(func() {
90+
c.Protocol.Logger().
91+
Debug("stopping client protocol",
92+
"component", "network",
93+
"protocol", ProtocolName,
94+
"connection_id", c.callbackContext.ConnectionId.String(),
95+
)
96+
msg := NewMsgDone()
97+
err = c.SendMessage(msg)
98+
})
99+
return err
100+
}
101+
102+
// RequestNext fetches the next available notification. This function will block until a notification is received from the peer.
103+
func (c *Client) RequestNext() (protocol.Message, error) {
104+
msg := NewMsgNotificationRequestNext()
105+
if err := c.SendMessage(msg); err != nil {
106+
return nil, err
107+
}
108+
resp, ok := <-c.requestNextChan
109+
if !ok {
110+
return nil, protocol.ErrProtocolShuttingDown
111+
}
112+
return resp, nil
113+
}
114+
115+
func (c *Client) messageHandler(msg protocol.Message) error {
116+
var err error
117+
switch msg.Type() {
118+
case MessageTypeBlockAnnouncement:
119+
err = c.handleBlockAnnouncement(msg)
120+
case MessageTypeBlockOffer:
121+
err = c.handleBlockOffer(msg)
122+
case MessageTypeBlockTxsOffer:
123+
err = c.handleBlockTxsOffer(msg)
124+
case MessageTypeVotesOffer:
125+
err = c.handleVotesOffer(msg)
126+
default:
127+
err = fmt.Errorf(
128+
"%s: received unexpected message type %d",
129+
ProtocolName,
130+
msg.Type(),
131+
)
132+
}
133+
return err
134+
}
135+
136+
func (c *Client) handleBlockAnnouncement(msg protocol.Message) error {
137+
c.requestNextChan <- msg
138+
return nil
139+
}
140+
141+
func (c *Client) handleBlockOffer(msg protocol.Message) error {
142+
c.requestNextChan <- msg
143+
return nil
144+
}
145+
146+
func (c *Client) handleBlockTxsOffer(msg protocol.Message) error {
147+
c.requestNextChan <- msg
148+
return nil
149+
}
150+
151+
func (c *Client) handleVotesOffer(msg protocol.Message) error {
152+
c.requestNextChan <- msg
153+
return nil
154+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2024 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package leiosnotify
16+
17+
import (
18+
"time"
19+
20+
"github.com/blinklabs-io/gouroboros/connection"
21+
"github.com/blinklabs-io/gouroboros/protocol"
22+
)
23+
24+
const (
25+
ProtocolName = "leios-notify"
26+
ProtocolId uint16 = 998 // NOTE: this is a dummy value and will need to be changed
27+
)
28+
29+
var (
30+
StateIdle = protocol.NewState(1, "Idle")
31+
StateBusy = protocol.NewState(2, "Busy")
32+
StateDone = protocol.NewState(4, "Done")
33+
)
34+
35+
var StateMap = protocol.StateMap{
36+
StateIdle: protocol.StateMapEntry{
37+
Agency: protocol.AgencyClient,
38+
Transitions: []protocol.StateTransition{
39+
{
40+
MsgType: MessageTypeNotificationRequestNext,
41+
NewState: StateBusy,
42+
},
43+
{
44+
MsgType: MessageTypeDone,
45+
NewState: StateDone,
46+
},
47+
},
48+
},
49+
StateBusy: protocol.StateMapEntry{
50+
Agency: protocol.AgencyServer,
51+
Transitions: []protocol.StateTransition{
52+
{
53+
MsgType: MessageTypeBlockAnnouncement,
54+
NewState: StateIdle,
55+
},
56+
{
57+
MsgType: MessageTypeBlockOffer,
58+
NewState: StateIdle,
59+
},
60+
{
61+
MsgType: MessageTypeBlockTxsOffer,
62+
NewState: StateIdle,
63+
},
64+
{
65+
MsgType: MessageTypeVotesOffer,
66+
NewState: StateIdle,
67+
},
68+
},
69+
},
70+
StateDone: protocol.StateMapEntry{
71+
Agency: protocol.AgencyNone,
72+
},
73+
}
74+
75+
type LeiosNotify struct {
76+
Client *Client
77+
Server *Server
78+
}
79+
80+
type Config struct {
81+
RequestNextFunc RequestNextFunc
82+
Timeout time.Duration
83+
}
84+
85+
// Callback context
86+
type CallbackContext struct {
87+
ConnectionId connection.ConnectionId
88+
Client *Client
89+
Server *Server
90+
}
91+
92+
// Callback function types
93+
type (
94+
RequestNextFunc func(CallbackContext) (protocol.Message, error)
95+
)
96+
97+
func New(protoOptions protocol.ProtocolOptions, cfg *Config) *LeiosNotify {
98+
b := &LeiosNotify{
99+
Client: NewClient(protoOptions, cfg),
100+
Server: NewServer(protoOptions, cfg),
101+
}
102+
return b
103+
}
104+
105+
type LeiosNotifyOptionFunc func(*Config)
106+
107+
func NewConfig(options ...LeiosNotifyOptionFunc) Config {
108+
c := Config{
109+
Timeout: 60 * time.Second,
110+
}
111+
// Apply provided options functions
112+
for _, option := range options {
113+
option(&c)
114+
}
115+
return c
116+
}
117+
118+
func WithRequestNextFunc(requestNextFunc RequestNextFunc) LeiosNotifyOptionFunc {
119+
return func(c *Config) {
120+
c.RequestNextFunc = requestNextFunc
121+
}
122+
}
123+
124+
func WithTimeout(timeout time.Duration) LeiosNotifyOptionFunc {
125+
return func(c *Config) {
126+
c.Timeout = timeout
127+
}
128+
}

0 commit comments

Comments
 (0)