Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ func NewBee(
return nil, fmt.Errorf("status service: %w", err)
}

saludService := salud.New(nodeStatus, kad, localStore, logger, detector, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
saludService := salud.New(nodeStatus, kad, localStore, logger, detector, api.FullMode.String(), salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
b.saludCloser = saludService

rC, unsub := saludService.SubscribeNetworkStorageRadius()
Expand Down
22 changes: 5 additions & 17 deletions pkg/salud/salud.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const (
initialBackoffDelay = 10 * time.Second
maxBackoffDelay = 5 * time.Minute
backoffFactor = 2
DefaultMinPeersPerBin = 4
DefaultDurPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter duration check
DefaultConnsPercentile = 0.8 // consider 80% as healthy, lower percentile = stricter conns check
)
Expand Down Expand Up @@ -64,7 +63,6 @@ func New(
logger log.Logger,
startupStabilizer stabilization.Subscriber,
mode string,
minPeersPerbin int,
durPercentile float64,
connsPercentile float64,
) *service {
Expand All @@ -81,12 +79,12 @@ func New(
}

s.wg.Add(1)
go s.worker(startupStabilizer, mode, minPeersPerbin, durPercentile, connsPercentile)
go s.worker(startupStabilizer, mode, durPercentile, connsPercentile)

return s
}

func (s *service) worker(startupStabilizer stabilization.Subscriber, mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) {
func (s *service) worker(startupStabilizer stabilization.Subscriber, mode string, durPercentile float64, connsPercentile float64) {
defer s.wg.Done()

sub, unsubscribe := startupStabilizer.Subscribe()
Expand All @@ -102,7 +100,7 @@ func (s *service) worker(startupStabilizer stabilization.Subscriber, mode string
currentDelay := initialBackoffDelay

for {
s.salud(mode, minPeersPerbin, durPercentile, connsPercentile)
s.salud(mode, durPercentile, connsPercentile)

select {
case <-s.quit:
Expand Down Expand Up @@ -134,13 +132,12 @@ type peer struct {
// salud acquires the status snapshot of every peer and computes an nth percentile of response duration and connected
// peer count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealthy that fall beyond
// the allowed thresholds.
func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) {
func (s *service) salud(mode string, durPercentile float64, connsPercentile float64) {
var (
mtx sync.Mutex
wg sync.WaitGroup
totaldur float64
peers []peer
bins [swarm.MaxBins]int
)

err := s.topology.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) {
Expand All @@ -165,7 +162,6 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}

mtx.Lock()
bins[bin]++
totaldur += dur.Seconds()
peers = append(peers, peer{snapshot, dur, addr, bin, s.reserve.IsWithinStorageRadius(addr)})
mtx.Unlock()
Expand Down Expand Up @@ -206,17 +202,10 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,

var healthy bool

// every bin should have at least some peers, healthy or not
if bins[peer.bin] <= minPeersPerbin {
s.metrics.Healthy.Inc()
s.topology.UpdatePeerHealth(peer.addr, true, peer.dur)
continue
}

if networkRadius > 0 && peer.status.CommittedDepth < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr, "bin", peer.bin)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr, "bin", peer.bin)
s.logger.Debug("response duration above threshold", "duration", peer.dur, "peer_address", peer.addr, "bin", peer.bin)
} else if peer.status.ConnectedPeers < pConns {
s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr, "bin", peer.bin)
} else if peer.status.BatchCommitment != commitment {
Expand All @@ -230,7 +219,6 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
s.metrics.Healthy.Inc()
} else {
s.metrics.Unhealthy.Inc()
bins[peer.bin]--
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/salud/salud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestSalud(t *testing.T) {
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0, 0.8, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
mockstorer.WithCapacityDoubling(0),
)

service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0, 0.8, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) {
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0, 0.8, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestSubToRadius(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0, 0.8, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
t.Cleanup(unsub)
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestUnsub(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0, 0.8, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8)
testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
Expand Down
Loading