Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions protocol/blockfetch/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,37 @@ func (s *Server) initProtocol() {

func (s *Server) NoBlocks() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called NoBlocks()", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("calling NoBlocks()",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msg := NewMsgNoBlocks()
return s.SendMessage(msg)
}

func (s *Server) StartBatch() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called StartBatch()", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("calling StartBatch()",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msg := NewMsgStartBatch()
return s.SendMessage(msg)
}

func (s *Server) Block(blockType uint, blockData []byte) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called Block(blockType: %+v, blockData: %x)", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr, blockType, blockData))
Debug(
fmt.Sprintf("calling Block(blockType: %+x, blockData: %x)", blockType, blockData),
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
wrappedBlock := WrappedBlock{
Type: blockType,
RawBlock: blockData,
Expand All @@ -90,7 +106,12 @@ func (s *Server) Block(blockType uint, blockData []byte) error {

func (s *Server) BatchDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called BatchDone()", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("calling BatchDone()",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msg := NewMsgBatchDone()
return s.SendMessage(msg)
}
Expand All @@ -114,7 +135,12 @@ func (s *Server) messageHandler(msg protocol.Message) error {

func (s *Server) handleRequestRange(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server request range for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("request range",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config == nil || s.config.RequestRangeFunc == nil {
return fmt.Errorf(
"received block-fetch RequestRange message but no callback function is defined",
Expand All @@ -130,7 +156,12 @@ func (s *Server) handleRequestRange(msg protocol.Message) error {

func (s *Server) handleClientDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server client done for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("client done",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
// Restart protocol
s.Protocol.Stop()
s.initProtocol()
Expand Down
53 changes: 47 additions & 6 deletions protocol/chainsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,47 @@ func (s *Server) initProtocol() {

func (s *Server) RollBackward(point common.Point, tip Tip) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called RollBackward(point: {Slot: %d, Hash: %x}, tip: {Point: %+v, BlockNumber: %d})", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr, point.Slot, point.Hash, tip.Point, tip.BlockNumber))
Debug(
fmt.Sprintf("calling RollBackward(point: {Slot: %d, Hash: %x}, tip: {Point: {Slot: %d, Hash: %x}, BlockNumber: %d})",
point.Slot, point.Hash,
tip.Point.Slot, tip.Point.Hash,
tip.BlockNumber,
),
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msg := NewMsgRollBackward(point, tip)
return s.SendMessage(msg)
}

func (s *Server) AwaitReply() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called AwaitReply()", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("calling AwaitReply()",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msg := NewMsgAwaitReply()
return s.SendMessage(msg)
}

func (s *Server) RollForward(blockType uint, blockData []byte, tip Tip) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server %+v called RollForward(blockType: %+v, blockData: %x, tip: {Point: {Slot: %d, Hash: %x}, BlockNumber: %d})", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr, blockType, blockData, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber))
Debug(
fmt.Sprintf("calling RollForward(blockType: %+x, blockData: %x, tip: {Point: {Slot: %d, Hash: %x}, BlockNumber: %d})",
blockType,
blockData,
tip.Point.Slot, tip.Point.Hash,
tip.BlockNumber,
),
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.Mode() == protocol.ProtocolModeNodeToNode {
eraId := ledger.BlockToBlockHeaderTypeMap[blockType]
msg := NewMsgRollForwardNtN(
Expand Down Expand Up @@ -136,7 +162,12 @@ func (s *Server) handleRequestNext() error {
// TODO: figure out why this one log message causes a panic (and only this one)
// during tests
//s.Protocol.Logger().
// Debug(fmt.Sprintf("%s: server request next for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
// Debug("request next",
// "component", "network",
// "protocol", ProtocolName,
// "role", "server",
// "connection_id", s.callbackContext.ConnectionId.String(),
// )
if s.config == nil || s.config.RequestNextFunc == nil {
return fmt.Errorf(
"received chain-sync RequestNext message but no callback function is defined",
Expand All @@ -147,7 +178,12 @@ func (s *Server) handleRequestNext() error {

func (s *Server) handleFindIntersect(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server find intersect for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("find intersect",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config == nil || s.config.FindIntersectFunc == nil {
return fmt.Errorf(
"received chain-sync FindIntersect message but no callback function is defined",
Expand Down Expand Up @@ -177,7 +213,12 @@ func (s *Server) handleFindIntersect(msg protocol.Message) error {

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server done for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("done",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
// Restart protocol
s.Protocol.Stop()
s.initProtocol()
Expand Down
7 changes: 6 additions & 1 deletion protocol/handshake/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ func (s *Server) handleMessage(msg protocol.Message) error {

func (s *Server) handleProposeVersions(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server propose versions for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("propose versions",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.FinishedFunc == nil {
return fmt.Errorf(
"received handshake ProposeVersions message but no callback function is defined",
Expand Down
14 changes: 12 additions & 2 deletions protocol/keepalive/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ func (s *Server) messageHandler(msg protocol.Message) error {

func (s *Server) handleKeepAlive(msgGeneric protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server keep alive for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("keep alive",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msg := msgGeneric.(*MsgKeepAlive)
if s.config != nil && s.config.KeepAliveFunc != nil {
// Call the user callback function
Expand All @@ -84,7 +89,12 @@ func (s *Server) handleKeepAlive(msgGeneric protocol.Message) error {

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server done for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("done",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config != nil && s.config.DoneFunc != nil {
// Call the user callback function
return s.config.DoneFunc(s.callbackContext)
Expand Down
35 changes: 30 additions & 5 deletions protocol/localstatequery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ func (s *Server) messageHandler(msg protocol.Message) error {

func (s *Server) handleAcquire(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server acquire for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("acquire",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.AcquireFunc == nil {
return fmt.Errorf(
"received local-state-query Acquire message but no callback function is defined",
Expand All @@ -112,7 +117,12 @@ func (s *Server) handleAcquire(msg protocol.Message) error {

func (s *Server) handleQuery(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server query for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("query",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.QueryFunc == nil {
return fmt.Errorf(
"received local-state-query Query message but no callback function is defined",
Expand All @@ -125,7 +135,12 @@ func (s *Server) handleQuery(msg protocol.Message) error {

func (s *Server) handleRelease() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server release for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("release",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.ReleaseFunc == nil {
return fmt.Errorf(
"received local-state-query Release message but no callback function is defined",
Expand All @@ -137,7 +152,12 @@ func (s *Server) handleRelease() error {

func (s *Server) handleReAcquire(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server reacquire for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("reacquire",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.ReAcquireFunc == nil {
return fmt.Errorf(
"received local-state-query ReAcquire message but no callback function is defined",
Expand All @@ -156,7 +176,12 @@ func (s *Server) handleReAcquire(msg protocol.Message) error {

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server done for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("done",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.DoneFunc == nil {
return fmt.Errorf(
"received local-state-query Done message but no callback function is defined",
Expand Down
42 changes: 36 additions & 6 deletions protocol/localtxmonitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ func (s *Server) messageHandler(msg protocol.Message) error {

func (s *Server) handleAcquire() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server acquire for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("acquire",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.GetMempoolFunc == nil {
return fmt.Errorf(
"received local-tx-monitor Acquire message but no GetMempool callback function is defined",
Expand Down Expand Up @@ -126,21 +131,36 @@ func (s *Server) handleAcquire() error {

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server done for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("done",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
return nil
}

func (s *Server) handleRelease() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server release for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("release",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
s.mempoolCapacity = 0
s.mempoolTxs = nil
return nil
}

func (s *Server) handleHasTx(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server has tx for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("has tx",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
msgHasTx := msg.(*MsgHasTx)
txId := hex.EncodeToString(msgHasTx.TxId)
hasTx := false
Expand All @@ -159,7 +179,12 @@ func (s *Server) handleHasTx(msg protocol.Message) error {

func (s *Server) handleNextTx() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server next tx for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("next tx",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.mempoolNextTxIdx > len(s.mempoolTxs) {
newMsg := NewMsgReplyNextTx(0, nil)
if err := s.SendMessage(newMsg); err != nil {
Expand All @@ -178,7 +203,12 @@ func (s *Server) handleNextTx() error {

func (s *Server) handleGetSizes() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server get sizes for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("get sizes",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
totalTxSize := 0
for _, tx := range s.mempoolTxs {
totalTxSize += len(tx.Tx)
Expand Down
14 changes: 12 additions & 2 deletions protocol/localtxsubmission/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ func (s *Server) messageHandler(msg protocol.Message) error {

func (s *Server) handleSubmitTx(msg protocol.Message) error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server submit tx for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("submit tx",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
if s.config.SubmitTxFunc == nil {
return fmt.Errorf(
"received local-tx-submission SubmitTx message but no callback function is defined",
Expand Down Expand Up @@ -102,6 +107,11 @@ func (s *Server) handleSubmitTx(msg protocol.Message) error {

func (s *Server) handleDone() error {
s.Protocol.Logger().
Debug(fmt.Sprintf("%s: server done for %+v", ProtocolName, s.callbackContext.ConnectionId.RemoteAddr))
Debug("done",
"component", "network",
"protocol", ProtocolName,
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
return nil
}
Loading