diff --git a/connection.go b/connection.go index 387a4ba5..93a59628 100644 --- a/connection.go +++ b/connection.go @@ -39,6 +39,7 @@ import ( "github.com/blinklabs-io/gouroboros/protocol/chainsync" "github.com/blinklabs-io/gouroboros/protocol/handshake" "github.com/blinklabs-io/gouroboros/protocol/keepalive" + "github.com/blinklabs-io/gouroboros/protocol/leiosnotify" "github.com/blinklabs-io/gouroboros/protocol/localstatequery" "github.com/blinklabs-io/gouroboros/protocol/localtxmonitor" "github.com/blinklabs-io/gouroboros/protocol/localtxsubmission" @@ -84,6 +85,8 @@ type Connection struct { handshake *handshake.Handshake keepAlive *keepalive.KeepAlive keepAliveConfig *keepalive.Config + leiosNotify *leiosnotify.LeiosNotify + leiosNotifyConfig *leiosnotify.Config localStateQuery *localstatequery.LocalStateQuery localStateQueryConfig *localstatequery.Config localTxMonitor *localtxmonitor.LocalTxMonitor @@ -206,6 +209,11 @@ func (c *Connection) KeepAlive() *keepalive.KeepAlive { return c.keepAlive } +// LeiosNotify returns the leios-notify protocol handler +func (c *Connection) LeiosNotify() *leiosnotify.LeiosNotify { + return c.leiosNotify +} + // LocalStateQuery returns the local-state-query protocol handler func (c *Connection) LocalStateQuery() *localstatequery.LocalStateQuery { return c.localStateQuery @@ -396,6 +404,7 @@ func (c *Connection) setupConnection() error { if versionNtN.EnablePeerSharingProtocol { c.peerSharing = peersharing.New(protoOptions, c.peerSharingConfig) } + c.leiosNotify = leiosnotify.New(protoOptions, c.leiosNotifyConfig) // Start protocols if !c.delayProtocolStart { if (c.fullDuplex && handshakeFullDuplex) || !c.server { @@ -408,6 +417,7 @@ func (c *Connection) setupConnection() error { if c.peerSharing != nil { c.peerSharing.Client.Start() } + c.leiosNotify.Client.Start() } if (c.fullDuplex && handshakeFullDuplex) || c.server { c.blockFetch.Server.Start() @@ -419,6 +429,7 @@ func (c *Connection) setupConnection() error { if c.peerSharing != nil { c.peerSharing.Server.Start() } + c.leiosNotify.Server.Start() } } } else { diff --git a/protocol/leiosnotify/client.go b/protocol/leiosnotify/client.go new file mode 100644 index 00000000..9d974a7d --- /dev/null +++ b/protocol/leiosnotify/client.go @@ -0,0 +1,154 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leiosnotify + +import ( + "fmt" + "sync" + + "github.com/blinklabs-io/gouroboros/protocol" +) + +type Client struct { + *protocol.Protocol + config *Config + callbackContext CallbackContext + requestNextChan chan protocol.Message + onceStart sync.Once + onceStop sync.Once +} + +func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { + if cfg == nil { + tmpCfg := NewConfig() + cfg = &tmpCfg + } + c := &Client{ + config: cfg, + requestNextChan: make(chan protocol.Message), + } + c.callbackContext = CallbackContext{ + Client: c, + ConnectionId: protoOptions.ConnectionId, + } + // Update state map with timeout + stateMap := StateMap.Copy() + if entry, ok := stateMap[StateBusy]; ok { + entry.Timeout = c.config.Timeout + stateMap[StateBusy] = entry + } + // Configure underlying Protocol + protoConfig := protocol.ProtocolConfig{ + Name: ProtocolName, + ProtocolId: ProtocolId, + Muxer: protoOptions.Muxer, + Logger: protoOptions.Logger, + ErrorChan: protoOptions.ErrorChan, + Mode: protoOptions.Mode, + Role: protocol.ProtocolRoleClient, + MessageHandlerFunc: c.messageHandler, + MessageFromCborFunc: NewMsgFromCbor, + StateMap: stateMap, + InitialState: StateIdle, + } + c.Protocol = protocol.New(protoConfig) + return c +} + +func (c *Client) Start() { + c.onceStart.Do(func() { + c.Protocol.Logger(). + Debug("starting client protocol", + "component", "network", + "protocol", ProtocolName, + "connection_id", c.callbackContext.ConnectionId.String(), + ) + c.Protocol.Start() + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-c.DoneChan() + close(c.requestNextChan) + }() + }) +} + +func (c *Client) Stop() error { + var err error + c.onceStop.Do(func() { + c.Protocol.Logger(). + Debug("stopping client protocol", + "component", "network", + "protocol", ProtocolName, + "connection_id", c.callbackContext.ConnectionId.String(), + ) + msg := NewMsgDone() + err = c.SendMessage(msg) + }) + return err +} + +// RequestNext fetches the next available notification. This function will block until a notification is received from the peer. +func (c *Client) RequestNext() (protocol.Message, error) { + msg := NewMsgNotificationRequestNext() + if err := c.SendMessage(msg); err != nil { + return nil, err + } + resp, ok := <-c.requestNextChan + if !ok { + return nil, protocol.ErrProtocolShuttingDown + } + return resp, nil +} + +func (c *Client) messageHandler(msg protocol.Message) error { + var err error + switch msg.Type() { + case MessageTypeBlockAnnouncement: + err = c.handleBlockAnnouncement(msg) + case MessageTypeBlockOffer: + err = c.handleBlockOffer(msg) + case MessageTypeBlockTxsOffer: + err = c.handleBlockTxsOffer(msg) + case MessageTypeVotesOffer: + err = c.handleVotesOffer(msg) + default: + err = fmt.Errorf( + "%s: received unexpected message type %d", + ProtocolName, + msg.Type(), + ) + } + return err +} + +func (c *Client) handleBlockAnnouncement(msg protocol.Message) error { + c.requestNextChan <- msg + return nil +} + +func (c *Client) handleBlockOffer(msg protocol.Message) error { + c.requestNextChan <- msg + return nil +} + +func (c *Client) handleBlockTxsOffer(msg protocol.Message) error { + c.requestNextChan <- msg + return nil +} + +func (c *Client) handleVotesOffer(msg protocol.Message) error { + c.requestNextChan <- msg + return nil +} diff --git a/protocol/leiosnotify/leiosnotify.go b/protocol/leiosnotify/leiosnotify.go new file mode 100644 index 00000000..93ebe063 --- /dev/null +++ b/protocol/leiosnotify/leiosnotify.go @@ -0,0 +1,128 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leiosnotify + +import ( + "time" + + "github.com/blinklabs-io/gouroboros/connection" + "github.com/blinklabs-io/gouroboros/protocol" +) + +const ( + ProtocolName = "leios-notify" + ProtocolId uint16 = 998 // NOTE: this is a dummy value and will need to be changed +) + +var ( + StateIdle = protocol.NewState(1, "Idle") + StateBusy = protocol.NewState(2, "Busy") + StateDone = protocol.NewState(4, "Done") +) + +var StateMap = protocol.StateMap{ + StateIdle: protocol.StateMapEntry{ + Agency: protocol.AgencyClient, + Transitions: []protocol.StateTransition{ + { + MsgType: MessageTypeNotificationRequestNext, + NewState: StateBusy, + }, + { + MsgType: MessageTypeDone, + NewState: StateDone, + }, + }, + }, + StateBusy: protocol.StateMapEntry{ + Agency: protocol.AgencyServer, + Transitions: []protocol.StateTransition{ + { + MsgType: MessageTypeBlockAnnouncement, + NewState: StateIdle, + }, + { + MsgType: MessageTypeBlockOffer, + NewState: StateIdle, + }, + { + MsgType: MessageTypeBlockTxsOffer, + NewState: StateIdle, + }, + { + MsgType: MessageTypeVotesOffer, + NewState: StateIdle, + }, + }, + }, + StateDone: protocol.StateMapEntry{ + Agency: protocol.AgencyNone, + }, +} + +type LeiosNotify struct { + Client *Client + Server *Server +} + +type Config struct { + RequestNextFunc RequestNextFunc + Timeout time.Duration +} + +// Callback context +type CallbackContext struct { + ConnectionId connection.ConnectionId + Client *Client + Server *Server +} + +// Callback function types +type ( + RequestNextFunc func(CallbackContext) (protocol.Message, error) +) + +func New(protoOptions protocol.ProtocolOptions, cfg *Config) *LeiosNotify { + b := &LeiosNotify{ + Client: NewClient(protoOptions, cfg), + Server: NewServer(protoOptions, cfg), + } + return b +} + +type LeiosNotifyOptionFunc func(*Config) + +func NewConfig(options ...LeiosNotifyOptionFunc) Config { + c := Config{ + Timeout: 60 * time.Second, + } + // Apply provided options functions + for _, option := range options { + option(&c) + } + return c +} + +func WithRequestNextFunc(requestNextFunc RequestNextFunc) LeiosNotifyOptionFunc { + return func(c *Config) { + c.RequestNextFunc = requestNextFunc + } +} + +func WithTimeout(timeout time.Duration) LeiosNotifyOptionFunc { + return func(c *Config) { + c.Timeout = timeout + } +} diff --git a/protocol/leiosnotify/messages.go b/protocol/leiosnotify/messages.go new file mode 100644 index 00000000..19a31dfd --- /dev/null +++ b/protocol/leiosnotify/messages.go @@ -0,0 +1,154 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leiosnotify + +import ( + "fmt" + + "github.com/blinklabs-io/gouroboros/cbor" + "github.com/blinklabs-io/gouroboros/protocol" +) + +// NOTE: these are dummy message IDs and will probably need to be changed +const ( + MessageTypeNotificationRequestNext = 0 + MessageTypeBlockAnnouncement = 1 + MessageTypeBlockOffer = 2 + MessageTypeBlockTxsOffer = 3 + MessageTypeVotesOffer = 4 + MessageTypeDone = 5 +) + +func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) { + var ret protocol.Message + switch msgType { + case MessageTypeNotificationRequestNext: + ret = &MsgNotificationRequestNext{} + case MessageTypeBlockAnnouncement: + ret = &MsgBlockAnnouncement{} + case MessageTypeBlockOffer: + ret = &MsgBlockOffer{} + case MessageTypeBlockTxsOffer: + ret = &MsgBlockTxsOffer{} + case MessageTypeVotesOffer: + ret = &MsgVotesOffer{} + case MessageTypeDone: + ret = &MsgDone{} + } + if _, err := cbor.Decode(data, ret); err != nil { + return nil, fmt.Errorf("%s: decode error: %w", ProtocolName, err) + } + if ret != nil { + // Store the raw message CBOR + ret.SetCbor(data) + } + return ret, nil +} + +type MsgNotificationRequestNext struct { + protocol.MessageBase +} + +func NewMsgNotificationRequestNext() *MsgNotificationRequestNext { + m := &MsgNotificationRequestNext{ + MessageBase: protocol.MessageBase{ + MessageType: MessageTypeNotificationRequestNext, + }, + } + return m +} + +type MsgBlockAnnouncement struct { + protocol.MessageBase + BlockHeaderRaw cbor.RawMessage +} + +func NewMsgBlockAnnouncement(blockHeader cbor.RawMessage) *MsgBlockAnnouncement { + m := &MsgBlockAnnouncement{ + MessageBase: protocol.MessageBase{ + MessageType: MessageTypeBlockAnnouncement, + }, + BlockHeaderRaw: blockHeader, + } + return m +} + +type MsgBlockOffer struct { + protocol.MessageBase + Slot uint64 + Hash []byte +} + +func NewMsgBlockOffer(slot uint64, hash []byte) *MsgBlockOffer { + m := &MsgBlockOffer{ + MessageBase: protocol.MessageBase{ + MessageType: MessageTypeBlockOffer, + }, + Slot: slot, + Hash: hash, + } + return m +} + +type MsgBlockTxsOffer struct { + protocol.MessageBase + Slot uint64 + Hash []byte +} + +func NewMsgBlockTxsOffer(slot uint64, hash []byte) *MsgBlockTxsOffer { + m := &MsgBlockTxsOffer{ + MessageBase: protocol.MessageBase{ + MessageType: MessageTypeBlockTxsOffer, + }, + Slot: slot, + Hash: hash, + } + return m +} + +type MsgVotesOffer struct { + protocol.MessageBase + Votes []MsgVotesOfferVote +} + +type MsgVotesOfferVote struct { + cbor.StructAsArray + Slot uint64 + VoteIssuerId []byte +} + +func NewMsgVotesOffer(votes []MsgVotesOfferVote) *MsgVotesOffer { + m := &MsgVotesOffer{ + MessageBase: protocol.MessageBase{ + MessageType: MessageTypeVotesOffer, + }, + Votes: votes, + } + return m +} + +type MsgDone struct { + protocol.MessageBase +} + +func NewMsgDone() *MsgDone { + m := &MsgDone{ + MessageBase: protocol.MessageBase{ + MessageType: MessageTypeDone, + }, + } + return m +} diff --git a/protocol/leiosnotify/server.go b/protocol/leiosnotify/server.go new file mode 100644 index 00000000..f52c0cf4 --- /dev/null +++ b/protocol/leiosnotify/server.go @@ -0,0 +1,117 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leiosnotify + +import ( + "errors" + "fmt" + + "github.com/blinklabs-io/gouroboros/protocol" +) + +type Server struct { + *protocol.Protocol + config *Config + callbackContext CallbackContext + protoOptions protocol.ProtocolOptions +} + +func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { + s := &Server{ + config: cfg, + // Save this for re-use later + protoOptions: protoOptions, + } + s.callbackContext = CallbackContext{ + Server: s, + ConnectionId: protoOptions.ConnectionId, + } + s.initProtocol() + return s +} + +func (s *Server) initProtocol() { + protoConfig := protocol.ProtocolConfig{ + Name: ProtocolName, + ProtocolId: ProtocolId, + Muxer: s.protoOptions.Muxer, + Logger: s.protoOptions.Logger, + ErrorChan: s.protoOptions.ErrorChan, + Mode: s.protoOptions.Mode, + Role: protocol.ProtocolRoleServer, + MessageHandlerFunc: s.messageHandler, + MessageFromCborFunc: NewMsgFromCbor, + StateMap: StateMap, + InitialState: StateIdle, + } + s.Protocol = protocol.New(protoConfig) +} + +func (s *Server) messageHandler(msg protocol.Message) error { + var err error + switch msg.Type() { + case MessageTypeNotificationRequestNext: + err = s.handleRequestNext() + case MessageTypeDone: + err = s.handleDone() + default: + err = fmt.Errorf( + "%s: received unexpected message type %d", + ProtocolName, + msg.Type(), + ) + } + return err +} + +func (s *Server) handleRequestNext() error { + s.Protocol.Logger(). + Debug("notificiation request next", + "component", "network", + "protocol", ProtocolName, + "role", "server", + "connection_id", s.callbackContext.ConnectionId.String(), + ) + if s.config == nil || s.config.RequestNextFunc == nil { + return errors.New( + "received leios-notify NotificationRequestNext message but no callback function is defined", + ) + } + resp, err := s.config.RequestNextFunc( + s.callbackContext, + ) + if err != nil { + return err + } + if err := s.SendMessage(resp); err != nil { + return err + } + return nil +} + +func (s *Server) handleDone() error { + s.Protocol.Logger(). + Debug("client done", + "component", "network", + "protocol", ProtocolName, + "role", "server", + "connection_id", s.callbackContext.ConnectionId.String(), + ) + // Restart protocol + s.Stop() + s.initProtocol() + s.Start() + return nil +}