Skip to content
Merged
34 changes: 17 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,30 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-multistream v0.5.0
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.21.1
github.com/spf13/afero v1.6.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.10.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/vmihailenco/msgpack/v5 v5.3.4
github.com/wealdtech/go-ens/v3 v3.5.1
gitlab.com/nolash/go-mockbytes v0.0.7
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
golang.org/x/crypto v0.23.0
golang.org/x/net v0.25.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.20.0
golang.org/x/term v0.20.0
golang.org/x/crypto v0.31.0
golang.org/x/net v0.33.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0
golang.org/x/term v0.27.0
golang.org/x/time v0.5.0
gopkg.in/yaml.v2 v2.4.0
resenje.org/multex v0.1.0
resenje.org/singleflight v0.4.0
resenje.org/web v0.4.3
)

require golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect

require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand All @@ -66,7 +64,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/codahale/hdrhistogram v0.0.0-00010101000000-000000000000 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
Expand Down Expand Up @@ -104,7 +102,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
Expand Down Expand Up @@ -132,16 +130,17 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0
github.com/prometheus/procfs v0.15.1 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.42.0 // indirect
Expand All @@ -166,10 +165,11 @@ require (
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/protobuf v1.36.1 // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
62 changes: 32 additions & 30 deletions go.sum

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,26 @@ func (s *Service) contentLengthMetricMiddleware() func(h http.Handler) http.Hand
}
}

func (s *Service) downloadSpeedMetricMiddleware() func(h http.Handler) http.Handler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could add a string argument to the method, which would serve as a label for metrics. This way, we can easily distinguish between 'bzz' and 'bytes' in the endpoint data, because the download speed depends on the logic behind the handlers, and the logic in 'bzz' is more complex than in 'bytes'.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice idea, thanks.

return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
h.ServeHTTP(w, r)

rw, ok := w.(*responseWriter)
if !ok {
return
}
if rw.Status() != http.StatusOK {
return
}

speed := float64(rw.size) / time.Since(start).Seconds()
s.metrics.DownloadSpeed.Observe(speed)
})
}
}

// gasConfigMiddleware can be used by the APIs that allow block chain transactions to set
// gas price and gas limit through the HTTP API headers.
func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) http.Handler {
Expand Down
19 changes: 19 additions & 0 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
deferred = defaultUploadMethod(headers.Deferred)
)

defer func(start time.Time) {
rw, ok := w.(*responseWriter)
if !ok {
return
}

if rw.Status() != http.StatusOK && rw.Status() != http.StatusCreated {
return
}

mode := "direct"
if deferred {
mode = "deferred"
}

speed := float64(r.ContentLength) / time.Since(start).Seconds()
s.metrics.UploadSpeed.WithLabelValues(mode).Observe(speed)
}(time.Now())

if deferred || headers.Pin {
tag, err = s.getOrCreateSessionID(headers.SwarmTag)
if err != nil {
Expand Down
33 changes: 32 additions & 1 deletion pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type metrics struct {
ResponseCodeCounts *prometheus.CounterVec

ContentApiDuration prometheus.HistogramVec
UploadSpeed *prometheus.HistogramVec
DownloadSpeed prometheus.Histogram
}

func newMetrics() metrics {
Expand Down Expand Up @@ -64,6 +66,20 @@ func newMetrics() metrics {
Help: "Histogram of file upload API response durations.",
Buckets: []float64{0.5, 1, 2.5, 5, 10, 30, 60},
}, []string{"filesize", "method"}),
UploadSpeed: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "upload_speed",
Help: "Histogram of upload speed in B/s.",
Buckets: []float64{0.25, 0.5, 0.75, 1, 1.25, 1.5, 1.75, 2, 2.5, 3, 4, 5},
}, []string{"mode"}),
DownloadSpeed: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_speed",
Help: "Histogram of download speed in B/s.",
Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 4, 5, 6, 7, 8, 9},
}),
}
}

Expand All @@ -82,6 +98,14 @@ func (s *Service) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}

// StatusMetrics exposes metrics that are exposed on the status protocol.
func (s *Service) StatusMetrics() []prometheus.Collector {
return []prometheus.Collector{
s.metrics.UploadSpeed,
s.metrics.DownloadSpeed,
}
}

func (s *Service) pageviewMetricsHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
Expand Down Expand Up @@ -114,18 +138,25 @@ type responseWriter struct {
UpgradedResponseWriter
statusCode int
wroteHeader bool
size int
}

func newResponseWriter(w http.ResponseWriter) *responseWriter {
// StatusOK is called by default if nothing else is called
uw := w.(UpgradedResponseWriter)
return &responseWriter{uw, http.StatusOK, false}
return &responseWriter{uw, http.StatusOK, false, 0}
}

func (rw *responseWriter) Status() int {
return rw.statusCode
}

func (rr *responseWriter) Write(b []byte) (int, error) {
size, err := rr.UpgradedResponseWriter.Write(b)
rr.size += size
return size, err
}

func (rw *responseWriter) WriteHeader(code int) {
if rw.wroteHeader {
return
Expand Down
1 change: 1 addition & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (s *Service) mountAPI() {
s.contentLengthMetricMiddleware(),
s.newTracingHandler("bzz-download"),
s.actDecryptionHandler(),
s.downloadSpeedMetricMiddleware(),
web.FinalHandlerFunc(s.bzzDownloadHandler),
),
"HEAD": web.ChainHandlers(
Expand Down
33 changes: 18 additions & 15 deletions pkg/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ import (
)

type statusSnapshotResponse struct {
Overlay string `json:"overlay"`
Proximity uint `json:"proximity"`
BeeMode string `json:"beeMode"`
ReserveSize uint64 `json:"reserveSize"`
ReserveSizeWithinRadius uint64 `json:"reserveSizeWithinRadius"`
PullsyncRate float64 `json:"pullsyncRate"`
StorageRadius uint8 `json:"storageRadius"`
ConnectedPeers uint64 `json:"connectedPeers"`
NeighborhoodSize uint64 `json:"neighborhoodSize"`
RequestFailed bool `json:"requestFailed,omitempty"`
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
LastSyncedBlock uint64 `json:"lastSyncedBlock"`
CommittedDepth uint8 `json:"committedDepth"`
IsWarmingUp bool `json:"isWarmingUp"`
Overlay string `json:"overlay"`
Proximity uint `json:"proximity"`
BeeMode string `json:"beeMode"`
ReserveSize uint64 `json:"reserveSize"`
ReserveSizeWithinRadius uint64 `json:"reserveSizeWithinRadius"`
PullsyncRate float64 `json:"pullsyncRate"`
StorageRadius uint8 `json:"storageRadius"`
ConnectedPeers uint64 `json:"connectedPeers"`
NeighborhoodSize uint64 `json:"neighborhoodSize"`
RequestFailed bool `json:"requestFailed,omitempty"`
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
LastSyncedBlock uint64 `json:"lastSyncedBlock"`
CommittedDepth uint8 `json:"committedDepth"`
IsWarmingUp bool `json:"isWarmingUp"`
Metrics map[string]string `json:"metrics,omitempty"`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe metrics should not be exposed through the API? The response is cluttered with encoded metrics data that users can see for their own node on /metrics, and for peer nodes, maybe these metrics are not so relevant. Should we remove this field here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you remove them here, what would be the options then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status protocol will still exchange these metrics, but they will not be exposed here, only to be gathered over p2p network, which is sufficient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status protocol will still exchange these metrics, but they will not be exposed here, only to be gathered over p2p network, which is sufficient.

I like this approach

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree. If we really need it, we can expose additional endpoint just for this case. But I don’t see a real reason to do so.

}

type statusResponse struct {
Expand Down Expand Up @@ -98,6 +99,7 @@ func (s *Service) statusGetHandler(w http.ResponseWriter, _ *http.Request) {
LastSyncedBlock: ss.LastSyncedBlock,
CommittedDepth: uint8(ss.CommittedDepth),
IsWarmingUp: s.isWarmingUp,
Metrics: ss.Metrics,
})
}

Expand Down Expand Up @@ -146,6 +148,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)
snapshot.IsReachable = ss.IsReachable
snapshot.LastSyncedBlock = ss.LastSyncedBlock
snapshot.CommittedDepth = uint8(ss.CommittedDepth)
snapshot.Metrics = ss.Metrics
}

mu.Lock()
Expand Down
40 changes: 40 additions & 0 deletions pkg/api/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/topology"
"github.com/prometheus/client_golang/prometheus"
)

func TestGetStatus(t *testing.T) {
Expand All @@ -41,6 +42,22 @@ func TestGetStatus(t *testing.T) {
IsReachable: true,
LastSyncedBlock: 6092500,
CommittedDepth: 1,
Metrics: map[string]string{
"test_response_duration_seconds": `# HELP test_response_duration_seconds Histogram of API response durations.
# TYPE test_response_duration_seconds histogram
test_response_duration_seconds_bucket{test="label",le="0.01"} 1
test_response_duration_seconds_bucket{test="label",le="0.1"} 1
test_response_duration_seconds_bucket{test="label",le="0.25"} 2
test_response_duration_seconds_bucket{test="label",le="0.5"} 2
test_response_duration_seconds_bucket{test="label",le="1"} 3
test_response_duration_seconds_bucket{test="label",le="2.5"} 4
test_response_duration_seconds_bucket{test="label",le="5"} 4
test_response_duration_seconds_bucket{test="label",le="10"} 6
test_response_duration_seconds_bucket{test="label",le="+Inf"} 7
test_response_duration_seconds_sum{test="label"} 78.15
test_response_duration_seconds_count{test="label"} 7
`,
},
}

ssMock := &statusSnapshotMock{
Expand All @@ -53,13 +70,35 @@ func TestGetStatus(t *testing.T) {
committedDepth: ssr.CommittedDepth,
}

metricsRegistry := prometheus.NewRegistry()

h := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "test",
Name: "response_duration_seconds",
Help: "Histogram of API response durations.",
Buckets: []float64{0.01, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
ConstLabels: prometheus.Labels{
"test": "label",
},
})

metricsRegistry.MustRegister(h)

points := []float64{0.25, 5.2, 1.5, 1, 5.2, 0, 65}
var sum float64
for _, p := range points {
h.Observe(p)
sum += p
}

statusSvc := status.NewService(
log.Noop,
nil,
new(topologyPeersIterNoopMock),
mode.String(),
ssMock,
ssMock,
metricsRegistry,
)

statusSvc.SetSync(ssMock)
Expand All @@ -86,6 +125,7 @@ func TestGetStatus(t *testing.T) {
"",
nil,
nil,
nil,
),
})

Expand Down
Loading
Loading