Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 10 additions & 26 deletions protocol/txsubmission/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func (s *Server) Start() {
"connection_id", s.callbackContext.ConnectionId.String(),
)
s.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
<-s.Protocol.DoneChan()
close(s.requestTxIdsResultChan)
close(s.requestTxsResultChan)
}()
})
}

Expand All @@ -103,13 +97,14 @@ func (s *Server) RequestTxIds(
return nil, err
}
// Wait for result
txIds, ok := <-s.requestTxIdsResultChan
if !ok {
select {
case <-s.DoneChan():
return nil, protocol.ProtocolShuttingDownError
case txIds := <-s.requestTxIdsResultChan:
// Update ack count for next call
s.ackCount = len(txIds)
return txIds, nil
}
// Update ack count for next call
s.ackCount = len(txIds)
return txIds, nil
}

// RequestTxs requests the content of the requested TX identifiers from the remote node's mempool
Expand All @@ -127,11 +122,12 @@ func (s *Server) RequestTxs(txIds []TxId) ([]TxBody, error) {
return nil, err
}
// Wait for result
txs, ok := <-s.requestTxsResultChan
if !ok {
select {
case <-s.DoneChan():
return nil, protocol.ProtocolShuttingDownError
case txs := <-s.requestTxsResultChan:
return txs, nil
}
return txs, nil
}

func (s *Server) messageHandler(msg protocol.Message) error {
Expand Down Expand Up @@ -163,12 +159,6 @@ func (s *Server) handleReplyTxIds(msg protocol.Message) error {
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
// Check for shutdown
select {
case <-s.Protocol.DoneChan():
return protocol.ProtocolShuttingDownError
default:
}
msgReplyTxIds := msg.(*MsgReplyTxIds)
s.requestTxIdsResultChan <- msgReplyTxIds.TxIds
return nil
Expand All @@ -182,12 +172,6 @@ func (s *Server) handleReplyTxs(msg protocol.Message) error {
"role", "server",
"connection_id", s.callbackContext.ConnectionId.String(),
)
// Check for shutdown
select {
case <-s.Protocol.DoneChan():
return protocol.ProtocolShuttingDownError
default:
}
msgReplyTxs := msg.(*MsgReplyTxs)
s.requestTxsResultChan <- msgReplyTxs.Txs
return nil
Expand Down