From 8cce78b271830174e58a220a01313c931294f398 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 20 Nov 2024 16:13:03 -0800 Subject: [PATCH 1/2] feat(exp/metricsserver): Expose metrics over libp2p streams Allows users to run a Prometheus+Grafana setup remotely and gather metrics securely over libp2p. --- x/cmd/libp2phttpproxy/README.md | 21 +++++ x/cmd/libp2phttpproxy/main.go | 63 +++++++++++++ x/metricsserver/README.md | 10 ++ x/metricsserver/example_test.go | 65 +++++++++++++ x/metricsserver/server.go | 156 ++++++++++++++++++++++++++++++++ 5 files changed, 315 insertions(+) create mode 100644 x/cmd/libp2phttpproxy/README.md create mode 100644 x/cmd/libp2phttpproxy/main.go create mode 100644 x/metricsserver/README.md create mode 100644 x/metricsserver/example_test.go create mode 100644 x/metricsserver/server.go diff --git a/x/cmd/libp2phttpproxy/README.md b/x/cmd/libp2phttpproxy/README.md new file mode 100644 index 0000000000..a63979c7ba --- /dev/null +++ b/x/cmd/libp2phttpproxy/README.md @@ -0,0 +1,21 @@ +# http over libp2p proxy + +This is a simple proxy server that proxies native HTTP requests to a target +server using HTTP over libp2p streams. + +The motivating use case is for use with the metrics server. This proxy lets us +expose a standard HTTP server to prometheus while proxying metrics requests over +libp2p. + +## Usage + +``` +PROXY_TARGET="multiaddr:/ip4/127.0.0.1/tcp/49346/p2p/.../http-path/some-path" go run ./cmd/libp2phttpproxy +``` + +In another terminal: +``` +curl localhost:5005 +``` + + diff --git a/x/cmd/libp2phttpproxy/main.go b/x/cmd/libp2phttpproxy/main.go new file mode 100644 index 0000000000..c26f5f2932 --- /dev/null +++ b/x/cmd/libp2phttpproxy/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "log" + "net/http/httputil" + "net/url" + "os" + "os/signal" + "strings" + + "github.com/libp2p/go-libp2p" + libp2phttp "github.com/libp2p/go-libp2p/p2p/http" + "github.com/multiformats/go-multiaddr" +) + +func main() { + proxyTarget := os.Getenv("PROXY_TARGET") + if proxyTarget == "" { + log.Fatal("PROXY_TARGET must be set") + } + + h, err := libp2p.New(libp2p.NoListenAddrs) + if err != nil { + log.Fatal(err) + } + defer h.Close() + + const multiaddrPrefix = "multiaddr:" + if !strings.HasPrefix(proxyTarget, multiaddrPrefix) { + log.Fatalf("PROXY_TARGET must start with %q", multiaddrPrefix) + } + + targetUrl, _ := url.Parse(proxyTarget) + + httpHost := libp2phttp.Host{ + StreamHost: h, + InsecureAllowHTTP: true, + ListenAddrs: []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/5005/http")}, + } + + // reverse proxy + proxy := &httputil.ReverseProxy{ + Rewrite: func(r *httputil.ProxyRequest) { + copiedURL := *targetUrl + r.Out.URL = &copiedURL + }, + Transport: &httpHost, + } + + httpHost.SetHTTPHandlerAtPath("/http-reverse-proxy/0.0.1", "/", proxy) + go httpHost.Serve() + + log.Println("Listening on:") + for _, a := range httpHost.Addrs() { + log.Println(a.Encapsulate(multiaddr.StringCast("/p2p/" + h.ID().String()))) + } + + // Wait for interrupt signal to stop + intSig := make(chan os.Signal, 1) + signal.Notify(intSig, os.Interrupt) + <-intSig + log.Println("Interrupt signal received, closing host") +} diff --git a/x/metricsserver/README.md b/x/metricsserver/README.md new file mode 100644 index 0000000000..a048bdb057 --- /dev/null +++ b/x/metricsserver/README.md @@ -0,0 +1,10 @@ +# libp2p metrics server + +This is an experimental libp2p service that exposes a libp2p node's metrics to +a set of allowlisted peers (by their Peer ID). + +Useful for: +- Help debugging a node you do not control. +- Ingesting a node's metrics over libp2p when exposing the standard HTTP metrics + server is difficult. +- Allowing only authenticated peers to ingest metrics. diff --git a/x/metricsserver/example_test.go b/x/metricsserver/example_test.go new file mode 100644 index 0000000000..61d400bea6 --- /dev/null +++ b/x/metricsserver/example_test.go @@ -0,0 +1,65 @@ +package metricsserver_test + +import ( + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" + libp2phttp "github.com/libp2p/go-libp2p/p2p/http" + "github.com/libp2p/go-libp2p/x/metricsserver" +) + +func newClient() (*http.Client, peer.ID, func(), error) { + streamHost, err := libp2p.New(libp2p.NoListenAddrs, libp2p.DefaultTransports) + if err != nil { + return nil, "", nil, err + } + + rt := &libp2phttp.Host{ + StreamHost: streamHost, + } + + return &http.Client{Transport: rt}, streamHost.ID(), func() { + streamHost.Close() + }, nil +} + +func ExampleWithMetricsServer() { + client, clientID, close, err := newClient() + if err != nil { + fmt.Println("Error creating client:", err) + return + } + defer close() + + var opts []libp2p.Option + opts = metricsserver.WithMetricsServer(opts, []peer.ID{clientID}) + + h, err := libp2p.New(opts...) + if err != nil { + fmt.Println("Error creating libp2p host:", err) + return + } + defer h.Close() + + url := fmt.Sprintf("multiaddr:%s/p2p/%s/http-path/%s", h.Addrs()[0], h.ID(), url.QueryEscape(metricsserver.ProtocolID)) + resp, err := client.Get(url) + if err != nil { + fmt.Println("Error getting metrics:", err) + return + } + + metricsResponseBody, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading metrics response body:", err) + return + } + + foundMetrics := strings.Contains(string(metricsResponseBody), "go_gc_duration_seconds") + fmt.Println("foundMetrics:", foundMetrics) + // Output: foundMetrics: true +} diff --git a/x/metricsserver/server.go b/x/metricsserver/server.go new file mode 100644 index 0000000000..8f184ee02a --- /dev/null +++ b/x/metricsserver/server.go @@ -0,0 +1,156 @@ +package metricsserver + +import ( + "crypto/rand" + "encoding/hex" + "net/http" + + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + libp2phttp "github.com/libp2p/go-libp2p/p2p/http" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/fx" +) + +const ProtocolID = "/libp2p/x/metrics/0.0.1" + +var log = logging.Logger("libp2p/x/metrics-server") + +func WithMetricsServer(opts []libp2p.Option, allowedPeers []peer.ID) []libp2p.Option { + opts = append(opts, libp2p.WithFxOption( + fx.Provide(func(params constructorParams) *MetricsServer { + + var ownHTTPHost bool + httpHost := params.HTTPHost + if httpHost == nil { + // No http host available. We'll make our own + ownHTTPHost = true + httpHost = &libp2phttp.Host{ + StreamHost: params.StreamHost, + } + } + + if httpHost.StreamHost == nil { + log.Warn("No StreamHost set for the MetricsServer's HTTP host. MetricsServer will not be able to serve HTTP requests over libp2p streams.") + } + + m := NewMetricsServer(params.Registerer, params.Gatherer) + m.AllowedPeers = allowedPeers + + params.L.Append(fx.StartStopHook(func() error { + err := m.Start(httpHost) + if err != nil { + return err + } + + if ownHTTPHost { + go httpHost.Serve() + } + return nil + }, func() error { + if ownHTTPHost { + httpHost.Close() + } + return m.Close() + })) + + return m + }, + ), + // We want the metrics server started even if nothing else depends on it. + fx.Invoke(func(m *MetricsServer) { _ = m }))) + return opts +} + +type MetricsServer struct { + AllowedPeers []peer.ID + handler http.Handler + closed chan struct{} +} + +func NewMetricsServer(r prometheus.Registerer, g prometheus.Gatherer) *MetricsServer { + if r == nil { + r = prometheus.DefaultRegisterer + } + if g == nil { + g = prometheus.DefaultGatherer + } + + m := &MetricsServer{ + handler: promhttp.InstrumentMetricHandler(r, promhttp.HandlerFor(g, promhttp.HandlerOpts{})), + closed: make(chan struct{}), + } + return m +} + +func (m *MetricsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + select { + case <-m.closed: + http.Error(w, "server closed", http.StatusServiceUnavailable) + default: + } + + client := libp2phttp.ClientPeerID(r) + if client == "" { + http.Error(w, "no client peer ID", http.StatusForbidden) + } + var found bool + for _, allowedPeer := range m.AllowedPeers { + if client == allowedPeer { + found = true + break + } + } + if !found { + http.Error(w, "not allowed", http.StatusForbidden) + } + + m.handler.ServeHTTP(w, r) +} + +func (m *MetricsServer) Start(httpHost *libp2phttp.Host) error { + m.closed = make(chan struct{}) + httpHost.SetHTTPHandler(ProtocolID, m) + + if len(m.AllowedPeers) == 0 { + log.Info("No allowed peers specified. Generting a key for a single peer.") + + sk, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return err + } + skBytes, err := sk.Raw() + if err != nil { + return err + } + skAsHex := hex.EncodeToString(skBytes) + + m.AllowedPeers = make([]peer.ID, 1) + m.AllowedPeers[0], err = peer.IDFromPrivateKey(sk) + if err != nil { + return err + } + log.Info("Metrics server started. Login using key %s", skAsHex) + } + + return nil +} + +func (m *MetricsServer) Close() error { + log.Info("Metrics server closed.") + m.AllowedPeers = nil + return nil +} + +type constructorParams struct { + fx.In + L fx.Lifecycle + StreamHost host.Host + HTTPHost *libp2phttp.Host `optional:"true"` + Registerer prometheus.Registerer `optional:"true"` + Gatherer prometheus.Gatherer `optional:"true"` +} From 72c3efe15348f8b9c945d2bf8cf39f705eef7bb8 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 25 Feb 2025 15:26:59 -0800 Subject: [PATCH 2/2] docs(x): add readme --- x/README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 x/README.md diff --git a/x/README.md b/x/README.md new file mode 100644 index 0000000000..1c2fe9630a --- /dev/null +++ b/x/README.md @@ -0,0 +1,12 @@ +# libp2p `x` packages + +The packages in this directory are experimental and subject to change or +removal. They may introduce breaking changes without a major version bump. If +you find these packages helpful, please let us know and we can work together on +stabilizing them. + +## libp2p/x/metricsserver + +This is an experimental libp2p service that exposes a libp2p node's metrics to +a set of allowlisted peers (by their Peer ID). +