Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/blinklabs-io/gouroboros/protocol/chainsync"
"github.com/blinklabs-io/gouroboros/protocol/handshake"
"github.com/blinklabs-io/gouroboros/protocol/keepalive"
"github.com/blinklabs-io/gouroboros/protocol/leiosnotify"
"github.com/blinklabs-io/gouroboros/protocol/localstatequery"
"github.com/blinklabs-io/gouroboros/protocol/localtxmonitor"
"github.com/blinklabs-io/gouroboros/protocol/localtxsubmission"
Expand Down Expand Up @@ -84,6 +85,8 @@ type Connection struct {
handshake *handshake.Handshake
keepAlive *keepalive.KeepAlive
keepAliveConfig *keepalive.Config
leiosNotify *leiosnotify.LeiosNotify
leiosNotifyConfig *leiosnotify.Config
localStateQuery *localstatequery.LocalStateQuery
localStateQueryConfig *localstatequery.Config
localTxMonitor *localtxmonitor.LocalTxMonitor
Expand Down Expand Up @@ -206,6 +209,11 @@ func (c *Connection) KeepAlive() *keepalive.KeepAlive {
return c.keepAlive
}

// LeiosNotify returns the leios-notify protocol handler
func (c *Connection) LeiosNotify() *leiosnotify.LeiosNotify {
return c.leiosNotify
}

// LocalStateQuery returns the local-state-query protocol handler
func (c *Connection) LocalStateQuery() *localstatequery.LocalStateQuery {
return c.localStateQuery
Expand Down Expand Up @@ -396,6 +404,7 @@ func (c *Connection) setupConnection() error {
if versionNtN.EnablePeerSharingProtocol {
c.peerSharing = peersharing.New(protoOptions, c.peerSharingConfig)
}
c.leiosNotify = leiosnotify.New(protoOptions, c.leiosNotifyConfig)
// Start protocols
if !c.delayProtocolStart {
if (c.fullDuplex && handshakeFullDuplex) || !c.server {
Expand All @@ -408,6 +417,7 @@ func (c *Connection) setupConnection() error {
if c.peerSharing != nil {
c.peerSharing.Client.Start()
}
c.leiosNotify.Client.Start()
}
if (c.fullDuplex && handshakeFullDuplex) || c.server {
c.blockFetch.Server.Start()
Expand All @@ -419,6 +429,7 @@ func (c *Connection) setupConnection() error {
if c.peerSharing != nil {
c.peerSharing.Server.Start()
}
c.leiosNotify.Server.Start()
}
}
} else {
Expand Down
154 changes: 154 additions & 0 deletions protocol/leiosnotify/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2025 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leiosnotify

import (
"fmt"
"sync"

"github.com/blinklabs-io/gouroboros/protocol"
)

type Client struct {
*protocol.Protocol
config *Config
callbackContext CallbackContext
requestNextChan chan protocol.Message
onceStart sync.Once
onceStop sync.Once
}

func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
if cfg == nil {
tmpCfg := NewConfig()
cfg = &tmpCfg
}
c := &Client{
config: cfg,
requestNextChan: make(chan protocol.Message),
}
c.callbackContext = CallbackContext{
Client: c,
ConnectionId: protoOptions.ConnectionId,
}
// Update state map with timeout
stateMap := StateMap.Copy()
if entry, ok := stateMap[StateBusy]; ok {
entry.Timeout = c.config.Timeout
stateMap[StateBusy] = entry
}
// Configure underlying Protocol
protoConfig := protocol.ProtocolConfig{
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
MessageHandlerFunc: c.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: stateMap,
InitialState: StateIdle,
}
c.Protocol = protocol.New(protoConfig)
return c
}

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
<-c.DoneChan()
close(c.requestNextChan)
}()
})
}

func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug("stopping client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := NewMsgDone()
err = c.SendMessage(msg)
})
return err
}

// RequestNext fetches the next available notification. This function will block until a notification is received from the peer.
func (c *Client) RequestNext() (protocol.Message, error) {
msg := NewMsgNotificationRequestNext()
if err := c.SendMessage(msg); err != nil {
return nil, err
}
resp, ok := <-c.requestNextChan
if !ok {
return nil, protocol.ErrProtocolShuttingDown
}
return resp, nil
}

func (c *Client) messageHandler(msg protocol.Message) error {
var err error
switch msg.Type() {
case MessageTypeBlockAnnouncement:
err = c.handleBlockAnnouncement(msg)
case MessageTypeBlockOffer:
err = c.handleBlockOffer(msg)
case MessageTypeBlockTxsOffer:
err = c.handleBlockTxsOffer(msg)
case MessageTypeVotesOffer:
err = c.handleVotesOffer(msg)
default:
err = fmt.Errorf(
"%s: received unexpected message type %d",
ProtocolName,
msg.Type(),
)
}
return err
}

func (c *Client) handleBlockAnnouncement(msg protocol.Message) error {
c.requestNextChan <- msg
return nil
}

func (c *Client) handleBlockOffer(msg protocol.Message) error {
c.requestNextChan <- msg
return nil
}

func (c *Client) handleBlockTxsOffer(msg protocol.Message) error {
c.requestNextChan <- msg
return nil
}

func (c *Client) handleVotesOffer(msg protocol.Message) error {
c.requestNextChan <- msg
return nil
}
128 changes: 128 additions & 0 deletions protocol/leiosnotify/leiosnotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2025 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leiosnotify

import (
"time"

"github.com/blinklabs-io/gouroboros/connection"
"github.com/blinklabs-io/gouroboros/protocol"
)

const (
ProtocolName = "leios-notify"
ProtocolId uint16 = 998 // NOTE: this is a dummy value and will need to be changed
)

var (
StateIdle = protocol.NewState(1, "Idle")
StateBusy = protocol.NewState(2, "Busy")
StateDone = protocol.NewState(4, "Done")
)

var StateMap = protocol.StateMap{
StateIdle: protocol.StateMapEntry{
Agency: protocol.AgencyClient,
Transitions: []protocol.StateTransition{
{
MsgType: MessageTypeNotificationRequestNext,
NewState: StateBusy,
},
{
MsgType: MessageTypeDone,
NewState: StateDone,
},
},
},
StateBusy: protocol.StateMapEntry{
Agency: protocol.AgencyServer,
Transitions: []protocol.StateTransition{
{
MsgType: MessageTypeBlockAnnouncement,
NewState: StateIdle,
},
{
MsgType: MessageTypeBlockOffer,
NewState: StateIdle,
},
{
MsgType: MessageTypeBlockTxsOffer,
NewState: StateIdle,
},
{
MsgType: MessageTypeVotesOffer,
NewState: StateIdle,
},
},
},
StateDone: protocol.StateMapEntry{
Agency: protocol.AgencyNone,
},
}

type LeiosNotify struct {
Client *Client
Server *Server
}

type Config struct {
RequestNextFunc RequestNextFunc
Timeout time.Duration
}

// Callback context
type CallbackContext struct {
ConnectionId connection.ConnectionId
Client *Client
Server *Server
}

// Callback function types
type (
RequestNextFunc func(CallbackContext) (protocol.Message, error)
)

func New(protoOptions protocol.ProtocolOptions, cfg *Config) *LeiosNotify {
b := &LeiosNotify{
Client: NewClient(protoOptions, cfg),
Server: NewServer(protoOptions, cfg),
}
return b
}

type LeiosNotifyOptionFunc func(*Config)

func NewConfig(options ...LeiosNotifyOptionFunc) Config {
c := Config{
Timeout: 60 * time.Second,
}
// Apply provided options functions
for _, option := range options {
option(&c)
}
return c
}

func WithRequestNextFunc(requestNextFunc RequestNextFunc) LeiosNotifyOptionFunc {
return func(c *Config) {
c.RequestNextFunc = requestNextFunc
}
}

func WithTimeout(timeout time.Duration) LeiosNotifyOptionFunc {
return func(c *Config) {
c.Timeout = timeout
}
}
Loading
Loading