Skip to content

Commit 8cce78b

Browse files
committed
feat(exp/metricsserver): Expose metrics over libp2p streams
Allows users to run a Prometheus+Grafana setup remotely and gather metrics securely over libp2p.
1 parent 578af0c commit 8cce78b

File tree

5 files changed

+315
-0
lines changed

5 files changed

+315
-0
lines changed

x/cmd/libp2phttpproxy/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# http over libp2p proxy
2+
3+
This is a simple proxy server that proxies native HTTP requests to a target
4+
server using HTTP over libp2p streams.
5+
6+
The motivating use case is for use with the metrics server. This proxy lets us
7+
expose a standard HTTP server to prometheus while proxying metrics requests over
8+
libp2p.
9+
10+
## Usage
11+
12+
```
13+
PROXY_TARGET="multiaddr:/ip4/127.0.0.1/tcp/49346/p2p/.../http-path/some-path" go run ./cmd/libp2phttpproxy
14+
```
15+
16+
In another terminal:
17+
```
18+
curl localhost:5005
19+
```
20+
21+

x/cmd/libp2phttpproxy/main.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"net/http/httputil"
6+
"net/url"
7+
"os"
8+
"os/signal"
9+
"strings"
10+
11+
"github.com/libp2p/go-libp2p"
12+
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
13+
"github.com/multiformats/go-multiaddr"
14+
)
15+
16+
func main() {
17+
proxyTarget := os.Getenv("PROXY_TARGET")
18+
if proxyTarget == "" {
19+
log.Fatal("PROXY_TARGET must be set")
20+
}
21+
22+
h, err := libp2p.New(libp2p.NoListenAddrs)
23+
if err != nil {
24+
log.Fatal(err)
25+
}
26+
defer h.Close()
27+
28+
const multiaddrPrefix = "multiaddr:"
29+
if !strings.HasPrefix(proxyTarget, multiaddrPrefix) {
30+
log.Fatalf("PROXY_TARGET must start with %q", multiaddrPrefix)
31+
}
32+
33+
targetUrl, _ := url.Parse(proxyTarget)
34+
35+
httpHost := libp2phttp.Host{
36+
StreamHost: h,
37+
InsecureAllowHTTP: true,
38+
ListenAddrs: []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/5005/http")},
39+
}
40+
41+
// reverse proxy
42+
proxy := &httputil.ReverseProxy{
43+
Rewrite: func(r *httputil.ProxyRequest) {
44+
copiedURL := *targetUrl
45+
r.Out.URL = &copiedURL
46+
},
47+
Transport: &httpHost,
48+
}
49+
50+
httpHost.SetHTTPHandlerAtPath("/http-reverse-proxy/0.0.1", "/", proxy)
51+
go httpHost.Serve()
52+
53+
log.Println("Listening on:")
54+
for _, a := range httpHost.Addrs() {
55+
log.Println(a.Encapsulate(multiaddr.StringCast("/p2p/" + h.ID().String())))
56+
}
57+
58+
// Wait for interrupt signal to stop
59+
intSig := make(chan os.Signal, 1)
60+
signal.Notify(intSig, os.Interrupt)
61+
<-intSig
62+
log.Println("Interrupt signal received, closing host")
63+
}

x/metricsserver/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# libp2p metrics server
2+
3+
This is an experimental libp2p service that exposes a libp2p node's metrics to
4+
a set of allowlisted peers (by their Peer ID).
5+
6+
Useful for:
7+
- Help debugging a node you do not control.
8+
- Ingesting a node's metrics over libp2p when exposing the standard HTTP metrics
9+
server is difficult.
10+
- Allowing only authenticated peers to ingest metrics.

x/metricsserver/example_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package metricsserver_test
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"net/url"
8+
"strings"
9+
10+
"github.com/libp2p/go-libp2p"
11+
"github.com/libp2p/go-libp2p/core/peer"
12+
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
13+
"github.com/libp2p/go-libp2p/x/metricsserver"
14+
)
15+
16+
func newClient() (*http.Client, peer.ID, func(), error) {
17+
streamHost, err := libp2p.New(libp2p.NoListenAddrs, libp2p.DefaultTransports)
18+
if err != nil {
19+
return nil, "", nil, err
20+
}
21+
22+
rt := &libp2phttp.Host{
23+
StreamHost: streamHost,
24+
}
25+
26+
return &http.Client{Transport: rt}, streamHost.ID(), func() {
27+
streamHost.Close()
28+
}, nil
29+
}
30+
31+
func ExampleWithMetricsServer() {
32+
client, clientID, close, err := newClient()
33+
if err != nil {
34+
fmt.Println("Error creating client:", err)
35+
return
36+
}
37+
defer close()
38+
39+
var opts []libp2p.Option
40+
opts = metricsserver.WithMetricsServer(opts, []peer.ID{clientID})
41+
42+
h, err := libp2p.New(opts...)
43+
if err != nil {
44+
fmt.Println("Error creating libp2p host:", err)
45+
return
46+
}
47+
defer h.Close()
48+
49+
url := fmt.Sprintf("multiaddr:%s/p2p/%s/http-path/%s", h.Addrs()[0], h.ID(), url.QueryEscape(metricsserver.ProtocolID))
50+
resp, err := client.Get(url)
51+
if err != nil {
52+
fmt.Println("Error getting metrics:", err)
53+
return
54+
}
55+
56+
metricsResponseBody, err := io.ReadAll(resp.Body)
57+
if err != nil {
58+
fmt.Println("Error reading metrics response body:", err)
59+
return
60+
}
61+
62+
foundMetrics := strings.Contains(string(metricsResponseBody), "go_gc_duration_seconds")
63+
fmt.Println("foundMetrics:", foundMetrics)
64+
// Output: foundMetrics: true
65+
}

x/metricsserver/server.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package metricsserver
2+
3+
import (
4+
"crypto/rand"
5+
"encoding/hex"
6+
"net/http"
7+
8+
logging "github.com/ipfs/go-log/v2"
9+
"github.com/libp2p/go-libp2p"
10+
"github.com/libp2p/go-libp2p/core/crypto"
11+
"github.com/libp2p/go-libp2p/core/host"
12+
"github.com/libp2p/go-libp2p/core/peer"
13+
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promhttp"
16+
"go.uber.org/fx"
17+
)
18+
19+
const ProtocolID = "/libp2p/x/metrics/0.0.1"
20+
21+
var log = logging.Logger("libp2p/x/metrics-server")
22+
23+
func WithMetricsServer(opts []libp2p.Option, allowedPeers []peer.ID) []libp2p.Option {
24+
opts = append(opts, libp2p.WithFxOption(
25+
fx.Provide(func(params constructorParams) *MetricsServer {
26+
27+
var ownHTTPHost bool
28+
httpHost := params.HTTPHost
29+
if httpHost == nil {
30+
// No http host available. We'll make our own
31+
ownHTTPHost = true
32+
httpHost = &libp2phttp.Host{
33+
StreamHost: params.StreamHost,
34+
}
35+
}
36+
37+
if httpHost.StreamHost == nil {
38+
log.Warn("No StreamHost set for the MetricsServer's HTTP host. MetricsServer will not be able to serve HTTP requests over libp2p streams.")
39+
}
40+
41+
m := NewMetricsServer(params.Registerer, params.Gatherer)
42+
m.AllowedPeers = allowedPeers
43+
44+
params.L.Append(fx.StartStopHook(func() error {
45+
err := m.Start(httpHost)
46+
if err != nil {
47+
return err
48+
}
49+
50+
if ownHTTPHost {
51+
go httpHost.Serve()
52+
}
53+
return nil
54+
}, func() error {
55+
if ownHTTPHost {
56+
httpHost.Close()
57+
}
58+
return m.Close()
59+
}))
60+
61+
return m
62+
},
63+
),
64+
// We want the metrics server started even if nothing else depends on it.
65+
fx.Invoke(func(m *MetricsServer) { _ = m })))
66+
return opts
67+
}
68+
69+
type MetricsServer struct {
70+
AllowedPeers []peer.ID
71+
handler http.Handler
72+
closed chan struct{}
73+
}
74+
75+
func NewMetricsServer(r prometheus.Registerer, g prometheus.Gatherer) *MetricsServer {
76+
if r == nil {
77+
r = prometheus.DefaultRegisterer
78+
}
79+
if g == nil {
80+
g = prometheus.DefaultGatherer
81+
}
82+
83+
m := &MetricsServer{
84+
handler: promhttp.InstrumentMetricHandler(r, promhttp.HandlerFor(g, promhttp.HandlerOpts{})),
85+
closed: make(chan struct{}),
86+
}
87+
return m
88+
}
89+
90+
func (m *MetricsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
91+
select {
92+
case <-m.closed:
93+
http.Error(w, "server closed", http.StatusServiceUnavailable)
94+
default:
95+
}
96+
97+
client := libp2phttp.ClientPeerID(r)
98+
if client == "" {
99+
http.Error(w, "no client peer ID", http.StatusForbidden)
100+
}
101+
var found bool
102+
for _, allowedPeer := range m.AllowedPeers {
103+
if client == allowedPeer {
104+
found = true
105+
break
106+
}
107+
}
108+
if !found {
109+
http.Error(w, "not allowed", http.StatusForbidden)
110+
}
111+
112+
m.handler.ServeHTTP(w, r)
113+
}
114+
115+
func (m *MetricsServer) Start(httpHost *libp2phttp.Host) error {
116+
m.closed = make(chan struct{})
117+
httpHost.SetHTTPHandler(ProtocolID, m)
118+
119+
if len(m.AllowedPeers) == 0 {
120+
log.Info("No allowed peers specified. Generting a key for a single peer.")
121+
122+
sk, _, err := crypto.GenerateEd25519Key(rand.Reader)
123+
if err != nil {
124+
return err
125+
}
126+
skBytes, err := sk.Raw()
127+
if err != nil {
128+
return err
129+
}
130+
skAsHex := hex.EncodeToString(skBytes)
131+
132+
m.AllowedPeers = make([]peer.ID, 1)
133+
m.AllowedPeers[0], err = peer.IDFromPrivateKey(sk)
134+
if err != nil {
135+
return err
136+
}
137+
log.Info("Metrics server started. Login using key %s", skAsHex)
138+
}
139+
140+
return nil
141+
}
142+
143+
func (m *MetricsServer) Close() error {
144+
log.Info("Metrics server closed.")
145+
m.AllowedPeers = nil
146+
return nil
147+
}
148+
149+
type constructorParams struct {
150+
fx.In
151+
L fx.Lifecycle
152+
StreamHost host.Host
153+
HTTPHost *libp2phttp.Host `optional:"true"`
154+
Registerer prometheus.Registerer `optional:"true"`
155+
Gatherer prometheus.Gatherer `optional:"true"`
156+
}

0 commit comments

Comments
 (0)