diff --git a/protocol/blockfetch/client.go b/protocol/blockfetch/client.go index 39881ff4..3b124401 100644 --- a/protocol/blockfetch/client.go +++ b/protocol/blockfetch/client.go @@ -82,7 +82,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { func (c *Client) Start() { c.onceStart.Do(func() { c.Protocol.Logger(). - Debug(fmt.Sprintf("starting protocol: %s", ProtocolName)) + Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.Protocol.Start() // Start goroutine to cleanup resources on protocol shutdown go func() { @@ -97,7 +97,7 @@ func (c *Client) Stop() error { var err error c.onceStop.Do(func() { c.Protocol.Logger(). - Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName)) + Debug(fmt.Sprintf("%s: stopping protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msg := NewMsgClientDone() err = c.SendMessage(msg) }) @@ -107,7 +107,7 @@ func (c *Client) Stop() error { // GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive) func (c *Client) GetBlockRange(start common.Point, end common.Point) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("client called %s GetBlockRange(start: %+v, end: %+v)", ProtocolName, start, end)) + Debug(fmt.Sprintf("%s: client called GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})", ProtocolName, start.Slot, start.Hash, end.Slot, end.Hash)) c.busyMutex.Lock() c.blockUseCallback = true msg := NewMsgRequestRange(start, end) @@ -129,7 +129,7 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error { // GetBlock requests and returns a single block specified by the provided point func (c *Client) GetBlock(point common.Point) (ledger.Block, error) { c.Protocol.Logger(). - Debug(fmt.Sprintf("client called %s GetBlock(point: %+v)", ProtocolName, point)) + Debug(fmt.Sprintf("%s: client called GetBlock(point: {Slot: %d, Hash: %x})", ProtocolName, point.Slot, point.Hash)) c.busyMutex.Lock() c.blockUseCallback = false msg := NewMsgRequestRange(point, point) @@ -153,8 +153,6 @@ func (c *Client) GetBlock(point common.Point) (ledger.Block, error) { } func (c *Client) messageHandler(msg protocol.Message) error { - c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client message for %s", ProtocolName)) var err error switch msg.Type() { case MessageTypeStartBatch: @@ -177,14 +175,14 @@ func (c *Client) messageHandler(msg protocol.Message) error { func (c *Client) handleStartBatch() error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client start batch for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client start batch for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.startBatchResultChan <- nil return nil } func (c *Client) handleNoBlocks() error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client no blocks found for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client no blocks found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) err := fmt.Errorf("block(s) not found") c.startBatchResultChan <- err return nil @@ -192,7 +190,7 @@ func (c *Client) handleNoBlocks() error { func (c *Client) handleBlock(msgGeneric protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client block found for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client block found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msg := msgGeneric.(*MsgBlock) // Decode only enough to get the block type value var wrappedBlock WrappedBlock @@ -219,7 +217,7 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error { func (c *Client) handleBatchDone() error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client batch done for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client batch done for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.busyMutex.Unlock() return nil } diff --git a/protocol/chainsync/client.go b/protocol/chainsync/client.go index 77e3b205..0546400c 100644 --- a/protocol/chainsync/client.go +++ b/protocol/chainsync/client.go @@ -117,7 +117,7 @@ func NewClient( func (c *Client) Start() { c.onceStart.Do(func() { c.Protocol.Logger(). - Debug(fmt.Sprintf("starting protocol: %s", ProtocolName)) + Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.Protocol.Start() // Start goroutine to cleanup resources on protocol shutdown go func() { @@ -132,7 +132,7 @@ func (c *Client) Stop() error { var err error c.onceStop.Do(func() { c.Protocol.Logger(). - Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName)) + Debug(fmt.Sprintf("%s: stopping protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.busyMutex.Lock() defer c.busyMutex.Unlock() msg := NewMsgDone() @@ -146,7 +146,7 @@ func (c *Client) Stop() error { // GetCurrentTip returns the current chain tip func (c *Client) GetCurrentTip() (*Tip, error) { c.Protocol.Logger(). - Debug(fmt.Sprintf("client called %s GetCurrentTip()", ProtocolName)) + Debug(fmt.Sprintf("%s: client %+v called GetCurrentTip()", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) done := atomic.Bool{} requestResultChan := make(chan Tip, 1) requestErrorChan := make(chan error, 1) @@ -185,10 +185,14 @@ func (c *Client) GetCurrentTip() (*Tip, error) { // The request is being handled by another request, wait for the result. waitingForCurrentTipChan = nil case tip := <-waitingResultChan: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr)) // The result from the other request is ready. done.Store(true) return &tip, nil case tip := <-requestResultChan: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr)) // If waitingForCurrentTipChan is full, the for loop that empties it might finish the // loop before the select statement that writes to it is triggered. For that reason we // require requestResultChan here. @@ -204,8 +208,6 @@ func (c *Client) GetCurrentTip() (*Tip, error) { func (c *Client) GetAvailableBlockRange( intersectPoints []common.Point, ) (common.Point, common.Point, error) { - c.Protocol.Logger(). - Debug(fmt.Sprintf("client called %s GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, intersectPoints)) c.busyMutex.Lock() defer c.busyMutex.Unlock() @@ -213,6 +215,18 @@ func (c *Client) GetAvailableBlockRange( if len(intersectPoints) == 0 { intersectPoints = []common.Point{common.NewPointOrigin()} } + switch len(intersectPoints) { + case 1: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash)) + case 2: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash)) + default: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints)) + } + // Find our chain intersection result := c.requestFindIntersect(intersectPoints) if result.error != nil { @@ -279,14 +293,24 @@ func (c *Client) GetAvailableBlockRange( // Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered // via the RollForward callback function specified in the protocol config func (c *Client) Sync(intersectPoints []common.Point) error { - c.Protocol.Logger(). - Debug(fmt.Sprintf("client called %s Sync(intersectPoints: %+v)", ProtocolName, intersectPoints)) c.busyMutex.Lock() defer c.busyMutex.Unlock() + // Use origin if no intersect points were specified if len(intersectPoints) == 0 { intersectPoints = []common.Point{common.NewPointOrigin()} } + switch len(intersectPoints) { + case 1: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash)) + case 2: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash)) + default: + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints)) + } intersectResultChan, cancel := c.wantIntersectFound() msg := NewMsgFindIntersect(intersectPoints) @@ -430,8 +454,6 @@ func (c *Client) requestFindIntersect( } func (c *Client) messageHandler(msg protocol.Message) error { - c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client message for %s", ProtocolName)) var err error switch msg.Type() { case MessageTypeAwaitReply: @@ -456,13 +478,13 @@ func (c *Client) messageHandler(msg protocol.Message) error { func (c *Client) handleAwaitReply() error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client await reply for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client await reply for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) return nil } func (c *Client) handleRollForward(msgGeneric protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client roll forward for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client roll forward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) firstBlockChan := func() chan<- clientPointResult { select { case ch := <-c.wantFirstBlockChan: @@ -572,7 +594,7 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error { func (c *Client) handleRollBackward(msg protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client roll backward for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client roll backward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msgRollBackward := msg.(*MsgRollBackward) c.sendCurrentTip(msgRollBackward.Tip) if len(c.wantFirstBlockChan) == 0 { @@ -599,7 +621,7 @@ func (c *Client) handleRollBackward(msg protocol.Message) error { func (c *Client) handleIntersectFound(msg protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client intersect found for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client intersect found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msgIntersectFound := msg.(*MsgIntersectFound) c.sendCurrentTip(msgIntersectFound.Tip) @@ -613,7 +635,7 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error { func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client intersect not found for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client intersect not found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound) c.sendCurrentTip(msgIntersectNotFound.Tip) diff --git a/protocol/handshake/client.go b/protocol/handshake/client.go index ca0eaa44..31b6b5a1 100644 --- a/protocol/handshake/client.go +++ b/protocol/handshake/client.go @@ -57,7 +57,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, Role: protocol.ProtocolRoleClient, - MessageHandlerFunc: c.handleMessage, + MessageHandlerFunc: c.messageHandler, MessageFromCborFunc: NewMsgFromCbor, StateMap: stateMap, InitialState: statePropose, @@ -70,7 +70,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { func (c *Client) Start() { c.onceStart.Do(func() { c.Protocol.Logger(). - Debug(fmt.Sprintf("starting protocol: %s", ProtocolName)) + Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.Protocol.Start() // Send our ProposeVersions message msg := NewMsgProposeVersions(c.config.ProtocolVersionMap) @@ -78,9 +78,7 @@ func (c *Client) Start() { }) } -func (c *Client) handleMessage(msg protocol.Message) error { - c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client message for %s", ProtocolName)) +func (c *Client) messageHandler(msg protocol.Message) error { var err error switch msg.Type() { case MessageTypeAcceptVersion: @@ -99,7 +97,7 @@ func (c *Client) handleMessage(msg protocol.Message) error { func (c *Client) handleAcceptVersion(msg protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client accept version for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client accept version for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) if c.config.FinishedFunc == nil { return fmt.Errorf( "received handshake AcceptVersion message but no callback function is defined", @@ -122,7 +120,7 @@ func (c *Client) handleAcceptVersion(msg protocol.Message) error { func (c *Client) handleRefuse(msgGeneric protocol.Message) error { c.Protocol.Logger(). - Debug(fmt.Sprintf("handling client refuse for %s", ProtocolName)) + Debug(fmt.Sprintf("%s: client refuse for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msg := msgGeneric.(*MsgRefuse) var err error switch msg.Reason[0].(uint64) { diff --git a/protocol/keepalive/client.go b/protocol/keepalive/client.go index fd219add..5e65861b 100644 --- a/protocol/keepalive/client.go +++ b/protocol/keepalive/client.go @@ -54,6 +54,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { Name: ProtocolName, ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, + Logger: protoOptions.Logger, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, Role: protocol.ProtocolRoleClient, @@ -68,6 +69,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { func (c *Client) Start() { c.onceStart.Do(func() { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) c.Protocol.Start() // Start goroutine to cleanup resources on protocol shutdown go func() { @@ -119,6 +122,8 @@ func (c *Client) messageHandler(msg protocol.Message) error { } func (c *Client) handleKeepAliveResponse(msgGeneric protocol.Message) error { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client keepalive response for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msg := msgGeneric.(*MsgKeepAliveResponse) if msg.Cookie != c.config.Cookie { return fmt.Errorf( diff --git a/protocol/peersharing/client.go b/protocol/peersharing/client.go index a960e4e1..9ca18c6b 100644 --- a/protocol/peersharing/client.go +++ b/protocol/peersharing/client.go @@ -53,6 +53,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { Name: ProtocolName, ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, + Logger: protoOptions.Logger, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, Role: protocol.ProtocolRoleClient, @@ -66,6 +67,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { } func (c *Client) GetPeers(amount uint8) ([]PeerAddress, error) { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client %+v called GetPeers(amount: %d)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, amount)) msg := NewMsgShareRequest(amount) if err := c.SendMessage(msg); err != nil { return nil, err @@ -78,6 +81,8 @@ func (c *Client) GetPeers(amount uint8) ([]PeerAddress, error) { } func (c *Client) handleMessage(msg protocol.Message) error { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client message for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) var err error switch msg.Type() { case MessageTypeSharePeers: @@ -93,6 +98,8 @@ func (c *Client) handleMessage(msg protocol.Message) error { } func (c *Client) handleSharePeers(msg protocol.Message) error { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client share peers for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) msgSharePeers := msg.(*MsgSharePeers) c.sharePeersChan <- msgSharePeers.PeerAddresses return nil diff --git a/protocol/protocol.go b/protocol/protocol.go index 829443b6..fc6d5359 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -121,7 +121,6 @@ func New(config ProtocolConfig) *Protocol { func (p *Protocol) Start() { p.onceStart.Do(func() { // Register protocol with muxer - p.Logger().Debug("registering protocol with muxer") muxerProtocolRole := muxer.ProtocolRoleInitiator if p.config.Role == ProtocolRoleServer { muxerProtocolRole = muxer.ProtocolRoleResponder @@ -160,7 +159,6 @@ func (p *Protocol) Start() { func (p *Protocol) Stop() { p.onceStop.Do(func() { // Unregister protocol from muxer - p.Logger().Debug("unregistering protocol with muxer") muxerProtocolRole := muxer.ProtocolRoleInitiator if p.config.Role == ProtocolRoleServer { muxerProtocolRole = muxer.ProtocolRoleResponder diff --git a/protocol/txsubmission/client.go b/protocol/txsubmission/client.go index 3fa09244..553b7bcd 100644 --- a/protocol/txsubmission/client.go +++ b/protocol/txsubmission/client.go @@ -53,6 +53,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { Name: ProtocolName, ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, + Logger: protoOptions.Logger, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, Role: protocol.ProtocolRoleClient, @@ -75,6 +76,8 @@ func (c *Client) Init() { } func (c *Client) messageHandler(msg protocol.Message) error { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client message for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) var err error switch msg.Type() { case MessageTypeRequestTxIds: @@ -92,6 +95,8 @@ func (c *Client) messageHandler(msg protocol.Message) error { } func (c *Client) handleRequestTxIds(msg protocol.Message) error { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client request tx ids for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) if c.config.RequestTxIdsFunc == nil { return fmt.Errorf( "received tx-submission RequestTxIds message but no callback function is defined", @@ -116,6 +121,8 @@ func (c *Client) handleRequestTxIds(msg protocol.Message) error { } func (c *Client) handleRequestTxs(msg protocol.Message) error { + c.Protocol.Logger(). + Debug(fmt.Sprintf("%s: client request txs for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr)) if c.config.RequestTxsFunc == nil { return fmt.Errorf( "received tx-submission RequestTxs message but no callback function is defined",