Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ require (
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/client_model v0.6.0
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
Expand Down
26 changes: 26 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,32 @@ func (s *Service) contentLengthMetricMiddleware() func(h http.Handler) http.Hand
}
}

func (s *Service) downloadSpeedMetricMiddleware() func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
h.ServeHTTP(w, r)

if rw, ok := w.(*responseWriter); ok {
speed := float64(rw.size) / time.Since(now).Seconds()
s.metrics.DownloadSpeed.Observe(speed)
}
})
}
}

func (s *Service) uploadSpeedMetricMiddleware() func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
h.ServeHTTP(w, r)

speed := float64(r.ContentLength) / time.Since(now).Seconds()
s.metrics.UploadSpeed.Observe(speed)
})
}
}

// gasConfigMiddleware can be used by the APIs that allow block chain transactions to set
// gas price and gas limit through the HTTP API headers.
func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) http.Handler {
Expand Down
33 changes: 32 additions & 1 deletion pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

"github.com/ethersphere/bee/v2"
m "github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)
Expand All @@ -29,6 +30,8 @@
ResponseCodeCounts *prometheus.CounterVec

ContentApiDuration prometheus.HistogramVec
UploadSpeed prometheus.Histogram
DownloadSpeed prometheus.Histogram
}

func newMetrics() metrics {
Expand Down Expand Up @@ -64,6 +67,20 @@
Help: "Histogram of file upload API response durations.",
Buckets: []float64{0.5, 1, 2.5, 5, 10, 30, 60},
}, []string{"filesize", "method"}),
UploadSpeed: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "upload_speed",
Help: "Histogram of upload speed in B/s.",
Buckets: []float64{0.25, 0.5, 0.75, 1, 1.25, 1.5, 1.75, 2, 2.5, 3, 4, 5},
}),
DownloadSpeed: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_speed",
Help: "Histogram of download speed in B/s.",
Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 4, 5, 6, 7, 8, 9},
}),
}
}

Expand Down Expand Up @@ -102,6 +119,13 @@
})
}

func (s *Service) StatusMetrics() status.Metrics {
return status.Metrics{
UploadSpeed: s.metrics.UploadSpeed,
DownloadSpeed: s.metrics.DownloadSpeed,
}
}

// UpgradedResponseWriter adds more functionality on top of ResponseWriter
type UpgradedResponseWriter interface {
http.ResponseWriter
Expand All @@ -114,18 +138,25 @@
UpgradedResponseWriter
statusCode int
wroteHeader bool
size int
}

func newResponseWriter(w http.ResponseWriter) *responseWriter {
// StatusOK is called by default if nothing else is called
uw := w.(UpgradedResponseWriter)
return &responseWriter{uw, http.StatusOK, false}
return &responseWriter{uw, http.StatusOK, false, 0}
}

func (rw *responseWriter) Status() int {
return rw.statusCode
}

func (rr *responseWriter) Write(b []byte) (int, error) {
size, err := rr.UpgradedResponseWriter.Write(b)
rr.size += size
return size, err
}

func (rw *responseWriter) WriteHeader(code int) {
if rw.wroteHeader {
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (s *Service) mountAPI() {
"POST": web.ChainHandlers(
s.contentLengthMetricMiddleware(),
s.newTracingHandler("bzz-upload"),
s.uploadSpeedMetricMiddleware(),
web.FinalHandlerFunc(s.bzzUploadHandler),
),
})
Expand All @@ -325,6 +326,7 @@ func (s *Service) mountAPI() {
s.contentLengthMetricMiddleware(),
s.newTracingHandler("bzz-download"),
s.actDecryptionHandler(),
s.downloadSpeedMetricMiddleware(),
web.FinalHandlerFunc(s.bzzDownloadHandler),
),
"HEAD": web.ChainHandlers(
Expand Down
80 changes: 65 additions & 15 deletions pkg/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,29 @@ import (
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology"
)

type statusSnapshotResponse struct {
Overlay string `json:"overlay"`
Proximity uint `json:"proximity"`
BeeMode string `json:"beeMode"`
ReserveSize uint64 `json:"reserveSize"`
ReserveSizeWithinRadius uint64 `json:"reserveSizeWithinRadius"`
PullsyncRate float64 `json:"pullsyncRate"`
StorageRadius uint8 `json:"storageRadius"`
ConnectedPeers uint64 `json:"connectedPeers"`
NeighborhoodSize uint64 `json:"neighborhoodSize"`
RequestFailed bool `json:"requestFailed,omitempty"`
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
LastSyncedBlock uint64 `json:"lastSyncedBlock"`
CommittedDepth uint8 `json:"committedDepth"`
IsWarmingUp bool `json:"isWarmingUp"`
Overlay string `json:"overlay"`
Proximity uint `json:"proximity"`
BeeMode string `json:"beeMode"`
ReserveSize uint64 `json:"reserveSize"`
ReserveSizeWithinRadius uint64 `json:"reserveSizeWithinRadius"`
PullsyncRate float64 `json:"pullsyncRate"`
StorageRadius uint8 `json:"storageRadius"`
ConnectedPeers uint64 `json:"connectedPeers"`
NeighborhoodSize uint64 `json:"neighborhoodSize"`
RequestFailed bool `json:"requestFailed,omitempty"`
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
LastSyncedBlock uint64 `json:"lastSyncedBlock"`
CommittedDepth uint8 `json:"committedDepth"`
IsWarmingUp bool `json:"isWarmingUp"`
UploadSpeed *statusHistogramResponse `json:"uploadSpeed,omitempty"`
DownloadSpeed *statusHistogramResponse `json:"downloadSpeed,omitempty"`
}

type statusResponse struct {
Expand All @@ -48,6 +51,51 @@ type neighborhoodsResponse struct {
Neighborhoods []statusNeighborhoodResponse `json:"neighborhoods"`
}

func newStatusHistogramResponse(h *status.Histogram) *statusHistogramResponse {
if h == nil {
return nil
}
labels := make([]statusLabelResponse, 0, len(h.Labels))
for _, l := range h.Labels {
labels = append(labels, statusLabelResponse{
Name: l.Name,
Value: l.Value,
})
}

buckets := make([]statusHistogramBucketResponse, 0, len(h.Buckets))
for _, b := range h.Buckets {
buckets = append(buckets, statusHistogramBucketResponse{
CumulativeCount: b.CumulativeCount,
UpperBound: b.UpperBound,
})
}

return &statusHistogramResponse{
Labels: labels,
SampleSum: h.SampleSum,
SampleCount: h.SampleCount,
Buckets: buckets,
}
}

type statusHistogramResponse struct {
Labels []statusLabelResponse `json:"labels,omitempty"`
SampleSum float64 `json:"sampleSum,omitempty"`
SampleCount uint64 `json:"sampleCount,omitempty"`
Buckets []statusHistogramBucketResponse `json:"buckets,omitempty"`
}

type statusLabelResponse struct {
Name string `json:"name,omitempty"`
Value string `json:"value,omitempty"`
}

type statusHistogramBucketResponse struct {
CumulativeCount uint64 `json:"cumulativeCount,omitempty"`
UpperBound float64 `json:"upperBound,omitempty"`
}

// statusAccessHandler is a middleware that limits the number of simultaneous
// status requests.
func (s *Service) statusAccessHandler(h http.Handler) http.Handler {
Expand Down Expand Up @@ -98,6 +146,8 @@ func (s *Service) statusGetHandler(w http.ResponseWriter, _ *http.Request) {
LastSyncedBlock: ss.LastSyncedBlock,
CommittedDepth: uint8(ss.CommittedDepth),
IsWarmingUp: s.isWarmingUp,
UploadSpeed: newStatusHistogramResponse(ss.UploadSpeed),
DownloadSpeed: newStatusHistogramResponse(ss.DownloadSpeed),
})
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/api/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestGetStatus(t *testing.T) {
mode.String(),
ssMock,
ssMock,
status.Metrics{},
)

statusSvc.SetSync(ssMock)
Expand All @@ -86,6 +87,7 @@ func TestGetStatus(t *testing.T) {
"",
nil,
nil,
status.Metrics{},
),
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func NewBee(

validStamp := postage.ValidStamp(batchStore)

nodeStatus := status.NewService(logger, p2ps, kad, beeNodeMode.String(), batchStore, localStore)
nodeStatus := status.NewService(logger, p2ps, kad, beeNodeMode.String(), batchStore, localStore, apiService.StatusMetrics())
if err = p2ps.AddProtocol(nodeStatus.Protocol()); err != nil {
return nil, fmt.Errorf("status service: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/status/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ const (
ProtocolVersion = protocolVersion
StreamName = streamName
)

var NewHistogram = newHistogram
Loading
Loading