Skip to content

metrics over libp2p streams #3110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions x/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# libp2p `x` packages

The packages in this directory are experimental and subject to change or
removal. They may introduce breaking changes without a major version bump. If
you find these packages helpful, please let us know and we can work together on
stabilizing them.

## libp2p/x/metricsserver

This is an experimental libp2p service that exposes a libp2p node's metrics to
a set of allowlisted peers (by their Peer ID).

21 changes: 21 additions & 0 deletions x/cmd/libp2phttpproxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# http over libp2p proxy

This is a simple proxy server that proxies native HTTP requests to a target
server using HTTP over libp2p streams.

The motivating use case is for use with the metrics server. This proxy lets us
expose a standard HTTP server to prometheus while proxying metrics requests over
libp2p.

## Usage

```
PROXY_TARGET="multiaddr:/ip4/127.0.0.1/tcp/49346/p2p/.../http-path/some-path" go run ./cmd/libp2phttpproxy
```

In another terminal:
```
curl localhost:5005
```


63 changes: 63 additions & 0 deletions x/cmd/libp2phttpproxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"log"
"net/http/httputil"
"net/url"
"os"
"os/signal"
"strings"

"github.com/libp2p/go-libp2p"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
)

func main() {
proxyTarget := os.Getenv("PROXY_TARGET")
if proxyTarget == "" {
log.Fatal("PROXY_TARGET must be set")
}

h, err := libp2p.New(libp2p.NoListenAddrs)
if err != nil {
log.Fatal(err)
}
defer h.Close()

const multiaddrPrefix = "multiaddr:"
if !strings.HasPrefix(proxyTarget, multiaddrPrefix) {
log.Fatalf("PROXY_TARGET must start with %q", multiaddrPrefix)
}

targetUrl, _ := url.Parse(proxyTarget)

httpHost := libp2phttp.Host{
StreamHost: h,
InsecureAllowHTTP: true,
ListenAddrs: []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/5005/http")},
}

// reverse proxy
proxy := &httputil.ReverseProxy{
Rewrite: func(r *httputil.ProxyRequest) {
copiedURL := *targetUrl
r.Out.URL = &copiedURL
},
Transport: &httpHost,
}

httpHost.SetHTTPHandlerAtPath("/http-reverse-proxy/0.0.1", "/", proxy)
go httpHost.Serve()

log.Println("Listening on:")
for _, a := range httpHost.Addrs() {
log.Println(a.Encapsulate(multiaddr.StringCast("/p2p/" + h.ID().String())))
}

// Wait for interrupt signal to stop
intSig := make(chan os.Signal, 1)
signal.Notify(intSig, os.Interrupt)
<-intSig
log.Println("Interrupt signal received, closing host")
}
10 changes: 10 additions & 0 deletions x/metricsserver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# libp2p metrics server

This is an experimental libp2p service that exposes a libp2p node's metrics to
a set of allowlisted peers (by their Peer ID).

Useful for:
- Help debugging a node you do not control.
- Ingesting a node's metrics over libp2p when exposing the standard HTTP metrics
server is difficult.
- Allowing only authenticated peers to ingest metrics.
65 changes: 65 additions & 0 deletions x/metricsserver/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package metricsserver_test

import (
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/libp2p/go-libp2p/x/metricsserver"
)

func newClient() (*http.Client, peer.ID, func(), error) {
streamHost, err := libp2p.New(libp2p.NoListenAddrs, libp2p.DefaultTransports)
if err != nil {
return nil, "", nil, err
}

rt := &libp2phttp.Host{
StreamHost: streamHost,
}

return &http.Client{Transport: rt}, streamHost.ID(), func() {
streamHost.Close()
}, nil
}

func ExampleWithMetricsServer() {
client, clientID, close, err := newClient()
if err != nil {
fmt.Println("Error creating client:", err)
return
}
defer close()

var opts []libp2p.Option
opts = metricsserver.WithMetricsServer(opts, []peer.ID{clientID})

h, err := libp2p.New(opts...)
if err != nil {
fmt.Println("Error creating libp2p host:", err)
return
}
defer h.Close()

url := fmt.Sprintf("multiaddr:%s/p2p/%s/http-path/%s", h.Addrs()[0], h.ID(), url.QueryEscape(metricsserver.ProtocolID))
resp, err := client.Get(url)
if err != nil {
fmt.Println("Error getting metrics:", err)
return
}

metricsResponseBody, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error reading metrics response body:", err)
return
}

foundMetrics := strings.Contains(string(metricsResponseBody), "go_gc_duration_seconds")
fmt.Println("foundMetrics:", foundMetrics)
// Output: foundMetrics: true
}
156 changes: 156 additions & 0 deletions x/metricsserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package metricsserver

import (
"crypto/rand"
"encoding/hex"
"net/http"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/fx"
)

const ProtocolID = "/libp2p/x/metrics/0.0.1"

var log = logging.Logger("libp2p/x/metrics-server")

func WithMetricsServer(opts []libp2p.Option, allowedPeers []peer.ID) []libp2p.Option {
opts = append(opts, libp2p.WithFxOption(
fx.Provide(func(params constructorParams) *MetricsServer {

var ownHTTPHost bool
httpHost := params.HTTPHost
if httpHost == nil {
// No http host available. We'll make our own
ownHTTPHost = true
httpHost = &libp2phttp.Host{
StreamHost: params.StreamHost,
}
}

if httpHost.StreamHost == nil {
log.Warn("No StreamHost set for the MetricsServer's HTTP host. MetricsServer will not be able to serve HTTP requests over libp2p streams.")
}

m := NewMetricsServer(params.Registerer, params.Gatherer)
m.AllowedPeers = allowedPeers

params.L.Append(fx.StartStopHook(func() error {
err := m.Start(httpHost)
if err != nil {
return err
}

if ownHTTPHost {
go httpHost.Serve()
}
return nil
}, func() error {
if ownHTTPHost {
httpHost.Close()
}
return m.Close()
}))

return m
},
),
// We want the metrics server started even if nothing else depends on it.
fx.Invoke(func(m *MetricsServer) { _ = m })))
return opts
}

type MetricsServer struct {
AllowedPeers []peer.ID
handler http.Handler
closed chan struct{}
}

func NewMetricsServer(r prometheus.Registerer, g prometheus.Gatherer) *MetricsServer {
if r == nil {
r = prometheus.DefaultRegisterer
}
if g == nil {
g = prometheus.DefaultGatherer
}

m := &MetricsServer{
handler: promhttp.InstrumentMetricHandler(r, promhttp.HandlerFor(g, promhttp.HandlerOpts{})),
closed: make(chan struct{}),
}
return m
}

func (m *MetricsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
select {
case <-m.closed:
http.Error(w, "server closed", http.StatusServiceUnavailable)
default:
}

client := libp2phttp.ClientPeerID(r)
if client == "" {
http.Error(w, "no client peer ID", http.StatusForbidden)
}
var found bool
for _, allowedPeer := range m.AllowedPeers {
if client == allowedPeer {
found = true
break
}
}
if !found {
http.Error(w, "not allowed", http.StatusForbidden)
}

m.handler.ServeHTTP(w, r)
}

func (m *MetricsServer) Start(httpHost *libp2phttp.Host) error {
m.closed = make(chan struct{})
httpHost.SetHTTPHandler(ProtocolID, m)

if len(m.AllowedPeers) == 0 {
log.Info("No allowed peers specified. Generting a key for a single peer.")

sk, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return err
}
skBytes, err := sk.Raw()
if err != nil {
return err
}
skAsHex := hex.EncodeToString(skBytes)

m.AllowedPeers = make([]peer.ID, 1)
m.AllowedPeers[0], err = peer.IDFromPrivateKey(sk)
if err != nil {
return err
}
log.Info("Metrics server started. Login using key %s", skAsHex)
}

return nil
}

func (m *MetricsServer) Close() error {
log.Info("Metrics server closed.")
m.AllowedPeers = nil
return nil
}

type constructorParams struct {
fx.In
L fx.Lifecycle
StreamHost host.Host
HTTPHost *libp2phttp.Host `optional:"true"`
Registerer prometheus.Registerer `optional:"true"`
Gatherer prometheus.Gatherer `optional:"true"`
}
Loading