diff --git a/pkg/node/metrics.go b/pkg/node/metrics.go
new file mode 100644
index 00000000000..90005ceec7a
--- /dev/null
+++ b/pkg/node/metrics.go
@@ -0,0 +1,46 @@
+// Copyright 2022 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package node
+
+import (
+	"github.com/ethersphere/bee/v2/pkg/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type nodeMetrics struct {
+	// WarmupDuration measures the time in seconds for the node warmup to complete
+	WarmupDuration prometheus.Histogram
+	// FullSyncDuration measures the time in seconds for the full sync to complete
+	FullSyncDuration prometheus.Histogram
+}
+
+func newMetrics() nodeMetrics {
+	subsystem := "init"
+
+	return nodeMetrics{
+		WarmupDuration: prometheus.NewHistogram(
+			prometheus.HistogramOpts{
+				Namespace: metrics.Namespace,
+				Subsystem: subsystem,
+				Name:      "warmup_duration_seconds",
+				Help:      "Duration in seconds for node warmup to complete.",
+			},
+		),
+		FullSyncDuration: prometheus.NewHistogram(
+			prometheus.HistogramOpts{
+				Namespace: metrics.Namespace,
+				Subsystem: subsystem,
+				Name:      "full_sync_duration_seconds",
+				Help:      "Duration in seconds for full sync to complete.",
+			},
+		),
+	}
+}
+
+// RegisterMetrics registers all metrics from the package
+func (m nodeMetrics) RegisterMetrics() {
+	prometheus.MustRegister(m.WarmupDuration)
+	prometheus.MustRegister(m.FullSyncDuration)
+}
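Note on the histograms above: with no explicit Buckets, both fall back to prometheus.DefBuckets, whose largest finite bucket is 10s. Warmup and especially full sync will usually take far longer, so most observations would land in the +Inf bucket; wider buckets may be worth considering. A minimal standalone sketch of the observe/read-back flow (the "bee"/"init" names mirror the diff; everything else here is illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Same shape as warmup_duration_seconds above; namespace is illustrative.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "bee",
		Subsystem: "init",
		Name:      "warmup_duration_seconds",
		Help:      "Duration in seconds for node warmup to complete.",
	})
	prometheus.MustRegister(h)

	start := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for the warmup work
	h.Observe(time.Since(start).Seconds())

	// Read the sample back to confirm the observation was recorded.
	var m dto.Metric
	if err := h.Write(&m); err == nil {
		fmt.Println("observations:", m.GetHistogram().GetSampleCount())
	}
}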
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 477fe7c230d..f3f37b3f054 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -207,6 +207,13 @@ func NewBee(
 	session accesscontrol.Session,
 	o *Options,
 ) (b *Bee, err error) {
+	// start time for node warmup duration measurement
+	warmupStartTime := time.Now()
+	var pullSyncStartTime time.Time
+
+	nodeMetrics := newMetrics()
+	nodeMetrics.RegisterMetrics()
+
 	tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
 		Enabled:  o.TracingEnabled,
 		Endpoint: o.TracingEndpoint,
@@ -598,9 +605,18 @@
 		logger.Info("node warmup check initiated. monitoring activity rate to determine readiness.", "startTime", t)
 	}
 
-	detector.OnStabilized = func(t time.Time, totalCount int) {
-		logger.Info("node warmup complete. system is considered stable and ready.", "stabilizationTime", t, "totalMonitoredEvents", totalCount)
+	warmupMeasurement := func(t time.Time, totalCount int) {
+		warmupDuration := t.Sub(warmupStartTime).Seconds()
+		logger.Info("node warmup complete. system is considered stable and ready.",
+			"stabilizationTime", t,
+			"totalMonitoredEvents", totalCount,
+			"warmupDurationSeconds", warmupDuration)
+
+		// record the warmup duration in the prometheus metric
+		nodeMetrics.WarmupDuration.Observe(warmupDuration)
+		pullSyncStartTime = t
 	}
+	detector.OnStabilized = warmupMeasurement
 
 	detector.OnPeriodComplete = func(t time.Time, periodCount int, stDev float64) {
 		logger.Debug("node warmup check: period complete.", "periodEndTime", t, "eventsInPeriod", periodCount, "rateStdDev", stDev)
@@ -1133,6 +1149,35 @@
 	localStore.StartReserveWorker(ctx, pullerService, waitNetworkRFunc)
 	nodeStatus.SetSync(pullerService)
 
+	// measure full sync duration
+	detector.OnStabilized = func(t time.Time, totalCount int) {
+		warmupMeasurement(t, totalCount)
+
+		reserveThreshold := reserveCapacity >> 1
+		isFullySynced := func() bool {
+			return pullerService.SyncRate() == 0 && saludService.IsHealthy() && localStore.ReserveSize() >= reserveThreshold
+		}
+
+		syncCheckTicker := time.NewTicker(2 * time.Second)
+		go func() {
+			defer syncCheckTicker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-syncCheckTicker.C:
+					synced := isFullySynced()
+					logger.Debug("sync status check", "synced", synced, "reserveSize", localStore.ReserveSize(), "threshold", reserveThreshold, "syncRate", pullerService.SyncRate())
+					if synced {
+						// pullSyncStartTime was set at warmup stabilization; measure from there to now
+						nodeMetrics.FullSyncDuration.Observe(time.Since(pullSyncStartTime).Seconds())
+						return
+					}
+				}
+			}
+		}()
+	}
+
 	if o.EnableStorageIncentives {
 
 		redistributionContractAddress := chainCfg.RedistributionAddress
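The full-sync check added at @@ -1133 is a poll-until-condition loop. The detail that matters is that the elapsed time is taken with time.Since(pullSyncStartTime) at the moment the condition first flips, not as a difference of two timestamps both captured at stabilization, which would always be zero. A standalone sketch of the same pattern, with the sync condition and the metric observation stubbed out as hypothetical closures:

package main

import (
	"context"
	"fmt"
	"time"
)

// measureUntil polls done() on a fixed interval and records the elapsed
// time once it first reports true, or gives up when ctx is cancelled.
func measureUntil(ctx context.Context, start time.Time, done func() bool, observe func(seconds float64)) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if done() {
				observe(time.Since(start).Seconds())
				return
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	start := time.Now()
	// Toy condition standing in for isFullySynced; flips true after ~3s.
	done := func() bool { return time.Since(start) > 3*time.Second }
	measureUntil(ctx, start, done, func(s float64) {
		fmt.Printf("synced after %.1fs\n", s) // stand-in for FullSyncDuration.Observe
	})
}

Since the ticker is stopped by the deferred Stop on every return path, no extra Stop call is needed before returning from inside the loop.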
diff --git a/pkg/salud/metrics.go b/pkg/salud/metrics.go
index a99087d219b..8ac5abdb517 100644
--- a/pkg/salud/metrics.go
+++ b/pkg/salud/metrics.go
@@ -19,6 +19,8 @@ type metrics struct {
 	ReserveSizePercentErr prometheus.Gauge
 	Healthy               prometheus.Counter
 	Unhealthy             prometheus.Counter
+	NeighborhoodAvgDur    prometheus.Gauge
+	NeighborCount         prometheus.Gauge
 }
 
 func newMetrics() metrics {
@@ -79,6 +81,19 @@
 			Name:      "reserve_size_percentage_err",
 			Help:      "Percentage error of the reservesize relative to the network average.",
 		}),
+		// Neighborhood-specific metrics
+		NeighborhoodAvgDur: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "neighborhood_dur",
+			Help:      "Average duration in seconds for snapshot responses from neighborhood peers.",
+		}),
+		NeighborCount: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "neighbors",
+			Help:      "Number of neighborhood peers.",
+		}),
 	}
 }
 
diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go
index dc931f86081..28fe648d030 100644
--- a/pkg/salud/salud.go
+++ b/pkg/salud/salud.go
@@ -134,10 +134,12 @@ type peer struct {
 // the allowed thresholds.
 func (s *service) salud(mode string, durPercentile float64, connsPercentile float64) {
 	var (
-		mtx      sync.Mutex
-		wg       sync.WaitGroup
-		totaldur float64
-		peers    []peer
+		mtx                  sync.Mutex
+		wg                   sync.WaitGroup
+		totaldur             float64
+		peers                []peer
+		neighborhoodPeers    uint
+		neighborhoodTotalDur float64
 	)
 
 	err := s.topology.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) {
@@ -163,7 +165,12 @@ func (s *service) salud(mode string, durPercentile float64, connsPercentile floa
 
 			mtx.Lock()
 			totaldur += dur.Seconds()
-			peers = append(peers, peer{snapshot, dur, addr, bin, s.reserve.IsWithinStorageRadius(addr)})
+			p := peer{snapshot, dur, addr, bin, s.reserve.IsWithinStorageRadius(addr)}
+			peers = append(peers, p)
+			if p.neighbor {
+				neighborhoodPeers++
+				neighborhoodTotalDur += dur.Seconds()
+			}
 			mtx.Unlock()
 		}()
 		return false, false, nil
@@ -184,6 +191,20 @@
 	pConns := percentileConns(peers, connsPercentile)
 	commitment := commitment(peers)
 
+	if neighborhoodPeers > 0 {
+		neighborhoodAvgDur := neighborhoodTotalDur / float64(neighborhoodPeers)
+
+		s.metrics.NeighborhoodAvgDur.Set(neighborhoodAvgDur)
+		s.metrics.NeighborCount.Set(float64(neighborhoodPeers))
+
+		s.logger.Debug("neighborhood metrics", "avg_dur", neighborhoodAvgDur, "count", neighborhoodPeers)
+	} else {
+		s.metrics.NeighborhoodAvgDur.Set(0)
+		s.metrics.NeighborCount.Set(0)
+
+		s.logger.Debug("no neighborhood peers found for metrics")
+	}
+
 	s.metrics.AvgDur.Set(avgDur)
 	s.metrics.PDur.Set(pDur)
 	s.metrics.PConns.Set(float64(pConns))
@@ -191,7 +212,7 @@
 	s.metrics.NeighborhoodRadius.Set(float64(nHoodRadius))
 	s.metrics.Commitment.Set(float64(commitment))
 
-	s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)
+	s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment, "neighborhood_peers", neighborhoodPeers)
 
 	// sort peers by duration, highest first to give priority to the fastest peers
 	sort.Slice(peers, func(i, j int) bool {
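The salud() changes above accumulate the neighborhood totals with a plain mutex-guarded fan-in across the per-peer goroutines. A stripped-down sketch with made-up durations, showing the same accumulation and the zero-peer guard before dividing:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		mtx      sync.Mutex
		wg       sync.WaitGroup
		count    uint
		totalDur float64
	)

	// Made-up response durations standing in for per-peer snapshot timings.
	for _, d := range []time.Duration{120 * time.Millisecond, 80 * time.Millisecond} {
		wg.Add(1)
		go func(dur time.Duration) {
			defer wg.Done()
			mtx.Lock()
			count++
			totalDur += dur.Seconds()
			mtx.Unlock()
		}(d)
	}
	wg.Wait()

	// Guard against division by zero, mirroring the neighborhoodPeers > 0 branch.
	if count > 0 {
		fmt.Printf("neighborhood avg: %.3fs over %d peers\n", totalDur/float64(count), count)
	} else {
		fmt.Println("no neighborhood peers")
	}
}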