From ff195e04321b451085b2e72a678a4dba5b881df1 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 28 Feb 2025 02:34:40 +0300 Subject: [PATCH 1/7] feat: push metrics --- pkg/api/api.go | 3 +- pkg/api/api_test.go | 5 +- pkg/api/metrics.go | 32 ----------- pkg/metrics/registery/registery.go | 82 ++++++++++++++++++++++++++++ pkg/node/devnode.go | 3 +- pkg/node/node.go | 86 +++++++++++++++++++----------- 6 files changed, 144 insertions(+), 67 deletions(-) create mode 100644 pkg/metrics/registery/registery.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 240637b3baf..7c5a9dfac6e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -282,6 +282,7 @@ func New( chainBackend transaction.Backend, cors []string, stamperStore storage.Store, + registery *prometheus.Registry, ) *Service { s := new(Service) @@ -297,7 +298,7 @@ func New( s.transaction = transaction s.batchStore = batchStore s.chainBackend = chainBackend - s.metricsRegistry = newDebugMetrics() + s.metricsRegistry = registery s.preMapHooks = map[string]func(v string) (string, error){ "mimeMediaType": func(v string) (string, error) { typ, _, err := mime.ParseMediaType(v) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 986bc663d3e..217adef9c3d 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -34,6 +34,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics/registery" p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock" "github.com/ethersphere/bee/v2/pkg/pingpong" "github.com/ethersphere/bee/v2/pkg/postage" @@ -212,7 +213,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. o.BeeMode = api.FullMode } - s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New()) + s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registery.NewRegistry(false).MetricsRegistry()) testutil.CleanupCloser(t, s) s.SetP2P(o.P2P) @@ -364,7 +365,7 @@ func TestParseName(t *testing.T) { pk, _ := crypto.GenerateSecp256k1Key() signer := crypto.NewDefaultSigner(pk) - s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New()) + s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registery.NewRegistry(false).MetricsRegistry()) s.Configure(signer, nil, api.Options{}, api.ExtraOptions{Resolver: tC.res}, 1, nil) s.Mount() s.EnableFullAPI() diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 7635468f4e3..656c17093f2 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -9,10 +9,8 @@ import ( "strconv" "time" - "github.com/ethersphere/bee/v2" m "github.com/ethersphere/bee/v2/pkg/metrics" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" ) const bytesInKB = 1000 @@ -134,33 +132,3 @@ func (rw *responseWriter) WriteHeader(code int) { rw.UpgradedResponseWriter.WriteHeader(code) rw.wroteHeader = true } - -func newDebugMetrics() (r *prometheus.Registry) { - r = prometheus.NewRegistry() - - // register standard metrics - r.MustRegister( - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ - Namespace: m.Namespace, - }), - collectors.NewGoCollector(), - prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: m.Namespace, - Name: "info", - Help: "Bee information.", - ConstLabels: prometheus.Labels{ - "version": bee.Version, - }, - }), - ) - - return r -} - -func (s *Service) MetricsRegistry() *prometheus.Registry { - return s.metricsRegistry -} - -func (s *Service) MustRegisterMetrics(cs ...prometheus.Collector) { - s.metricsRegistry.MustRegister(cs...) -} diff --git a/pkg/metrics/registery/registery.go b/pkg/metrics/registery/registery.go new file mode 100644 index 00000000000..916c91e3f51 --- /dev/null +++ b/pkg/metrics/registery/registery.go @@ -0,0 +1,82 @@ +package registery + +import ( + "context" + "time" + + "github.com/ethersphere/bee/v2" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/push" +) + +type Registry struct { + register *prometheus.Registry + pushRegister *prometheus.Registry +} + +func NewRegistry(push bool) *Registry { + r := &Registry{ + register: prometheus.NewRegistry(), + pushRegister: prometheus.NewRegistry(), + } + + c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ + Namespace: metrics.Namespace, + }) + + g := collectors.NewGoCollector() + + v := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "info", + Help: "Bee information.", + ConstLabels: prometheus.Labels{ + "version": bee.Version, + }, + }) + + r.MustRegister(c, g, v) + + if push { + r.MustPushRegister(c, g, v) + } + + return r +} + +func (r *Registry) MetricsRegistry() *prometheus.Registry { + return r.register +} + +func (r *Registry) PushWorker(ctx context.Context, path string, job string, logger log.Logger) func() error { + metricsPusher := push.New(path, job).Collector(r.pushRegister) + + ctx, cancel := context.WithCancel(ctx) + + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(time.Minute): + if err := metricsPusher.Push(); err != nil { + logger.Debug("metrics push failed", "error", err) + } + } + }() + + return func() error { + cancel() + return metricsPusher.Push() + } +} + +func (r *Registry) MustRegister(cs ...prometheus.Collector) { + r.register.MustRegister(cs...) +} + +func (r *Registry) MustPushRegister(cs ...prometheus.Collector) { + r.pushRegister.MustRegister(cs...) +} diff --git a/pkg/node/devnode.go b/pkg/node/devnode.go index 9ff5363c455..55c933b518c 100644 --- a/pkg/node/devnode.go +++ b/pkg/node/devnode.go @@ -25,6 +25,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/feeds/factory" "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics/registery" mockP2P "github.com/ethersphere/bee/v2/pkg/p2p/mock" mockPingPong "github.com/ethersphere/bee/v2/pkg/pingpong/mock" "github.com/ethersphere/bee/v2/pkg/postage" @@ -360,7 +361,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) { }), ) - apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New()) + apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registery.NewRegistry(false).MetricsRegistry()) apiService.Configure(signer, tracer, api.Options{ CORSAllowedOrigins: o.CORSAllowedOrigins, diff --git a/pkg/node/node.go b/pkg/node/node.go index fe82347eb6e..92658f1e63c 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -20,6 +20,7 @@ import ( "net/http" "path/filepath" "runtime" + "slices" "sync" "sync/atomic" "time" @@ -36,6 +37,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/hive" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/ethersphere/bee/v2/pkg/metrics/registery" "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/p2p/libp2p" "github.com/ethersphere/bee/v2/pkg/pingpong" @@ -77,7 +79,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/util/syncutil" "github.com/hashicorp/go-multierror" ma "github.com/multiformats/go-multiaddr" - promc "github.com/prometheus/client_golang/prometheus" "golang.org/x/crypto/sha3" "golang.org/x/sync/errgroup" ) @@ -429,8 +430,12 @@ func NewBee( b.stamperStoreCloser = stamperStore var apiService *api.Service + var metricsRegistery *registery.Registry if o.APIAddr != "" { + + metricsRegistery = registery.NewRegistry(false) + if o.MutexProfile { _ = runtime.SetMutexProfileFraction(1) } @@ -457,6 +462,7 @@ func NewBee( chainBackend, o.CORSAllowedOrigins, stamperStore, + metricsRegistery.MetricsRegistry(), ) apiService.Mount() @@ -606,12 +612,6 @@ func NewBee( } } - var registry *promc.Registry - - if apiService != nil { - registry = apiService.MetricsRegistry() - } - p2ps, err := libp2p.New(ctx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, @@ -620,7 +620,7 @@ func NewBee( FullNode: o.FullNodeMode, Nonce: nonce, ValidateOverlay: chainEnabled, - Registry: registry, + Registry: metricsRegistery.MetricsRegistry(), }) if err != nil { return nil, fmt.Errorf("p2p service: %w", err) @@ -1142,48 +1142,72 @@ func NewBee( if o.APIAddr != "" { // register metrics from components - apiService.MustRegisterMetrics(p2ps.Metrics()...) - apiService.MustRegisterMetrics(pingPong.Metrics()...) - apiService.MustRegisterMetrics(acc.Metrics()...) - apiService.MustRegisterMetrics(localStore.Metrics()...) - apiService.MustRegisterMetrics(kad.Metrics()...) - apiService.MustRegisterMetrics(saludService.Metrics()...) - apiService.MustRegisterMetrics(stateStoreMetrics.Metrics()...) + metricsRegistery.MustRegister(p2ps.Metrics()...) + metricsRegistery.MustRegister(pingPong.Metrics()...) + metricsRegistery.MustRegister(acc.Metrics()...) + metricsRegistery.MustRegister(localStore.Metrics()...) + metricsRegistery.MustRegister(kad.Metrics()...) + metricsRegistery.MustRegister(saludService.Metrics()...) + metricsRegistery.MustRegister(stateStoreMetrics.Metrics()...) if pullerService != nil { - apiService.MustRegisterMetrics(pullerService.Metrics()...) + metricsRegistery.MustRegister(pullerService.Metrics()...) } if agent != nil { - apiService.MustRegisterMetrics(agent.Metrics()...) + metricsRegistery.MustRegister(agent.Metrics()...) } - apiService.MustRegisterMetrics(pushSyncProtocol.Metrics()...) - apiService.MustRegisterMetrics(pusherService.Metrics()...) - apiService.MustRegisterMetrics(pullSyncProtocol.Metrics()...) - apiService.MustRegisterMetrics(retrieval.Metrics()...) - apiService.MustRegisterMetrics(lightNodes.Metrics()...) - apiService.MustRegisterMetrics(hive.Metrics()...) + metricsRegistery.MustRegister(pushSyncProtocol.Metrics()...) + metricsRegistery.MustRegister(pusherService.Metrics()...) + metricsRegistery.MustRegister(pullSyncProtocol.Metrics()...) + metricsRegistery.MustRegister(retrieval.Metrics()...) + metricsRegistery.MustRegister(lightNodes.Metrics()...) + metricsRegistery.MustRegister(hive.Metrics()...) + + // TODO: remove this when we have a push metrics opt in configuration + if true { + + // TODO: remove this when we have an exclude option in the configuration + exclude := []string{"pullsync"} + include := []string{"pusher", "pushsync", "pullsync", "localstore"} + + for _, m := range include { + if slices.Contains(exclude, m) { + continue + } + switch m { + case "pusher": + metricsRegistery.MustPushRegister(pusherService.Metrics()...) + case "pushsync": + metricsRegistery.MustPushRegister(pushSyncProtocol.Metrics()...) + case "pullsync": + metricsRegistery.MustPushRegister(pullSyncProtocol.Metrics()...) + case "localstore": + metricsRegistery.MustPushRegister(localStore.Metrics()...) + } + } + } if bs, ok := batchStore.(metrics.Collector); ok { - apiService.MustRegisterMetrics(bs.Metrics()...) + metricsRegistery.MustRegister(bs.Metrics()...) } if ls, ok := eventListener.(metrics.Collector); ok { - apiService.MustRegisterMetrics(ls.Metrics()...) + metricsRegistery.MustRegister(ls.Metrics()...) } if pssServiceMetrics, ok := pssService.(metrics.Collector); ok { - apiService.MustRegisterMetrics(pssServiceMetrics.Metrics()...) + metricsRegistery.MustRegister(pssServiceMetrics.Metrics()...) } if swapBackendMetrics, ok := chainBackend.(metrics.Collector); ok { - apiService.MustRegisterMetrics(swapBackendMetrics.Metrics()...) + metricsRegistery.MustRegister(swapBackendMetrics.Metrics()...) } - if l, ok := logger.(metrics.Collector); ok { - apiService.MustRegisterMetrics(l.Metrics()...) + metricsRegistery.MustRegister(l.Metrics()...) } - apiService.MustRegisterMetrics(pseudosettleService.Metrics()...) + + metricsRegistery.MustRegister(pseudosettleService.Metrics()...) if swapService != nil { - apiService.MustRegisterMetrics(swapService.Metrics()...) + metricsRegistery.MustRegister(swapService.Metrics()...) } apiService.Configure(signer, tracer, api.Options{ From 964b174bc9e2c1cccc4ac255fb1b138d2430014e Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 28 Feb 2025 02:36:15 +0300 Subject: [PATCH 2/7] fix: lint --- pkg/api/api.go | 4 ++-- pkg/api/api_test.go | 6 +++--- pkg/metrics/{registery => registry}/registery.go | 6 +++++- pkg/node/devnode.go | 4 ++-- pkg/node/node.go | 6 +++--- 5 files changed, 15 insertions(+), 11 deletions(-) rename pkg/metrics/{registery => registry}/registery.go (90%) diff --git a/pkg/api/api.go b/pkg/api/api.go index 7c5a9dfac6e..91c7a6bc8a5 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -282,7 +282,7 @@ func New( chainBackend transaction.Backend, cors []string, stamperStore storage.Store, - registery *prometheus.Registry, + registry *prometheus.Registry, ) *Service { s := new(Service) @@ -298,7 +298,7 @@ func New( s.transaction = transaction s.batchStore = batchStore s.chainBackend = chainBackend - s.metricsRegistry = registery + s.metricsRegistry = registry s.preMapHooks = map[string]func(v string) (string, error){ "mimeMediaType": func(v string) (string, error) { typ, _, err := mime.ParseMediaType(v) diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 217adef9c3d..75ecaa6ff0f 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -34,7 +34,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/metrics/registery" + "github.com/ethersphere/bee/v2/pkg/metrics/registry" p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock" "github.com/ethersphere/bee/v2/pkg/pingpong" "github.com/ethersphere/bee/v2/pkg/postage" @@ -213,7 +213,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. o.BeeMode = api.FullMode } - s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registery.NewRegistry(false).MetricsRegistry()) + s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry()) testutil.CleanupCloser(t, s) s.SetP2P(o.P2P) @@ -365,7 +365,7 @@ func TestParseName(t *testing.T) { pk, _ := crypto.GenerateSecp256k1Key() signer := crypto.NewDefaultSigner(pk) - s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registery.NewRegistry(false).MetricsRegistry()) + s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry()) s.Configure(signer, nil, api.Options{}, api.ExtraOptions{Resolver: tC.res}, 1, nil) s.Mount() s.EnableFullAPI() diff --git a/pkg/metrics/registery/registery.go b/pkg/metrics/registry/registery.go similarity index 90% rename from pkg/metrics/registery/registery.go rename to pkg/metrics/registry/registery.go index 916c91e3f51..954f67240b7 100644 --- a/pkg/metrics/registery/registery.go +++ b/pkg/metrics/registry/registery.go @@ -1,4 +1,8 @@ -package registery +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package registry import ( "context" diff --git a/pkg/node/devnode.go b/pkg/node/devnode.go index 55c933b518c..d033f46e9a8 100644 --- a/pkg/node/devnode.go +++ b/pkg/node/devnode.go @@ -25,7 +25,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/feeds/factory" "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/metrics/registery" + "github.com/ethersphere/bee/v2/pkg/metrics/registry" mockP2P "github.com/ethersphere/bee/v2/pkg/p2p/mock" mockPingPong "github.com/ethersphere/bee/v2/pkg/pingpong/mock" "github.com/ethersphere/bee/v2/pkg/postage" @@ -361,7 +361,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) { }), ) - apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registery.NewRegistry(false).MetricsRegistry()) + apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry()) apiService.Configure(signer, tracer, api.Options{ CORSAllowedOrigins: o.CORSAllowedOrigins, diff --git a/pkg/node/node.go b/pkg/node/node.go index 92658f1e63c..5750d4bfe03 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -37,7 +37,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/hive" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" - "github.com/ethersphere/bee/v2/pkg/metrics/registery" + "github.com/ethersphere/bee/v2/pkg/metrics/registry" "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/p2p/libp2p" "github.com/ethersphere/bee/v2/pkg/pingpong" @@ -430,11 +430,11 @@ func NewBee( b.stamperStoreCloser = stamperStore var apiService *api.Service - var metricsRegistery *registery.Registry + var metricsRegistery *registry.Registry if o.APIAddr != "" { - metricsRegistery = registery.NewRegistry(false) + metricsRegistery = registry.NewRegistry(false) if o.MutexProfile { _ = runtime.SetMutexProfileFraction(1) From 1544c3e6c3f930ab260435b021b74e42e4a69af8 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 28 Feb 2025 02:41:13 +0300 Subject: [PATCH 3/7] fix: include only --- pkg/node/node.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 5750d4bfe03..131da7e0b8e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -20,7 +20,6 @@ import ( "net/http" "path/filepath" "runtime" - "slices" "sync" "sync/atomic" "time" @@ -1168,14 +1167,10 @@ func NewBee( // TODO: remove this when we have a push metrics opt in configuration if true { - // TODO: remove this when we have an exclude option in the configuration - exclude := []string{"pullsync"} + // TODO: remove this when we have an include option in the configuration include := []string{"pusher", "pushsync", "pullsync", "localstore"} for _, m := range include { - if slices.Contains(exclude, m) { - continue - } switch m { case "pusher": metricsRegistery.MustPushRegister(pusherService.Metrics()...) From 87e98725f83f71702c87ac55ea5e75f1f1036492 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:58:19 +0300 Subject: [PATCH 4/7] fix: bee mode --- pkg/metrics/registry/registery.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/metrics/registry/registery.go b/pkg/metrics/registry/registery.go index 954f67240b7..96d5f2c006f 100644 --- a/pkg/metrics/registry/registery.go +++ b/pkg/metrics/registry/registery.go @@ -39,6 +39,7 @@ func NewRegistry(push bool) *Registry { Help: "Bee information.", ConstLabels: prometheus.Labels{ "version": bee.Version, + "mode": "full" | "light" | "ultralight", }, }) From bda61ec2da7382078fd3194abf68a444087ba2c1 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 10 Mar 2025 20:16:21 +0200 Subject: [PATCH 5/7] feat: added bee mode also cpu and mem usage metrics --- pkg/api/api_test.go | 4 +- pkg/metrics/registry/registery.go | 87 -------------------- pkg/metrics/registry/registry.go | 130 ++++++++++++++++++++++++++++++ pkg/node/devnode.go | 2 +- pkg/node/node.go | 2 +- 5 files changed, 134 insertions(+), 91 deletions(-) delete mode 100644 pkg/metrics/registry/registery.go create mode 100644 pkg/metrics/registry/registry.go diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 75ecaa6ff0f..e4c409a9f2c 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -213,7 +213,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. o.BeeMode = api.FullMode } - s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry()) + s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false, o.BeeMode.String()).MetricsRegistry()) testutil.CleanupCloser(t, s) s.SetP2P(o.P2P) @@ -365,7 +365,7 @@ func TestParseName(t *testing.T) { pk, _ := crypto.GenerateSecp256k1Key() signer := crypto.NewDefaultSigner(pk) - s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry()) + s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registry.NewRegistry(false, api.FullMode.String()).MetricsRegistry()) s.Configure(signer, nil, api.Options{}, api.ExtraOptions{Resolver: tC.res}, 1, nil) s.Mount() s.EnableFullAPI() diff --git a/pkg/metrics/registry/registery.go b/pkg/metrics/registry/registery.go deleted file mode 100644 index 96d5f2c006f..00000000000 --- a/pkg/metrics/registry/registery.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2025 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package registry - -import ( - "context" - "time" - - "github.com/ethersphere/bee/v2" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/metrics" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/push" -) - -type Registry struct { - register *prometheus.Registry - pushRegister *prometheus.Registry -} - -func NewRegistry(push bool) *Registry { - r := &Registry{ - register: prometheus.NewRegistry(), - pushRegister: prometheus.NewRegistry(), - } - - c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ - Namespace: metrics.Namespace, - }) - - g := collectors.NewGoCollector() - - v := prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metrics.Namespace, - Name: "info", - Help: "Bee information.", - ConstLabels: prometheus.Labels{ - "version": bee.Version, - "mode": "full" | "light" | "ultralight", - }, - }) - - r.MustRegister(c, g, v) - - if push { - r.MustPushRegister(c, g, v) - } - - return r -} - -func (r *Registry) MetricsRegistry() *prometheus.Registry { - return r.register -} - -func (r *Registry) PushWorker(ctx context.Context, path string, job string, logger log.Logger) func() error { - metricsPusher := push.New(path, job).Collector(r.pushRegister) - - ctx, cancel := context.WithCancel(ctx) - - go func() { - select { - case <-ctx.Done(): - return - case <-time.After(time.Minute): - if err := metricsPusher.Push(); err != nil { - logger.Debug("metrics push failed", "error", err) - } - } - }() - - return func() error { - cancel() - return metricsPusher.Push() - } -} - -func (r *Registry) MustRegister(cs ...prometheus.Collector) { - r.register.MustRegister(cs...) -} - -func (r *Registry) MustPushRegister(cs ...prometheus.Collector) { - r.pushRegister.MustRegister(cs...) -} diff --git a/pkg/metrics/registry/registry.go b/pkg/metrics/registry/registry.go new file mode 100644 index 00000000000..9751a82cdcc --- /dev/null +++ b/pkg/metrics/registry/registry.go @@ -0,0 +1,130 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package registry + +import ( + "context" + "time" + + "github.com/ethersphere/bee/v2" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/push" + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/mem" +) + +type Registry struct { + register *prometheus.Registry + pushRegister *prometheus.Registry + cpuGauge prometheus.Gauge + memGauge prometheus.Gauge +} + +func NewRegistry(push bool, mode string) *Registry { + r := &Registry{ + register: prometheus.NewRegistry(), + pushRegister: prometheus.NewRegistry(), + } + + c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ + Namespace: metrics.Namespace, + }) + + g := collectors.NewGoCollector() + + // Create CPU and memory gauge metrics + cpuGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "system_cpu_usage_percent", + Help: "System CPU usage percentage", + }) + + memGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "system_memory_usage_percent", + Help: "System memory usage percentage", + }) + + v := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Name: "info", + Help: "Bee information.", + ConstLabels: prometheus.Labels{ + "version": bee.Version, + "mode": mode, + }, + }) + + r.MustRegister(c, g, v, cpuGauge, memGauge) + + if push { + r.MustPushRegister(c, g, v, cpuGauge, memGauge) + } + + // Store references to gauges for updating in PushWorker + r.cpuGauge = cpuGauge + r.memGauge = memGauge + + return r +} + +func (r *Registry) MetricsRegistry() *prometheus.Registry { + return r.register +} + +func (r *Registry) PushWorker(ctx context.Context, path string, job string, logger log.Logger) func() error { + metricsPusher := push.New(path, job).Collector(r.pushRegister) + + ctx, cancel := context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Collect CPU metrics + percentages, err := cpu.Percent(0, false) + if err == nil && len(percentages) > 0 { + r.cpuGauge.Set(percentages[0]) + } else if err != nil { + logger.Debug("failed to collect CPU metrics", "error", err) + } + + // Collect memory metrics + vm, err := mem.VirtualMemory() + if err == nil { + r.memGauge.Set(vm.UsedPercent) + } else { + logger.Debug("failed to collect memory metrics", "error", err) + } + + // Push metrics + if err := metricsPusher.Push(); err != nil { + logger.Debug("metrics push failed", "error", err) + } + } + } + }() + + return func() error { + cancel() + return metricsPusher.Push() + } +} + +func (r *Registry) MustRegister(cs ...prometheus.Collector) { + r.register.MustRegister(cs...) +} + +func (r *Registry) MustPushRegister(cs ...prometheus.Collector) { + r.pushRegister.MustRegister(cs...) +} diff --git a/pkg/node/devnode.go b/pkg/node/devnode.go index d033f46e9a8..e2934251fab 100644 --- a/pkg/node/devnode.go +++ b/pkg/node/devnode.go @@ -361,7 +361,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) { }), ) - apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry()) + apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false, api.DevMode.String()).MetricsRegistry()) apiService.Configure(signer, tracer, api.Options{ CORSAllowedOrigins: o.CORSAllowedOrigins, diff --git a/pkg/node/node.go b/pkg/node/node.go index 131da7e0b8e..1ae3db41abd 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -433,7 +433,7 @@ func NewBee( if o.APIAddr != "" { - metricsRegistery = registry.NewRegistry(false) + metricsRegistery = registry.NewRegistry(false, beeNodeMode.String()) if o.MutexProfile { _ = runtime.SetMutexProfileFraction(1) From ded4c3247fd21c79eba0aa6093e5bd8f871c9f17 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 10 Mar 2025 20:20:47 +0200 Subject: [PATCH 6/7] fix: lint issue --- pkg/metrics/registry/registry.go | 62 ++++++++++++++++---------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/pkg/metrics/registry/registry.go b/pkg/metrics/registry/registry.go index 9751a82cdcc..c3b39ab9e2e 100644 --- a/pkg/metrics/registry/registry.go +++ b/pkg/metrics/registry/registry.go @@ -83,37 +83,37 @@ func (r *Registry) PushWorker(ctx context.Context, path string, job string, logg ctx, cancel := context.WithCancel(ctx) go func() { - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - // Collect CPU metrics - percentages, err := cpu.Percent(0, false) - if err == nil && len(percentages) > 0 { - r.cpuGauge.Set(percentages[0]) - } else if err != nil { - logger.Debug("failed to collect CPU metrics", "error", err) - } - - // Collect memory metrics - vm, err := mem.VirtualMemory() - if err == nil { - r.memGauge.Set(vm.UsedPercent) - } else { - logger.Debug("failed to collect memory metrics", "error", err) - } - - // Push metrics - if err := metricsPusher.Push(); err != nil { - logger.Debug("metrics push failed", "error", err) - } - } - } - }() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Collect CPU metrics + percentages, err := cpu.Percent(0, false) + if err == nil && len(percentages) > 0 { + r.cpuGauge.Set(percentages[0]) + } else if err != nil { + logger.Debug("failed to collect CPU metrics", "error", err) + } + + // Collect memory metrics + vm, err := mem.VirtualMemory() + if err == nil { + r.memGauge.Set(vm.UsedPercent) + } else { + logger.Debug("failed to collect memory metrics", "error", err) + } + + // Push metrics + if err := metricsPusher.Push(); err != nil { + logger.Debug("metrics push failed", "error", err) + } + } + } + }() return func() error { cancel() From c9a55f9c467189873bda92f13bd773ca47bd4089 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 10 Mar 2025 20:26:02 +0200 Subject: [PATCH 7/7] fix: go mod tidy check --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6d118116eab..2f1f708d9ab 100644 --- a/go.mod +++ b/go.mod @@ -146,7 +146,7 @@ require ( github.com/quic-go/quic-go v0.42.0 // indirect github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect - github.com/shirou/gopsutil v3.21.5+incompatible // indirect + github.com/shirou/gopsutil v3.21.5+incompatible github.com/smartystreets/assertions v1.1.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.3.0 // indirect