46 changes: 46 additions & 0 deletions pkg/node/metrics.go
@@ -0,0 +1,46 @@
// Copyright 2022 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package node

import (
"github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

type nodeMetrics struct {
// WarmupDuration measures time in seconds for the node warmup to complete
WarmupDuration prometheus.Histogram
// FullSyncDuration measures time in seconds for the full sync to complete
FullSyncDuration prometheus.Histogram
}

func newMetrics() nodeMetrics {
subsystem := "init"

return nodeMetrics{
WarmupDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: subsystem,
Name: "warmup_duration_seconds",
Help: "Duration in seconds for node warmup to complete",
},
),
FullSyncDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: subsystem,
Name: "full_sync_duration_seconds",
Help: "Duration in seconds for node warmup to complete",
},
),
}
}

// RegisterMetrics registers all metrics from the package
func (m nodeMetrics) RegisterMetrics() {
prometheus.MustRegister(m.WarmupDuration)
prometheus.MustRegister(m.FullSyncDuration)
}
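
(Reviewer note: a quick package-level sanity test for these collectors might look like the sketch below. It assumes only the newMetrics constructor from this diff plus the standard prometheus/client_golang testutil helpers, and deliberately skips RegisterMetrics so the test does not collide with the global registry. The test name and sample values are illustrative.)

package node

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestNodeMetricsObserve(t *testing.T) {
	m := newMetrics()

	// Record one sample in each histogram.
	m.WarmupDuration.Observe(1.5)
	m.FullSyncDuration.Observe(120)

	// Each collector should expose exactly one metric family.
	if n := testutil.CollectAndCount(m.WarmupDuration); n != 1 {
		t.Fatalf("WarmupDuration: expected 1 metric, got %d", n)
	}
	if n := testutil.CollectAndCount(m.FullSyncDuration); n != 1 {
		t.Fatalf("FullSyncDuration: expected 1 metric, got %d", n)
	}
}
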
50 changes: 48 additions & 2 deletions pkg/node/node.go
@@ -207,6 +207,13 @@ func NewBee(
session accesscontrol.Session,
o *Options,
) (b *Bee, err error) {
// start time for node warmup duration measurement
warmupStartTime := time.Now()
var pullSyncStartTime time.Time

nodeMetrics := newMetrics()
nodeMetrics.RegisterMetrics()

tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
Enabled: o.TracingEnabled,
Endpoint: o.TracingEndpoint,
@@ -598,9 +605,18 @@ func NewBee(
logger.Info("node warmup check initiated. monitoring activity rate to determine readiness.", "startTime", t)
}

detector.OnStabilized = func(t time.Time, totalCount int) {
logger.Info("node warmup complete. system is considered stable and ready.", "stabilizationTime", t, "totalMonitoredEvents", totalCount)
warmupMeasurement := func(t time.Time, totalCount int) {
warmupDuration := t.Sub(warmupStartTime).Seconds()
logger.Info("node warmup complete. system is considered stable and ready.",
"stabilizationTime", t,
"totalMonitoredEvents", totalCount,
"warmupDurationSeconds", warmupDuration)

// Record the warmup duration in the prometheus metric
nodeMetrics.WarmupDuration.Observe(warmupDuration)
pullSyncStartTime = t
}
detector.OnStabilized = warmupMeasurement

detector.OnPeriodComplete = func(t time.Time, periodCount int, stDev float64) {
logger.Debug("node warmup check: period complete.", "periodEndTime", t, "eventsInPeriod", periodCount, "rateStdDev", stDev)
@@ -1133,6 +1149,36 @@ func NewBee(
localStore.StartReserveWorker(ctx, pullerService, waitNetworkRFunc)
nodeStatus.SetSync(pullerService)

// measure full sync duration; note this replaces the OnStabilized
// handler assigned above, so it calls warmupMeasurement first and
// then starts the sync-completion check
detector.OnStabilized = func(t time.Time, totalCount int) {
warmupMeasurement(t, totalCount)

reserveThreshold := reserveCapacity >> 1
isFullySynced := func() bool {
return pullerService.SyncRate() == 0 && saludService.IsHealthy() && localStore.ReserveSize() >= reserveThreshold
}

syncCheckTicker := time.NewTicker(2 * time.Second)
go func() {
defer syncCheckTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-syncCheckTicker.C:
synced := isFullySynced()
logger.Debug("sync status check", "synced", synced, "reserveSize", localStore.ReserveSize(), "threshold", reserveTreshold, "syncRate", pullerService.SyncRate())
Review comment from @gacevicljubisa (Member), Jul 30, 2025:
Maybe change the log level to Trace, because it will spam every second until ReserveSize reaches the threshold? Or we could increase the check interval to 2 seconds?

Reply from the author:
I increased the check interval to 2 seconds, because debug is already the most verbose level available.
if synced {
// measure forward from the warmup stabilization point with
// time.Since; pullSyncStartTime.Sub(t) would always be zero
// here because warmupMeasurement set pullSyncStartTime = t
fullSyncDuration := time.Since(pullSyncStartTime)
nodeMetrics.FullSyncDuration.Observe(fullSyncDuration.Seconds())
return
}
}
}
}()
}

if o.EnableStorageIncentives {

redistributionContractAddress := chainCfg.RedistributionAddress
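(Reviewer note: the corrected measurement above reduces to a small reusable pattern: poll a predicate on a ticker, then observe the elapsed time measured forward from the start point. A minimal standalone sketch follows; the helper name waitAndObserve, the toy isSynced predicate, and the timings are illustrative, not part of this diff.)

package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// waitAndObserve polls isSynced until it returns true or ctx is
// cancelled, then records the elapsed time since start into obs.
func waitAndObserve(ctx context.Context, start time.Time, isSynced func() bool, obs prometheus.Observer) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if isSynced() {
				// Measure forward from start; start.Sub(time.Now())
				// would come out negative.
				obs.Observe(time.Since(start).Seconds())
				return
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	hist := prometheus.NewHistogram(prometheus.HistogramOpts{Name: "full_sync_duration_seconds"})
	start := time.Now()
	deadline := start.Add(4 * time.Second)
	// Toy predicate: report "synced" once four seconds have passed.
	waitAndObserve(ctx, start, func() bool { return time.Now().After(deadline) }, hist)
}
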
15 changes: 15 additions & 0 deletions pkg/salud/metrics.go
@@ -19,6 +19,8 @@ type metrics struct {
ReserveSizePercentErr prometheus.Gauge
Healthy prometheus.Counter
Unhealthy prometheus.Counter
NeighborhoodAvgDur prometheus.Gauge
NeighborCount prometheus.Gauge
}

func newMetrics() metrics {
@@ -79,6 +81,19 @@ func newMetrics() metrics {
Name: "reserve_size_percentage_err",
Help: "Percentage error of the reservesize relative to the network average.",
}),
// Neighborhood-specific metrics
NeighborhoodAvgDur: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "neighborhood_dur",
Help: "Average duration for snapshot response from neighborhood peers.",
}),
NeighborCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "neighbors",
Help: "Number of neighborhood peers.",
}),
}
}

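(Reviewer note: the new gauges can be asserted the same way as the histograms; a minimal sketch assuming the standard testutil.ToFloat64 helper, with an illustrative test name and values.)

package salud

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestNeighborhoodGauges(t *testing.T) {
	m := newMetrics()

	m.NeighborCount.Set(4)
	m.NeighborhoodAvgDur.Set(0.25)

	// ToFloat64 reads the current value of a single-metric collector.
	if got := testutil.ToFloat64(m.NeighborCount); got != 4 {
		t.Fatalf("NeighborCount = %v, want 4", got)
	}
	if got := testutil.ToFloat64(m.NeighborhoodAvgDur); got != 0.25 {
		t.Fatalf("NeighborhoodAvgDur = %v, want 0.25", got)
	}
}
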
33 changes: 27 additions & 6 deletions pkg/salud/salud.go
@@ -134,10 +134,12 @@ type peer struct {
// the allowed thresholds.
func (s *service) salud(mode string, durPercentile float64, connsPercentile float64) {
var (
mtx sync.Mutex
wg sync.WaitGroup
totaldur float64
peers []peer
mtx sync.Mutex
wg sync.WaitGroup
totaldur float64
peers []peer
neighborhoodPeers uint
neighborhoodTotalDur float64
)

err := s.topology.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) {
@@ -163,7 +165,12 @@

mtx.Lock()
totaldur += dur.Seconds()
peers = append(peers, peer{snapshot, dur, addr, bin, s.reserve.IsWithinStorageRadius(addr)})
p := peer{snapshot, dur, addr, bin, s.reserve.IsWithinStorageRadius(addr)}
peers = append(peers, p)
if p.neighbor {
neighborhoodPeers++
neighborhoodTotalDur += dur.Seconds()
}
mtx.Unlock()
}()
return false, false, nil
@@ -184,14 +191,28 @@
pConns := percentileConns(peers, connsPercentile)
commitment := commitment(peers)

if neighborhoodPeers > 0 {
neighborhoodAvgDur := neighborhoodTotalDur / float64(neighborhoodPeers)

s.metrics.NeighborhoodAvgDur.Set(neighborhoodAvgDur)
s.metrics.NeighborCount.Set(float64(neighborhoodPeers))

s.logger.Debug("neighborhood metrics", "avg_dur", neighborhoodAvgDur, "count", neighborhoodPeers)
} else {
s.metrics.NeighborhoodAvgDur.Set(0)
s.metrics.NeighborCount.Set(0)

s.logger.Debug("no neighborhood peers found for metrics")
}

s.metrics.AvgDur.Set(avgDur)
s.metrics.PDur.Set(pDur)
s.metrics.PConns.Set(float64(pConns))
s.metrics.NetworkRadius.Set(float64(networkRadius))
s.metrics.NeighborhoodRadius.Set(float64(nHoodRadius))
s.metrics.Commitment.Set(float64(commitment))

s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)
s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment, "neighborhood_peers", neighborhoodPeers)

// sort peers by duration, highest first to give priority to the fastest peers
sort.Slice(peers, func(i, j int) bool {
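(Reviewer note: the neighborhood average computed above is a mutex-guarded sum and count, reduced after wg.Wait(). The same pattern in a self-contained toy program; the per-peer durations are made up for illustration.)

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		mtx      sync.Mutex
		wg       sync.WaitGroup
		count    uint
		totalDur float64
	)

	// Stand-ins for per-peer snapshot response times.
	durations := []time.Duration{120 * time.Millisecond, 80 * time.Millisecond, 100 * time.Millisecond}

	for _, d := range durations {
		wg.Add(1)
		go func(d time.Duration) {
			defer wg.Done()
			// Accumulate under the mutex, as the salud loop does.
			mtx.Lock()
			count++
			totalDur += d.Seconds()
			mtx.Unlock()
		}(d)
	}
	wg.Wait()

	if count > 0 {
		fmt.Printf("avg: %.3fs over %d peers\n", totalDur/float64(count), count)
	}
}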