Skip to content
Merged
34 changes: 17 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,30 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-multistream v0.5.0
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.21.1
github.com/spf13/afero v1.6.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.10.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/vmihailenco/msgpack/v5 v5.3.4
github.com/wealdtech/go-ens/v3 v3.5.1
gitlab.com/nolash/go-mockbytes v0.0.7
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
golang.org/x/crypto v0.23.0
golang.org/x/net v0.25.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.20.0
golang.org/x/term v0.20.0
golang.org/x/crypto v0.31.0
golang.org/x/net v0.33.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0
golang.org/x/term v0.27.0
golang.org/x/time v0.5.0
gopkg.in/yaml.v2 v2.4.0
resenje.org/multex v0.1.0
resenje.org/singleflight v0.4.0
resenje.org/web v0.4.3
)

require golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect

require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand All @@ -66,7 +64,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/codahale/hdrhistogram v0.0.0-00010101000000-000000000000 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
Expand Down Expand Up @@ -104,7 +102,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
Expand Down Expand Up @@ -132,16 +130,17 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0
github.com/prometheus/procfs v0.15.1 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.42.0 // indirect
Expand All @@ -166,10 +165,11 @@ require (
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/protobuf v1.36.1 // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
62 changes: 32 additions & 30 deletions go.sum

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,49 @@ func (s *Service) contentLengthMetricMiddleware() func(h http.Handler) http.Hand
}
}

func (s *Service) downloadSpeedMetricMiddleware(endpoint string) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
h.ServeHTTP(w, r)

rw, ok := w.(*responseWriter)
if !ok {
return
}
if rw.Status() != http.StatusOK {
return
}

speed := float64(rw.size) / time.Since(start).Seconds()
s.metrics.DownloadSpeed.WithLabelValues(endpoint).Observe(speed)
})
}
}

// observeUploadSpeed measures the speed of the upload and sets appropriate
// labels to the metrics. This function can be called as a deferred function in
// side of handler. This functions is not in a form of a middleware to more
// directly pass the deferred flag.
func (s *Service) observeUploadSpeed(w http.ResponseWriter, r *http.Request, start time.Time, endpoint string, deferred bool) {
rw, ok := w.(*responseWriter)
if !ok {
return
}

if rw.Status() != http.StatusOK && rw.Status() != http.StatusCreated {
return
}

mode := "direct"
if deferred {
mode = "deferred"
}

speed := float64(r.ContentLength) / time.Since(start).Seconds()
s.metrics.UploadSpeed.WithLabelValues(endpoint, mode).Observe(speed)
}

// gasConfigMiddleware can be used by the APIs that allow block chain transactions to set
// gas price and gas limit through the HTTP API headers.
func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) http.Handler {
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/accesscontrol"
"github.com/ethersphere/bee/v2/pkg/cac"
Expand Down Expand Up @@ -71,6 +72,8 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
span.SetTag("tagID", tag)
}

defer s.observeUploadSpeed(w, r, time.Now(), "bytes", deferred)

putter, err := s.newStamperPutter(ctx, putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
deferred = defaultUploadMethod(headers.Deferred)
)

defer s.observeUploadSpeed(w, r, time.Now(), "bzz", deferred)

if deferred || headers.Pin {
tag, err = s.getOrCreateSessionID(headers.SwarmTag)
if err != nil {
Expand Down
33 changes: 32 additions & 1 deletion pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type metrics struct {
ResponseCodeCounts *prometheus.CounterVec

ContentApiDuration prometheus.HistogramVec
UploadSpeed *prometheus.HistogramVec
DownloadSpeed *prometheus.HistogramVec
}

func newMetrics() metrics {
Expand Down Expand Up @@ -64,6 +66,20 @@ func newMetrics() metrics {
Help: "Histogram of file upload API response durations.",
Buckets: []float64{0.5, 1, 2.5, 5, 10, 30, 60},
}, []string{"filesize", "method"}),
UploadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "upload_speed",
Help: "Histogram of upload speed in B/s.",
Buckets: []float64{0.25, 0.5, 0.75, 1, 1.25, 1.5, 1.75, 2, 2.5, 3, 4, 5},
}, []string{"endpoint", "mode"}),
DownloadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_speed",
Help: "Histogram of download speed in B/s.",
Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 4, 5, 6, 7, 8, 9},
}, []string{"endpoint"}),
}
}

Expand All @@ -82,6 +98,14 @@ func (s *Service) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}

// StatusMetrics exposes metrics that are exposed on the status protocol.
func (s *Service) StatusMetrics() []prometheus.Collector {
return []prometheus.Collector{
s.metrics.UploadSpeed,
s.metrics.DownloadSpeed,
}
}

func (s *Service) pageviewMetricsHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
Expand Down Expand Up @@ -114,18 +138,25 @@ type responseWriter struct {
UpgradedResponseWriter
statusCode int
wroteHeader bool
size int
}

func newResponseWriter(w http.ResponseWriter) *responseWriter {
// StatusOK is called by default if nothing else is called
uw := w.(UpgradedResponseWriter)
return &responseWriter{uw, http.StatusOK, false}
return &responseWriter{uw, http.StatusOK, false, 0}
}

func (rw *responseWriter) Status() int {
return rw.statusCode
}

func (rr *responseWriter) Write(b []byte) (int, error) {
size, err := rr.UpgradedResponseWriter.Write(b)
rr.size += size
return size, err
}

func (rw *responseWriter) WriteHeader(code int) {
if rw.wroteHeader {
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (s *Service) mountAPI() {
handle("/bytes/{address}", jsonhttp.MethodHandler{
"GET": web.ChainHandlers(
s.contentLengthMetricMiddleware(),
s.downloadSpeedMetricMiddleware("bytes"),
s.newTracingHandler("bytes-download"),
s.actDecryptionHandler(),
web.FinalHandlerFunc(s.bytesGetHandler),
Expand Down Expand Up @@ -325,6 +326,7 @@ func (s *Service) mountAPI() {
s.contentLengthMetricMiddleware(),
s.newTracingHandler("bzz-download"),
s.actDecryptionHandler(),
s.downloadSpeedMetricMiddleware("bzz"),
web.FinalHandlerFunc(s.bzzDownloadHandler),
),
"HEAD": web.ChainHandlers(
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestGetStatus(t *testing.T) {
mode.String(),
ssMock,
ssMock,
nil,
)

statusSvc.SetSync(ssMock)
Expand All @@ -86,6 +87,7 @@ func TestGetStatus(t *testing.T) {
"",
nil,
nil,
nil,
),
})

Expand Down
17 changes: 16 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/util/syncutil"
"github.com/hashicorp/go-multierror"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
promc "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/sha3"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -910,7 +911,16 @@ func NewBee(

validStamp := postage.ValidStamp(batchStore)

nodeStatus := status.NewService(logger, p2ps, kad, beeNodeMode.String(), batchStore, localStore)
// metrics exposed on the status protocol
statusMetricsRegistry := prometheus.NewRegistry()
if localStore != nil {
statusMetricsRegistry.MustRegister(localStore.StatusMetrics()...)
}
if p2ps != nil {
statusMetricsRegistry.MustRegister(p2ps.StatusMetrics()...)
}

nodeStatus := status.NewService(logger, p2ps, kad, beeNodeMode.String(), batchStore, localStore, statusMetricsRegistry)
if err = p2ps.AddProtocol(nodeStatus.Protocol()); err != nil {
return nil, fmt.Errorf("status service: %w", err)
}
Expand Down Expand Up @@ -968,6 +978,8 @@ func NewBee(
retrieval := retrieval.New(swarmAddress, waitNetworkRFunc, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching)
localStore.SetRetrievalService(retrieval)

statusMetricsRegistry.MustRegister(retrieval.StatusMetrics()...)

pusherService := pusher.New(networkID, localStore, pushSyncProtocol, batchStore, logger, warmupTime, pusher.DefaultRetryCount)
b.pusherCloser = pusherService

Expand Down Expand Up @@ -1196,6 +1208,9 @@ func NewBee(
apiService.EnableFullAPI()

apiService.SetRedistributionAgent(agent)

// api metrics are constructed on api.Service.Configure
statusMetricsRegistry.MustRegister(apiService.StatusMetrics()...)
}

if err := kad.Start(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
}

s.host.SetStreamHandlerMatch(id, matcher, func(streamlibp2p network.Stream) {
start := time.Now()
peerID := streamlibp2p.Conn().RemotePeer()
overlay, found := s.peers.overlay(peerID)
if !found {
Expand All @@ -581,14 +580,15 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
stream := newStream(streamlibp2p, s.metrics)

// exchange headers
headersStartTime := time.Now()
ctx, cancel := context.WithTimeout(s.ctx, s.HeadersRWTimeout)
defer cancel()
if err := handleHeaders(ctx, ss.Headler, stream, overlay); err != nil {
s.logger.Debug("handle protocol: handle headers failed", "protocol", p.Name, "version", p.Version, "stream", ss.Name, "peer", overlay, "error", err)
_ = stream.Reset()
return
}
s.metrics.HeadersExchangeDuration.Observe(time.Since(start).Seconds())
s.metrics.HeadersExchangeDuration.Observe(time.Since(headersStartTime).Seconds())

ctx, cancel = context.WithCancel(s.ctx)

Expand Down
7 changes: 7 additions & 0 deletions pkg/p2p/libp2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,10 @@ func newMetrics() metrics {
func (s *Service) Metrics() []prometheus.Collector {
return append(m.PrometheusCollectorsFromFields(s.metrics), s.handshakeService.Metrics()...)
}

// StatusMetrics exposes metrics that are exposed on the status protocol.
func (s *Service) StatusMetrics() []prometheus.Collector {
return []prometheus.Collector{
s.metrics.HeadersExchangeDuration,
}
}
9 changes: 9 additions & 0 deletions pkg/retrieval/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,12 @@ func newMetrics() metrics {
func (s *Service) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}

// StatusMetrics exposes metrics that are exposed on the status protocol.
func (s *Service) StatusMetrics() []prometheus.Collector {
return []prometheus.Collector{
s.metrics.RequestAttempts,
s.metrics.ChunkRetrieveTime,
s.metrics.RequestDurationTime,
}
}
Loading
Loading