Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions gossip/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package gossip

import "time"

func setProgressThreshold(threshold time.Duration) {
noProgressTime = threshold
}

func setApplicationThreshold(threshold time.Duration) {
noAppMessageTime = threshold
}
97 changes: 81 additions & 16 deletions gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ const (
// txChanSize is the size of channel listening to NewTxsNotify.
// The number is referenced from the size of tx pool.
txChanSize = 4096

// percentage of useless peer nodes to allow
uselessPeerPercentage = 20 // 20%

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Why don't we use just a factor, e.g. 0.2, instead of then having to calculate each time the percentage?


// Number of application errors that can be tolerated before banning the node and disconnecting
toleranceOfApplicationErrors = 3
)

var (
ErrorProgressTimeout = errors.New("progress timeout")
ErrorApplicationTimeout = errors.New("application timeout")
)

func errResp(code errCode, format string, v ...interface{}) error {
Expand Down Expand Up @@ -782,23 +793,30 @@ func (h *handler) handle(p *peer) error {
p.Log().Error("Snapshot extension barrier failed", "err", err)
return err
}
useless := discfilter.Banned(p.Node().ID(), p.Node().Record())
if !useless && (!eligibleForSnap(p.Peer) || !strings.Contains(strings.ToLower(p.Name()), "opera")) {
useless = true

// Some clients have compatible caps and thus pass discovery checks and seep in to
// protocol handler. We should band these clients immediately.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: little typo

// ex: go-corex, Efireal, Geth all with caps=[opera/62]
if !strings.Contains(strings.ToLower(p.Name()), "opera") {
discfilter.Ban(p.ID())
return p2p.DiscProtocolError
}
if !p.Peer.Info().Network.Trusted && useless && h.peers.UselessNum() >= h.maxPeers/10 {
// don't allow more than 10% of useless peers

// A useless peer is the one which does not support protocols opera/63 & fsnap/1.
useless := !eligibleForSnap(p.Peer)
if !p.Peer.Info().Network.Trusted && useless && h.peers.UselessNum() >= (h.maxPeers*(uselessPeerPercentage/100)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: I am not yet familiar with this useless stuff, but why do we even allow a percentage of useless peers at all? Why don't we just disconnect them all?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the peer is useless in the context of sync, i.e. it doesn't support fsnap/1 and opera/63.
But old peers supporting opera/62 should still be allowed to participate.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so I assume useless then already checked that the peer is a opera/62 peer. It's not just any peer. That would make sense.

// don't allow more than 20% of useless peers
return p2p.DiscTooManyPeers
}
if !p.Peer.Info().Network.Trusted && useless {
if h.peers.UselessNum() >= h.maxPeers/10 {
// don't allow more than 10% of useless peers
return p2p.DiscTooManyPeers
}
p.SetUseless()
}

// Disconnect if maxPeers is reached
if h.peers.Len() >= h.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}

h.peerWG.Add(1)
defer h.peerWG.Done()

Expand All @@ -809,9 +827,7 @@ func (h *handler) handle(p *peer) error {
)
if err := p.Handshake(h.NetworkID, myProgress, common.Hash(genesis)); err != nil {
p.Log().Debug("Handshake failed", "err", err)
if !useless {
discfilter.Ban(p.ID())
}
discfilter.Ban(p.ID())
return err
}

Expand Down Expand Up @@ -856,11 +872,51 @@ func (h *handler) handle(p *peer) error {
// after this will be sent via broadcasts.
h.syncTransactions(p, h.txpool.SampleHashes(h.config.Protocol.MaxInitialTxHashesSend))

// Handle incoming messages until the connection is torn down
// Handle incoming messages until the connection is torn down or the inactivity
// timer times out.
var noOfApplicationErrors = 0
for {
if err := h.handleMsg(p); err != nil {
p.Log().Debug("Message handling failed", "err", err)
return err
// progress and application
progressWatchDogTimer := time.NewTimer(noProgressTime)
applicationWatchDogTimer := time.NewTimer(noAppMessageTime)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we recreating the timer on each for iteration here? Therefore the Resets later are useless? It looks to me that either we have to create the timers outside of the for loop, and then Reset them as you do now, or recreating them in each loop iteration and just break when we Reset, although this then results in a lot of garbage collected timers? Or am I missing something?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops... the timer should be outside the loop.

select {
case <-progressWatchDogTimer.C:
// If self syncing, don't check peer progress
if !h.syncStatus.AcceptTxs() {
progressWatchDogTimer.Reset(noProgressTime)
break
}
if p.IsPeerProgressing() {
progressWatchDogTimer.Reset(noProgressTime)
} else {
p.Log().Warn("progress timer timeout: ", "name", p.Name(), "node", p.Node().String())
discfilter.Ban(p.ID())
return ErrorProgressTimeout
}
case <- applicationWatchDogTimer.C:
if p.IsApplicationProgressing() {
applicationWatchDogTimer.Reset(noAppMessageTime)
} else {
p.Log().Warn("application timer timeout: ", "name", p.Name(), "node", p.Node().String())
discfilter.Ban(p.ID())
return ErrorApplicationTimeout
}
default:
err := h.handleMsg(p)
if err != nil {
p.Log().Debug("Message handling failed", "err", err)
if strings.Contains(err.Error(), errorToString[ErrPeerNotProgressing]) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use errors.Is here instead of comparing strings?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use errors.Is() only to compare errors. But in this place, the error is defined as a string.
If we want to change it, we should define all the errors as errors.New().

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agreed. If there are more such string based errors instead of errors.New() based ones (which I believe would be better) - then this should go into a separate PR to address. So up to you if you want to do anything in this PR.

discfilter.Ban(p.ID())
return err
}
// Ban peer and disconnect if the number of errors in the handling of application message
// crosses a threshold.
noOfApplicationErrors++
if noOfApplicationErrors > toleranceOfApplicationErrors {
discfilter.Ban(p.ID())
return err
}
}
}
}
}
Expand Down Expand Up @@ -1014,6 +1070,10 @@ func (h *handler) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
p.SetProgress(progress)
// If peer has not progressed for noProgressTime minutes, then disconnect the peer.
if !p.IsPeerProgressing() {
return errResp(ErrPeerNotProgressing, "%v: %v %v", "epoch is not progressing for ", noProgressTime, "minutes")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: As noProgressTime is a duration, this would print "epoch is not progressing for 3m0s minutes", I think

}

case msg.Code == EvmTxsMsg:
// Transactions arrived, make sure we have a valid and fresh graph to handle them
Expand Down Expand Up @@ -1316,6 +1376,11 @@ func (h *handler) handleMsg(p *peer) error {
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}

if msg.Code != ProgressMsg {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not yet familiar with all message codes, but is ProgressMsg the only message which signals that there is progress?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

// Since a valid application message is received, set the peer as progressing.
p.SetApplicationProgress()
}
return nil
}

Expand Down
42 changes: 41 additions & 1 deletion gossip/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
handshakeTimeout = 5 * time.Second
)

var (
noProgressTime = 3 * time.Minute // Max allowed minutes to elapse without Epoch progress
noAppMessageTime = 15 * time.Minute // Max allowed minutes to elapse without any Application message
)

// PeerInfo represents a short summary of the sub-protocol metadata known
// about a connected peer.
type PeerInfo struct {
Expand Down Expand Up @@ -62,7 +67,9 @@ type peer struct {
queuedDataSemaphore *datasemaphore.DataSemaphore
term chan struct{} // Termination channel to stop the broadcaster

progress PeerProgress
progress PeerProgress
progressTime time.Time // The last progress message (with progressed Epoch)
appMessageTime time.Time // The last valid application message received

snapExt *snapPeer // Satellite `snap` connection
syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
Expand All @@ -85,7 +92,29 @@ func (p *peer) SetProgress(x PeerProgress) {
p.Lock()
defer p.Unlock()

// Check if the peer is progressing
if x.More(p.progress) {
p.setPeerAsProgressing(x)
}
}

func (p *peer) setPeerAsProgressing(x PeerProgress) {
p.progress = x
p.progressTime = time.Now()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason why p.appMessageTime is locked, but p.progressTime isn't?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's locked in SetProgress() where setPeerAsProgressing() is called.

}

func (p *peer) IsPeerProgressing() bool {
return time.Since(p.progressTime) < noProgressTime
}

func (p *peer) SetApplicationProgress() {
p.Lock()
defer p.Unlock()
p.appMessageTime = time.Now()
}

func (p *peer) IsApplicationProgressing() bool {
return time.Since(p.appMessageTime) < noAppMessageTime
}

func (p *peer) InterestedIn(h hash.Event) bool {
Expand All @@ -100,6 +129,15 @@ func (p *peer) InterestedIn(h hash.Event) bool {
!p.knownEvents.Contains(h)
}

func (a *PeerProgress) More(b PeerProgress) bool {
if a.Epoch > b.Epoch {
return true
} else if a.Epoch == b.Epoch && a.LastBlockIdx > b.LastBlockIdx {
return true
}
return false
}

func (a *PeerProgress) Less(b PeerProgress) bool {
if a.Epoch != b.Epoch {
return a.Epoch < b.Epoch
Expand All @@ -119,6 +157,8 @@ func newPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, cfg PeerCacheConfi
queue: make(chan broadcastItem, cfg.MaxQueuedItems),
queuedDataSemaphore: datasemaphore.New(dag.Metric{cfg.MaxQueuedItems, cfg.MaxQueuedSize}, getSemaphoreWarningFn("Peers queue")),
term: make(chan struct{}),
progressTime: time.Now(),
appMessageTime: time.Now(),
}

go peer.broadcast(peer.queue)
Expand Down
67 changes: 67 additions & 0 deletions gossip/peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package gossip

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPeerProgressWithEpoch(t *testing.T) {
// Increment epoch and see if the peer is progressing
setProgressThreshold(1 * time.Second)
newPeer := getPeer()
ep1 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep1)
time.Sleep(2 * time.Second) //set the threshold to 2 second

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these Sleep acctumulate to 9 seconds - making test runs 9 seconds slower as I understand. Isn't there a different way to test this? Do we actually even need to sleep?

ep2 := PeerProgress{Epoch: 2}
newPeer.SetProgress(ep2)
require.True(t, newPeer.IsPeerProgressing(), "Peer is not progressing")
}

func TestPeerNotProgressWithEpoch(t *testing.T) {
// Don't Increment epoch and check if the peer is not progressing
setProgressThreshold(1 * time.Second)
newPeer := getPeer()
ep1 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep1)
time.Sleep(2 * time.Second) //set the threshold to 2 second so that the threshold is expired
ep2 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep2)
require.False(t, newPeer.IsPeerProgressing(), "Peer is progressing")
}

func TestPeerNotProgressTimeout(t *testing.T) {
// Don't Increment epoch and check if the peer is not progressing
setProgressThreshold(1 * time.Second)
newPeer := getPeer()
ep1 := PeerProgress{Epoch: 1}
newPeer.SetProgress(ep1)
time.Sleep(2 * time.Second) //set the threshold to 2 second so that the timer expires
require.False(t, newPeer.IsPeerProgressing(), "Peer is progressing")
}

func TestApplicationProgressMessage(t *testing.T) {
// send a valid application message and check if the peer is progressing
setApplicationThreshold(2 * time.Second)
newPeer := getPeer()
newPeer.SetApplicationProgress() // simulate receiving of a valid application message
time.Sleep(1 * time.Second) //set the threshold to 1 second
require.True(t, newPeer.IsApplicationProgressing(), "Application is not progressing")
}

func TestApplicationNotProgressingMessage(t *testing.T) {
// send a valid application message and check if the peer is progressing
setApplicationThreshold(1 * time.Second)
newPeer := getPeer()
newPeer.SetApplicationProgress() // simulate receiving of a valid application message
time.Sleep(2 * time.Second) //set the threshold to 2 second so that the threshold timer expires
require.False(t, newPeer.IsApplicationProgressing(), "Application is progressing")
}

func getPeer() *peer {
peer := &peer{
progressTime: time.Now(),
}
return peer
}
2 changes: 2 additions & 0 deletions gossip/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
ErrNoStatusMsg
ErrExtraStatusMsg
ErrSuspendedPeer
ErrPeerNotProgressing
ErrEmptyMessage = 0xf00
)

Expand All @@ -97,6 +98,7 @@ var errorToString = map[int]string{
ErrNoStatusMsg: "No status message",
ErrExtraStatusMsg: "Extra status message",
ErrSuspendedPeer: "Suspended peer",
ErrPeerNotProgressing: "Peer not progressing",
ErrEmptyMessage: "Empty message",
}

Expand Down