Skip to content

Commit 6c027ad

Browse files
committed
add monitoring of metrics proxy servers
This change introduces a Monitor type which is made aware of a metrics proxy's closure via a channel. It contains the logic for port selection for a new metrics proxy and port freeing for a metrics proxy that has been shut down. Unit test for metrics method added. resolves #607 Signed-off-by: Gavin Inglis <[email protected]>
1 parent 5b7fa79 commit 6c027ad

File tree

4 files changed

+391
-69
lines changed

4 files changed

+391
-69
lines changed

snapshotter/app/service.go

Lines changed: 62 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"context"
1818
"fmt"
1919
"net"
20-
"net/url"
2120
"os"
2221
"os/signal"
2322
"strconv"
@@ -56,7 +55,29 @@ func Run(config config.Config) error {
5655
group, ctx := errgroup.WithContext(ctx)
5756

5857
cache := cache.NewSnapshotterCache()
59-
snapshotter, err := initSnapshotter(ctx, config, cache)
58+
59+
var (
60+
monitor *metrics.Monitor
61+
serviceDiscovery *discovery.ServiceDiscovery
62+
)
63+
if config.Snapshotter.Metrics.Enable {
64+
sdHost := config.Snapshotter.Metrics.Host
65+
sdPort := config.Snapshotter.Metrics.ServiceDiscoveryPort
66+
serviceDiscovery := discovery.NewServiceDiscovery(sdHost, sdPort, cache)
67+
monitor, err := initMetricsProxyMonitor(config.Snapshotter.Metrics.PortRange)
68+
if err != nil {
69+
log.G(ctx).WithError(err).Fatal("failed creating metrics proxy monitor")
70+
return err
71+
}
72+
group.Go(func() error {
73+
return serviceDiscovery.Serve()
74+
})
75+
group.Go(func() error {
76+
return monitor.Start()
77+
})
78+
}
79+
80+
snapshotter, err := initSnapshotter(ctx, config, cache, monitor)
6081
if err != nil {
6182
log.G(ctx).WithFields(
6283
logrus.Fields{"resolver": config.Snapshotter.Proxy.Address.Resolver.Type},
@@ -80,15 +101,6 @@ func Run(config config.Config) error {
80101
return err
81102
}
82103

83-
var serviceDiscovery *discovery.ServiceDiscovery
84-
if config.Snapshotter.Metrics.Enable {
85-
sdPort := config.Snapshotter.Metrics.ServiceDiscoveryPort
86-
serviceDiscovery = discovery.NewServiceDiscovery(config.Snapshotter.Metrics.Host, sdPort, cache)
87-
group.Go(func() error {
88-
return serviceDiscovery.Serve()
89-
})
90-
}
91-
92104
group.Go(func() error {
93105
return grpcServer.Serve(listener)
94106
})
@@ -101,10 +113,13 @@ func Run(config config.Config) error {
101113
if err := snapshotter.Close(); err != nil {
102114
log.G(ctx).WithError(err).Error("failed to close snapshotter")
103115
}
104-
if serviceDiscovery != nil {
116+
if config.Snapshotter.Metrics.Enable {
105117
if err := serviceDiscovery.Shutdown(ctx); err != nil {
106118
log.G(ctx).WithError(err).Error("failed to shutdown service discovery server")
107119
}
120+
// Senders to this channel would panic if it is closed. However snapshotter.Close() will
121+
// shutdown all metrics proxies and ensure there are no more senders over the channel.
122+
monitor.Stop()
108123
}
109124
}()
110125

@@ -141,23 +156,19 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
141156
const base10 = 10
142157
const bits32 = 32
143158

144-
func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cache) (snapshots.Snapshotter, error) {
159+
func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cache, monitor *metrics.Monitor) (snapshots.Snapshotter, error) {
145160
resolver, err := initResolver(config)
146161
if err != nil {
147162
return nil, err
148163
}
149164

150-
newProxySnapshotterFunc := func(ctx context.Context, namespace string) (*proxy.RemoteSnapshotter, error) {
165+
newRemoteSnapshotterFunc := func(ctx context.Context, namespace string) (*proxy.RemoteSnapshotter, error) {
151166
r := resolver
152167
response, err := r.Get(namespace)
153168
if err != nil {
154169
return nil, err
155170
}
156-
u, err := url.Parse(response.Address)
157-
if err != nil {
158-
return nil, err
159-
}
160-
host := u.Hostname()
171+
host := response.Address
161172
port, err := strconv.ParseUint(response.SnapshotterPort, base10, bits32)
162173
if err != nil {
163174
return nil, err
@@ -167,57 +178,48 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
167178
}
168179

169180
var metricsProxy *metrics.Proxy
170-
// TODO (ginglis13): port management and lifecycle ties in to overall metrics proxy
171-
// server lifecycle. tracked here: https://github.com/firecracker-microvm/firecracker-containerd/issues/607
172-
portMap := make(map[int]bool)
173181
if config.Snapshotter.Metrics.Enable {
174-
metricsPort, err := strconv.ParseUint(response.MetricsPort, base10, bits32)
182+
metricsProxy, err = initMetricsProxy(config, monitor, host, response.MetricsPort, response.Labels)
175183
if err != nil {
176184
return nil, err
177185
}
186+
}
178187

179-
metricsDialer := func(ctx context.Context, _, _ string) (net.Conn, error) {
180-
return vsock.DialContext(ctx, host, uint32(metricsPort), vsock.WithLogger(log.G(ctx)))
181-
}
188+
return proxy.NewRemoteSnapshotter(ctx, host, snapshotterDialer, metricsProxy)
189+
}
182190

183-
portRange := config.Snapshotter.Metrics.PortRange
184-
metricsHost := config.Snapshotter.Metrics.Host
191+
return demux.NewSnapshotter(cache, newRemoteSnapshotterFunc), nil
192+
}
185193

186-
// Assign a port for metrics proxy server.
187-
ports := strings.Split(portRange, "-")
188-
portRangeError := fmt.Errorf("invalid port range %s", portRange)
189-
if len(ports) < 2 {
190-
return nil, portRangeError
191-
}
192-
lower, err := strconv.Atoi(ports[0])
193-
if err != nil {
194-
return nil, portRangeError
195-
}
196-
upper, err := strconv.Atoi(ports[1])
197-
if err != nil {
198-
return nil, portRangeError
199-
}
200-
port := -1
201-
for p := lower; p <= upper; p++ {
202-
if _, ok := portMap[p]; !ok {
203-
port = p
204-
portMap[p] = true
205-
break
206-
}
207-
}
208-
if port < 0 {
209-
return nil, fmt.Errorf("invalid port: %d", port)
210-
}
194+
func initMetricsProxyMonitor(portRange string) (*metrics.Monitor, error) {
195+
ports := strings.Split(portRange, "-")
196+
portRangeError := fmt.Errorf("invalid port range %s", portRange)
197+
if len(ports) < 2 {
198+
return nil, portRangeError
199+
}
200+
lower, err := strconv.Atoi(ports[0])
201+
if err != nil {
202+
return nil, portRangeError
203+
}
204+
upper, err := strconv.Atoi(ports[1])
205+
if err != nil {
206+
return nil, portRangeError
207+
}
211208

212-
metricsProxy, err = metrics.NewProxy(metricsHost, port, response.Labels, metricsDialer)
213-
if err != nil {
214-
return nil, err
215-
}
209+
return metrics.NewMonitor(lower, upper)
210+
}
216211

217-
}
212+
func initMetricsProxy(config config.Config, monitor *metrics.Monitor, host, port string, labels map[string]string) (*metrics.Proxy, error) {
213+
metricsPort, err := strconv.ParseUint(port, base10, bits32)
214+
if err != nil {
215+
return nil, err
216+
}
218217

219-
return proxy.NewProxySnapshotter(ctx, host, snapshotterDialer, metricsProxy)
218+
metricsDialer := func(ctx context.Context, _, _ string) (net.Conn, error) {
219+
return vsock.DialContext(ctx, host, uint32(metricsPort), vsock.WithLogger(log.G(ctx)))
220220
}
221221

222-
return demux.NewSnapshotter(cache, newProxySnapshotterFunc), nil
222+
metricsHost := config.Snapshotter.Metrics.Host
223+
224+
return metrics.NewProxy(metricsHost, monitor, labels, metricsDialer)
223225
}

snapshotter/demux/metrics/proxy.go

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ package metrics
1515

1616
import (
1717
"context"
18+
"fmt"
1819
"io"
1920
"net"
2021
"net/http"
2122
"strconv"
23+
"sync"
2224

2325
"github.com/containerd/containerd/log"
2426
)
@@ -30,10 +32,76 @@ const (
3032
MaxMetricsResponseSize = 32768 // 32 KB
3133
)
3234

35+
// Monitor is used for port and lifecycle management of metrics proxies.
type Monitor struct {
	// ch is used for communicating when a metrics proxy and its port are no longer in use
	// so that the Monitor can mark the port as free.
	ch chan int
	// portsInUse is a set that tracks used and unused metrics proxy server ports.
	// A port is in use exactly when it is present as a key.
	portsInUse map[int]bool
	// startPort is first port in the configurable range of ports to serve metrics proxies from.
	startPort int
	// endPort is last port in the configurable range of ports to serve metrics proxies from.
	endPort int
	// mu is the lock used for threadsafe reading and writing of the metrics proxy port map.
	mu *sync.Mutex
}

// NewMonitor returns a new metrics proxy monitor that hands out ports from the
// inclusive range startPort-endPort.
//
// The range must satisfy 1 <= startPort <= endPort <= 65535; otherwise an
// "invalid port range" error is returned and the Monitor is nil.
func NewMonitor(startPort, endPort int) (*Monitor, error) {
	// maxPort is the largest valid TCP port number.
	const maxPort = 65535

	if startPort <= 0 || startPort > endPort || endPort > maxPort {
		return nil, fmt.Errorf("invalid port range %d-%d", startPort, endPort)
	}

	// Keyed fields keep this constructor correct if Monitor's fields are
	// ever reordered or extended.
	return &Monitor{
		ch:         make(chan int),
		portsInUse: make(map[int]bool),
		startPort:  startPort,
		endPort:    endPort,
		mu:         &sync.Mutex{},
	}, nil
}
57+
58+
// Start receives messages from the proxy server channel indiciating a server has closed or terminated.
59+
// It marks the metrics proxy server's port as available by removing it from the portsInUse set.
60+
func (m *Monitor) Start() error {
61+
for freePort := range m.ch {
62+
m.mu.Lock()
63+
delete(m.portsInUse, freePort)
64+
m.mu.Unlock()
65+
}
66+
67+
return nil
68+
}
69+
70+
// Stop closes the metrics proxy monitoring channel, which ends the receive
// loop in Start.
//
// Sending on a closed channel panics, so Stop must only be called once no
// metrics proxy can still send its port on the channel. Calling Stop a second
// time also panics, because the channel is already closed.
71+
func (m *Monitor) Stop() {
72+
close(m.ch)
73+
}
74+
75+
// findOpenPort returns a port for use by a metrics proxy server.
76+
func (m *Monitor) findOpenPort() (int, error) {
77+
m.mu.Lock()
78+
defer m.mu.Unlock()
79+
metricsProxyPort := -1
80+
for p := m.startPort; p <= m.endPort; p++ {
81+
if _, ok := m.portsInUse[p]; !ok {
82+
// Mark the port as in use in the set
83+
metricsProxyPort = p
84+
m.portsInUse[p] = true
85+
break
86+
}
87+
}
88+
89+
if metricsProxyPort < 0 {
90+
return 0, fmt.Errorf("unable to find port in range %d-%d", m.startPort, m.endPort)
91+
}
92+
93+
return metricsProxyPort, nil
94+
}
95+
96+
// HTTPClient defines the interface for the client getting metrics. It is the
// single method Proxy calls on its client when pulling metrics from a remote
// snapshotter. *http.Client satisfies it, and NewProxy installs one.
97+
type HTTPClient interface {
98+
// Get requests the given URL and returns the HTTP response or an error.
Get(string) (*http.Response, error)
99+
}
100+
33101
// Proxy represents a metrics proxy server for getting and serving remote snapshotter metrics.
34102
type Proxy struct {
35103
// client is the HTTP client used to send metrics requests to a remote snapshotter.
36-
client *http.Client
104+
client HTTPClient
37105
// server is the proxy server on host for serving remote snapshotter metrics.
38106
server *http.Server
39107
// host is the nonroutable address listening for metrics requests.
@@ -42,13 +110,19 @@ type Proxy struct {
42110
Port int
43111
// Labels are labels used to apply to metrics.
44112
Labels map[string]string
113+
// monitor is the metrics proxy monitor
114+
monitor *Monitor
45115
}
46116

47117
// NewProxy creates a new Proxy with initialized HTTP client and port.
48118
// dialer is used as the DialContext underlying the metrics HTTP client's RoundTripper.
49119
//
50120
// Reference: https://pkg.go.dev/net/http#Transport
51-
func NewProxy(host string, port int, labels map[string]string, dialer func(context.Context, string, string) (net.Conn, error)) (*Proxy, error) {
121+
func NewProxy(host string, monitor *Monitor, labels map[string]string, dialer func(context.Context, string, string) (net.Conn, error)) (*Proxy, error) {
122+
port, err := monitor.findOpenPort()
123+
if err != nil {
124+
return nil, err
125+
}
52126
return &Proxy{
53127
client: &http.Client{
54128
Transport: &http.Transport{
@@ -74,9 +148,12 @@ func (mp *Proxy) Serve(ctx context.Context) error {
74148

75149
err := mp.server.ListenAndServe()
76150
if err != http.ErrServerClosed {
151+
log.G(ctx).Errorf("metrics proxy server: %v\n", err)
152+
mp.monitor.ch <- mp.Port
77153
return err
78154
}
79155

156+
mp.monitor.ch <- mp.Port
80157
return nil
81158
}
82159

@@ -92,13 +169,18 @@ func (mp *Proxy) metrics(w http.ResponseWriter, req *http.Request) {
92169
// Pull metrics and copy to HTTP response.
93170
res, err := mp.client.Get(RequestPath)
94171
if err != nil {
95-
log.G(ctx).Errorf("error reading response body: %v\n", err)
172+
log.G(ctx).Errorf("error reading response body: %v", err)
96173
w.WriteHeader(500)
97174
return
98175
}
99176

100177
defer res.Body.Close()
101178
response := io.LimitReader(res.Body, MaxMetricsResponseSize)
102179

103-
io.Copy(w, response)
180+
_, err = io.Copy(w, response)
181+
if err != nil {
182+
log.G(ctx).Errorf("error writing response body: %v", err)
183+
w.WriteHeader(500)
184+
return
185+
}
104186
}

0 commit comments

Comments
 (0)