diff --git a/go.mod b/go.mod index 6d118116eab..2f1f708d9ab 100644 --- a/go.mod +++ b/go.mod @@ -146,7 +146,7 @@ require ( github.com/quic-go/quic-go v0.42.0 // indirect github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect - github.com/shirou/gopsutil v3.21.5+incompatible // indirect + github.com/shirou/gopsutil v3.21.5+incompatible github.com/smartystreets/assertions v1.1.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.3.0 // indirect diff --git a/pkg/api/api.go b/pkg/api/api.go index 240637b3baf..91c7a6bc8a5 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -282,6 +282,7 @@ func New( chainBackend transaction.Backend, cors []string, stamperStore storage.Store, + registry *prometheus.Registry, ) *Service { s := new(Service) @@ -297,7 +298,7 @@ func New( s.transaction = transaction s.batchStore = batchStore s.chainBackend = chainBackend - s.metricsRegistry = newDebugMetrics() + s.metricsRegistry = registry s.preMapHooks = map[string]func(v string) (string, error){ "mimeMediaType": func(v string) (string, error) { typ, _, err := mime.ParseMediaType(v) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 986bc663d3e..e4c409a9f2c 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -34,6 +34,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics/registry" p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock" "github.com/ethersphere/bee/v2/pkg/pingpong" "github.com/ethersphere/bee/v2/pkg/postage" @@ -212,7 +213,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. 
o.BeeMode = api.FullMode } - s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New()) + s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false, o.BeeMode.String()).MetricsRegistry()) testutil.CleanupCloser(t, s) s.SetP2P(o.P2P) @@ -364,7 +365,7 @@ func TestParseName(t *testing.T) { pk, _ := crypto.GenerateSecp256k1Key() signer := crypto.NewDefaultSigner(pk) - s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New()) + s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registry.NewRegistry(false, api.FullMode.String()).MetricsRegistry()) s.Configure(signer, nil, api.Options{}, api.ExtraOptions{Resolver: tC.res}, 1, nil) s.Mount() s.EnableFullAPI() diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 7635468f4e3..656c17093f2 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -9,10 +9,8 @@ import ( "strconv" "time" - "github.com/ethersphere/bee/v2" m "github.com/ethersphere/bee/v2/pkg/metrics" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" ) const bytesInKB = 1000 @@ -134,33 +132,3 @@ func (rw *responseWriter) WriteHeader(code int) { rw.UpgradedResponseWriter.WriteHeader(code) rw.wroteHeader = true } - -func newDebugMetrics() (r *prometheus.Registry) { - r = prometheus.NewRegistry() - - // register standard metrics - r.MustRegister( - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ - Namespace: m.Namespace, - }), - collectors.NewGoCollector(), - 
prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: m.Namespace, - Name: "info", - Help: "Bee information.", - ConstLabels: prometheus.Labels{ - "version": bee.Version, - }, - }), - ) - - return r -} - -func (s *Service) MetricsRegistry() *prometheus.Registry { - return s.metricsRegistry -} - -func (s *Service) MustRegisterMetrics(cs ...prometheus.Collector) { - s.metricsRegistry.MustRegister(cs...) -} diff --git a/pkg/metrics/registry/registry.go b/pkg/metrics/registry/registry.go new file mode 100644 index 00000000000..c3b39ab9e2e --- /dev/null +++ b/pkg/metrics/registry/registry.go @@ -0,0 +1,154 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package registry bundles the Prometheus registry handed to the node's +// services with a second registry whose collectors are pushed by PushWorker. +package registry + +import ( + "context" + "time" + + "github.com/ethersphere/bee/v2" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/push" + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/mem" +) + +// Registry holds two Prometheus registries: register, returned by +// MetricsRegistry, and pushRegister, whose collectors PushWorker pushes. +// cpuGauge and memGauge are the system gauges PushWorker refreshes. +type Registry struct { + register *prometheus.Registry + pushRegister *prometheus.Registry + cpuGauge prometheus.Gauge + memGauge prometheus.Gauge +} + +// NewRegistry creates a Registry and registers the process collector, the +// Go collector, the "info" gauge (labelled with the bee version and mode) and +// the CPU/memory usage gauges on the main registry. When push is true the +// same collectors are also registered on the push registry. +func NewRegistry(push bool, mode string) *Registry { + r := &Registry{ + register: prometheus.NewRegistry(), + pushRegister: prometheus.NewRegistry(), + } + + c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ + Namespace: metrics.Namespace, + }) + + g := collectors.NewGoCollector() + + // Create CPU and memory gauge metrics + cpuGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "system_cpu_usage_percent", + Help: "System CPU usage percentage", + }) + + memGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "system_memory_usage_percent", 
Help: "System memory usage percentage", + }) + + v := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "info", + Help: "Bee information.", + ConstLabels: prometheus.Labels{ + "version": bee.Version, + "mode": mode, + }, + }) + + r.MustRegister(c, g, v, cpuGauge, memGauge) + + if push { + r.MustPushRegister(c, g, v, cpuGauge, memGauge) + } + + // Store references to gauges for updating in PushWorker + r.cpuGauge = cpuGauge + r.memGauge = memGauge + + return r +} + +// MetricsRegistry returns the main Prometheus registry. It is safe to call +// on a nil *Registry, in which case it returns nil, so callers that build the +// Registry conditionally can pass the result on without a separate guard. +func (r *Registry) MetricsRegistry() *prometheus.Registry { + if r == nil { + return nil + } + return r.register +} + +// PushWorker starts a goroutine that, once a minute until ctx is cancelled, +// samples CPU and memory usage into the system gauges and pushes the push +// registry to path under the given job name. Collection and push failures +// are only logged at debug level. The returned function cancels the goroutine +// and performs one final push, returning that push's error. +func (r *Registry) PushWorker(ctx context.Context, path string, job string, logger log.Logger) func() error { + metricsPusher := push.New(path, job).Collector(r.pushRegister) + + ctx, cancel := context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Collect CPU metrics + percentages, err := cpu.Percent(0, false) + if err == nil && len(percentages) > 0 { + r.cpuGauge.Set(percentages[0]) + } else if err != nil { + logger.Debug("failed to collect CPU metrics", "error", err) + } + + // Collect memory metrics + vm, err := mem.VirtualMemory() + if err == nil { + r.memGauge.Set(vm.UsedPercent) + } else { + logger.Debug("failed to collect memory metrics", "error", err) + } + + // Push metrics + if err := metricsPusher.Push(); err != nil { + logger.Debug("metrics push failed", "error", err) + } + } + } + }() + + return func() error { + cancel() + return metricsPusher.Push() + } +} + +// MustRegister registers cs on the main registry. It panics if any +// registration fails, as prometheus.Registry.MustRegister does. +func (r *Registry) MustRegister(cs ...prometheus.Collector) { + r.register.MustRegister(cs...) +} + +// MustPushRegister registers cs on the push registry. It panics if any +// registration fails, as prometheus.Registry.MustRegister does. +func (r *Registry) MustPushRegister(cs ...prometheus.Collector) { + r.pushRegister.MustRegister(cs...) 
+} diff --git a/pkg/node/devnode.go b/pkg/node/devnode.go index 9ff5363c455..e2934251fab 100644 --- a/pkg/node/devnode.go +++ b/pkg/node/devnode.go @@ -25,6 +25,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/feeds/factory" "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics/registry" mockP2P "github.com/ethersphere/bee/v2/pkg/p2p/mock" mockPingPong "github.com/ethersphere/bee/v2/pkg/pingpong/mock" "github.com/ethersphere/bee/v2/pkg/postage" @@ -360,7 +361,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) { }), ) - apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New()) + apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false, api.DevMode.String()).MetricsRegistry()) apiService.Configure(signer, tracer, api.Options{ CORSAllowedOrigins: o.CORSAllowedOrigins, diff --git a/pkg/node/node.go b/pkg/node/node.go index fe82347eb6e..1ae3db41abd 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -36,6 +36,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/hive" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/ethersphere/bee/v2/pkg/metrics/registry" "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/p2p/libp2p" "github.com/ethersphere/bee/v2/pkg/pingpong" @@ -77,7 +78,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/util/syncutil" "github.com/hashicorp/go-multierror" ma "github.com/multiformats/go-multiaddr" - promc "github.com/prometheus/client_golang/prometheus" "golang.org/x/crypto/sha3" "golang.org/x/sync/errgroup" ) @@ -429,8 +429,12 @@ func NewBee( b.stamperStoreCloser = stamperStore 
var apiService *api.Service + var metricsRegistery *registry.Registry if o.APIAddr != "" { + + metricsRegistery = registry.NewRegistry(false, beeNodeMode.String()) + if o.MutexProfile { _ = runtime.SetMutexProfileFraction(1) } @@ -457,6 +461,7 @@ func NewBee( chainBackend, o.CORSAllowedOrigins, stamperStore, + metricsRegistery.MetricsRegistry(), ) apiService.Mount() @@ -606,12 +611,6 @@ func NewBee( } } - var registry *promc.Registry - - if apiService != nil { - registry = apiService.MetricsRegistry() - } - p2ps, err := libp2p.New(ctx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, @@ -620,7 +619,7 @@ func NewBee( FullNode: o.FullNodeMode, Nonce: nonce, ValidateOverlay: chainEnabled, - Registry: registry, + Registry: metricsRegistery.MetricsRegistry(), }) if err != nil { return nil, fmt.Errorf("p2p service: %w", err) @@ -1142,48 +1141,68 @@ func NewBee( if o.APIAddr != "" { // register metrics from components - apiService.MustRegisterMetrics(p2ps.Metrics()...) - apiService.MustRegisterMetrics(pingPong.Metrics()...) - apiService.MustRegisterMetrics(acc.Metrics()...) - apiService.MustRegisterMetrics(localStore.Metrics()...) - apiService.MustRegisterMetrics(kad.Metrics()...) - apiService.MustRegisterMetrics(saludService.Metrics()...) - apiService.MustRegisterMetrics(stateStoreMetrics.Metrics()...) + metricsRegistery.MustRegister(p2ps.Metrics()...) + metricsRegistery.MustRegister(pingPong.Metrics()...) + metricsRegistery.MustRegister(acc.Metrics()...) + metricsRegistery.MustRegister(localStore.Metrics()...) + metricsRegistery.MustRegister(kad.Metrics()...) + metricsRegistery.MustRegister(saludService.Metrics()...) + metricsRegistery.MustRegister(stateStoreMetrics.Metrics()...) if pullerService != nil { - apiService.MustRegisterMetrics(pullerService.Metrics()...) + metricsRegistery.MustRegister(pullerService.Metrics()...) 
} if agent != nil { - apiService.MustRegisterMetrics(agent.Metrics()...) + metricsRegistery.MustRegister(agent.Metrics()...) } - apiService.MustRegisterMetrics(pushSyncProtocol.Metrics()...) - apiService.MustRegisterMetrics(pusherService.Metrics()...) - apiService.MustRegisterMetrics(pullSyncProtocol.Metrics()...) - apiService.MustRegisterMetrics(retrieval.Metrics()...) - apiService.MustRegisterMetrics(lightNodes.Metrics()...) - apiService.MustRegisterMetrics(hive.Metrics()...) + metricsRegistery.MustRegister(pushSyncProtocol.Metrics()...) + metricsRegistery.MustRegister(pusherService.Metrics()...) + metricsRegistery.MustRegister(pullSyncProtocol.Metrics()...) + metricsRegistery.MustRegister(retrieval.Metrics()...) + metricsRegistery.MustRegister(lightNodes.Metrics()...) + metricsRegistery.MustRegister(hive.Metrics()...) + + // TODO: remove this when we have a push metrics opt in configuration + if true { + + // TODO: remove this when we have an include option in the configuration + include := []string{"pusher", "pushsync", "pullsync", "localstore"} + + for _, m := range include { + switch m { + case "pusher": + metricsRegistery.MustPushRegister(pusherService.Metrics()...) + case "pushsync": + metricsRegistery.MustPushRegister(pushSyncProtocol.Metrics()...) + case "pullsync": + metricsRegistery.MustPushRegister(pullSyncProtocol.Metrics()...) + case "localstore": + metricsRegistery.MustPushRegister(localStore.Metrics()...) + } + } + } if bs, ok := batchStore.(metrics.Collector); ok { - apiService.MustRegisterMetrics(bs.Metrics()...) + metricsRegistery.MustRegister(bs.Metrics()...) } if ls, ok := eventListener.(metrics.Collector); ok { - apiService.MustRegisterMetrics(ls.Metrics()...) + metricsRegistery.MustRegister(ls.Metrics()...) } if pssServiceMetrics, ok := pssService.(metrics.Collector); ok { - apiService.MustRegisterMetrics(pssServiceMetrics.Metrics()...) + metricsRegistery.MustRegister(pssServiceMetrics.Metrics()...) 
} if swapBackendMetrics, ok := chainBackend.(metrics.Collector); ok { - apiService.MustRegisterMetrics(swapBackendMetrics.Metrics()...) + metricsRegistery.MustRegister(swapBackendMetrics.Metrics()...) } - if l, ok := logger.(metrics.Collector); ok { - apiService.MustRegisterMetrics(l.Metrics()...) + metricsRegistery.MustRegister(l.Metrics()...) } - apiService.MustRegisterMetrics(pseudosettleService.Metrics()...) + + metricsRegistery.MustRegister(pseudosettleService.Metrics()...) if swapService != nil { - apiService.MustRegisterMetrics(swapService.Metrics()...) + metricsRegistery.MustRegister(swapService.Metrics()...) } apiService.Configure(signer, tracer, api.Options{