Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -97,7 +97,7 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: stopping protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := NewMsgClientDone()
err = c.SendMessage(msg)
})
Expand All @@ -107,7 +107,7 @@ func (c *Client) Stop() error {
// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)
func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlockRange(start: %+v, end: %+v)", ProtocolName, start, end))
Debug(fmt.Sprintf("%s: client called GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})", ProtocolName, start.Slot, start.Hash, end.Slot, end.Hash))
c.busyMutex.Lock()
c.blockUseCallback = true
msg := NewMsgRequestRange(start, end)
Expand All @@ -129,7 +129,7 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
// GetBlock requests and returns a single block specified by the provided point
func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlock(point: %+v)", ProtocolName, point))
Debug(fmt.Sprintf("%s: client called GetBlock(point: {Slot: %d, Hash: %x})", ProtocolName, point.Slot, point.Hash))
c.busyMutex.Lock()
c.blockUseCallback = false
msg := NewMsgRequestRange(point, point)
Expand All @@ -153,8 +153,6 @@ func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeStartBatch:
Expand All @@ -177,22 +175,22 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleStartBatch() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client start batch for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client start batch for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.startBatchResultChan <- nil
return nil
}

func (c *Client) handleNoBlocks() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client no blocks found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client no blocks found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
err := fmt.Errorf("block(s) not found")
c.startBatchResultChan <- err
return nil
}

func (c *Client) handleBlock(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client block found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client block found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := msgGeneric.(*MsgBlock)
// Decode only enough to get the block type value
var wrappedBlock WrappedBlock
Expand All @@ -219,7 +217,7 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {

func (c *Client) handleBatchDone() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client batch done for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client batch done for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.busyMutex.Unlock()
return nil
}
50 changes: 36 additions & 14 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewClient(
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -132,7 +132,7 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: stopping protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
msg := NewMsgDone()
Expand All @@ -146,7 +146,7 @@ func (c *Client) Stop() error {
// GetCurrentTip returns the current chain tip
func (c *Client) GetCurrentTip() (*Tip, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetCurrentTip()", ProtocolName))
Debug(fmt.Sprintf("%s: client %+v called GetCurrentTip()", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
done := atomic.Bool{}
requestResultChan := make(chan Tip, 1)
requestErrorChan := make(chan error, 1)
Expand Down Expand Up @@ -185,10 +185,14 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
// The request is being handled by another request, wait for the result.
waitingForCurrentTipChan = nil
case tip := <-waitingResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
// The result from the other request is ready.
done.Store(true)
return &tip, nil
case tip := <-requestResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
// If waitingForCurrentTipChan is full, the for loop that empties it might finish the
// loop before the select statement that writes to it is triggered. For that reason we
// require requestResultChan here.
Expand All @@ -204,15 +208,25 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
func (c *Client) GetAvailableBlockRange(
intersectPoints []common.Point,
) (common.Point, common.Point, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()

// Use origin if no intersect points were specified
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
}

// Find our chain intersection
result := c.requestFindIntersect(intersectPoints)
if result.error != nil {
Expand Down Expand Up @@ -279,14 +293,24 @@ func (c *Client) GetAvailableBlockRange(
// Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered
// via the RollForward callback function specified in the protocol config
func (c *Client) Sync(intersectPoints []common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s Sync(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()

// Use origin if no intersect points were specified
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
}

intersectResultChan, cancel := c.wantIntersectFound()
msg := NewMsgFindIntersect(intersectPoints)
Expand Down Expand Up @@ -430,8 +454,6 @@ func (c *Client) requestFindIntersect(
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeAwaitReply:
Expand All @@ -456,13 +478,13 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleAwaitReply() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client await reply for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client await reply for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
return nil
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll forward for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client roll forward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
firstBlockChan := func() chan<- clientPointResult {
select {
case ch := <-c.wantFirstBlockChan:
Expand Down Expand Up @@ -572,7 +594,7 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {

func (c *Client) handleRollBackward(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll backward for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client roll backward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgRollBackward := msg.(*MsgRollBackward)
c.sendCurrentTip(msgRollBackward.Tip)
if len(c.wantFirstBlockChan) == 0 {
Expand All @@ -599,7 +621,7 @@ func (c *Client) handleRollBackward(msg protocol.Message) error {

func (c *Client) handleIntersectFound(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client intersect found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgIntersectFound := msg.(*MsgIntersectFound)
c.sendCurrentTip(msgIntersectFound.Tip)

Expand All @@ -613,7 +635,7 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error {

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect not found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client intersect not found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.sendCurrentTip(msgIntersectNotFound.Tip)

Expand Down
12 changes: 5 additions & 7 deletions protocol/handshake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
MessageHandlerFunc: c.handleMessage,
MessageHandlerFunc: c.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: stateMap,
InitialState: statePropose,
Expand All @@ -70,17 +70,15 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Send our ProposeVersions message
msg := NewMsgProposeVersions(c.config.ProtocolVersionMap)
_ = c.SendMessage(msg)
})
}

func (c *Client) handleMessage(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
func (c *Client) messageHandler(msg protocol.Message) error {
var err error
switch msg.Type() {
case MessageTypeAcceptVersion:
Expand All @@ -99,7 +97,7 @@ func (c *Client) handleMessage(msg protocol.Message) error {

func (c *Client) handleAcceptVersion(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client accept version for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client accept version for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
if c.config.FinishedFunc == nil {
return fmt.Errorf(
"received handshake AcceptVersion message but no callback function is defined",
Expand All @@ -122,7 +120,7 @@ func (c *Client) handleAcceptVersion(msg protocol.Message) error {

func (c *Client) handleRefuse(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client refuse for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client refuse for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := msgGeneric.(*MsgRefuse)
var err error
switch msg.Reason[0].(uint64) {
Expand Down
5 changes: 5 additions & 0 deletions protocol/keepalive/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -68,6 +69,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand Down Expand Up @@ -119,6 +122,8 @@ func (c *Client) messageHandler(msg protocol.Message) error {
}

func (c *Client) handleKeepAliveResponse(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client keepalive response for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := msgGeneric.(*MsgKeepAliveResponse)
if msg.Cookie != c.config.Cookie {
return fmt.Errorf(
Expand Down
7 changes: 7 additions & 0 deletions protocol/peersharing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -66,6 +67,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
}

func (c *Client) GetPeers(amount uint8) ([]PeerAddress, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetPeers(amount: %d)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, amount))
msg := NewMsgShareRequest(amount)
if err := c.SendMessage(msg); err != nil {
return nil, err
Expand All @@ -78,6 +81,8 @@ func (c *Client) GetPeers(amount uint8) ([]PeerAddress, error) {
}

func (c *Client) handleMessage(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client message for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
var err error
switch msg.Type() {
case MessageTypeSharePeers:
Expand All @@ -93,6 +98,8 @@ func (c *Client) handleMessage(msg protocol.Message) error {
}

func (c *Client) handleSharePeers(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client share peers for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgSharePeers := msg.(*MsgSharePeers)
c.sharePeersChan <- msgSharePeers.PeerAddresses
return nil
Expand Down
2 changes: 0 additions & 2 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func New(config ProtocolConfig) *Protocol {
func (p *Protocol) Start() {
p.onceStart.Do(func() {
// Register protocol with muxer
p.Logger().Debug("registering protocol with muxer")
muxerProtocolRole := muxer.ProtocolRoleInitiator
if p.config.Role == ProtocolRoleServer {
muxerProtocolRole = muxer.ProtocolRoleResponder
Expand Down Expand Up @@ -160,7 +159,6 @@ func (p *Protocol) Start() {
func (p *Protocol) Stop() {
p.onceStop.Do(func() {
// Unregister protocol from muxer
p.Logger().Debug("unregistering protocol with muxer")
muxerProtocolRole := muxer.ProtocolRoleInitiator
if p.config.Role == ProtocolRoleServer {
muxerProtocolRole = muxer.ProtocolRoleResponder
Expand Down
7 changes: 7 additions & 0 deletions protocol/txsubmission/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -75,6 +76,8 @@ func (c *Client) Init() {
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client message for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
var err error
switch msg.Type() {
case MessageTypeRequestTxIds:
Expand All @@ -92,6 +95,8 @@ func (c *Client) messageHandler(msg protocol.Message) error {
}

func (c *Client) handleRequestTxIds(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client request tx ids for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
if c.config.RequestTxIdsFunc == nil {
return fmt.Errorf(
"received tx-submission RequestTxIds message but no callback function is defined",
Expand All @@ -116,6 +121,8 @@ func (c *Client) handleRequestTxIds(msg protocol.Message) error {
}

func (c *Client) handleRequestTxs(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client request txs for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
if c.config.RequestTxsFunc == nil {
return fmt.Errorf(
"received tx-submission RequestTxs message but no callback function is defined",
Expand Down