Skip to content

Commit a086937

Browse files
authored
Merge pull request #606 from ginglis13/metrics-proxy
feat: add snapshotter metrics proxy and service discovery mechanism
2 parents c20c72e + 79fdc28 commit a086937

File tree

13 files changed

+378
-38
lines changed

13 files changed

+378
-38
lines changed

snapshotter/app/service.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"os"
2222
"os/signal"
2323
"strconv"
24+
"strings"
2425
"syscall"
2526

2627
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
@@ -35,6 +36,8 @@ import (
3536
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/config"
3637
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux"
3738
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/cache"
39+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/metrics"
40+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/metrics/discovery"
3841
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
3942
proxyaddress "github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy/address"
4043
)
@@ -52,7 +55,8 @@ func Run(config config.Config) error {
5255

5356
group, ctx := errgroup.WithContext(ctx)
5457

55-
snapshotter, err := initSnapshotter(ctx, config)
58+
cache := cache.NewSnapshotterCache()
59+
snapshotter, err := initSnapshotter(ctx, config, cache)
5660
if err != nil {
5761
log.G(ctx).WithFields(
5862
logrus.Fields{"resolver": config.Snapshotter.Proxy.Address.Resolver.Type},
@@ -76,6 +80,15 @@ func Run(config config.Config) error {
7680
return err
7781
}
7882

83+
var serviceDiscovery *discovery.ServiceDiscovery
84+
if config.Snapshotter.Metrics.Enable {
85+
sdPort := config.Snapshotter.Metrics.ServiceDiscoveryPort
86+
serviceDiscovery = discovery.NewServiceDiscovery(config.Snapshotter.Metrics.Host, sdPort, cache)
87+
group.Go(func() error {
88+
return serviceDiscovery.Serve()
89+
})
90+
}
91+
7992
group.Go(func() error {
8093
return grpcServer.Serve(listener)
8194
})
@@ -88,6 +101,11 @@ func Run(config config.Config) error {
88101
if err := snapshotter.Close(); err != nil {
89102
log.G(ctx).WithError(err).Error("failed to close snapshotter")
90103
}
104+
if serviceDiscovery != nil {
105+
if err := serviceDiscovery.Shutdown(ctx); err != nil {
106+
log.G(ctx).WithError(err).Error("failed to shutdown service discovery server")
107+
}
108+
}
91109
}()
92110

93111
for {
@@ -123,13 +141,13 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
123141
const base10 = 10
124142
const bits32 = 32
125143

126-
func initSnapshotter(ctx context.Context, config config.Config) (snapshots.Snapshotter, error) {
144+
func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cache) (snapshots.Snapshotter, error) {
127145
resolver, err := initResolver(config)
128146
if err != nil {
129147
return nil, err
130148
}
131149

132-
newProxySnapshotterFunc := func(ctx context.Context, namespace string) (snapshots.Snapshotter, error) {
150+
newProxySnapshotterFunc := func(ctx context.Context, namespace string) (*proxy.RemoteSnapshotter, error) {
133151
r := resolver
134152
response, err := r.Get(namespace)
135153
if err != nil {
@@ -148,22 +166,58 @@ func initSnapshotter(ctx context.Context, config config.Config) (snapshots.Snaps
148166
return vsock.DialContext(ctx, host, uint32(port), vsock.WithLogger(log.G(ctx)))
149167
}
150168

169+
var metricsProxy *metrics.Proxy
170+
// TODO (ginglis13): port management and lifecycle ties in to overall metrics proxy
171+
// server lifecycle. tracked here: https://github.com/firecracker-microvm/firecracker-containerd/issues/607
172+
portMap := make(map[int]bool)
151173
if config.Snapshotter.Metrics.Enable {
152174
metricsPort, err := strconv.ParseUint(response.MetricsPort, base10, bits32)
153175
if err != nil {
154176
return nil, err
155177
}
156178

157-
// TODO (ginglis13) metricsDialer func to be defined here using metricsPort. It will dial
158-
// the same host but connect via its own port. The metrics proxy will be configured in NewProxySnapshotter
159-
// task 2 of https://github.com/firecracker-microvm/firecracker-containerd/issues/602
160-
_ = func(ctx context.Context, _ string) (net.Conn, error) {
179+
metricsDialer := func(ctx context.Context, _, _ string) (net.Conn, error) {
161180
return vsock.DialContext(ctx, host, uint32(metricsPort), vsock.WithLogger(log.G(ctx)))
162181
}
182+
183+
portRange := config.Snapshotter.Metrics.PortRange
184+
metricsHost := config.Snapshotter.Metrics.Host
185+
186+
// Assign a port for metrics proxy server.
187+
ports := strings.Split(portRange, "-")
188+
portRangeError := fmt.Errorf("invalid port range %s", portRange)
189+
if len(ports) < 2 {
190+
return nil, portRangeError
191+
}
192+
lower, err := strconv.Atoi(ports[0])
193+
if err != nil {
194+
return nil, portRangeError
195+
}
196+
upper, err := strconv.Atoi(ports[1])
197+
if err != nil {
198+
return nil, portRangeError
199+
}
200+
port := -1
201+
for p := lower; p <= upper; p++ {
202+
if _, ok := portMap[p]; !ok {
203+
port = p
204+
portMap[p] = true
205+
break
206+
}
207+
}
208+
if port < 0 {
209+
return nil, fmt.Errorf("invalid port: %d", port)
210+
}
211+
212+
metricsProxy, err = metrics.NewProxy(metricsHost, port, response.Labels, metricsDialer)
213+
if err != nil {
214+
return nil, err
215+
}
216+
163217
}
164218

165-
return proxy.NewProxySnapshotter(ctx, host, snapshotterDialer)
219+
return proxy.NewProxySnapshotter(ctx, host, snapshotterDialer, metricsProxy)
166220
}
167221

168-
return demux.NewSnapshotter(cache.NewSnapshotterCache(), newProxySnapshotterFunc), nil
222+
return demux.NewSnapshotter(cache, newProxySnapshotterFunc), nil
169223
}

snapshotter/config/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ type debug struct {
5959
}
6060

6161
type metrics struct {
62-
Enable bool `toml:"enable" default:"false"`
62+
Enable bool `toml:"enable" default:"false"`
63+
Host string `toml:"host"`
64+
PortRange string `toml:"port_range"`
65+
ServiceDiscoveryPort int `toml:"service_discovery_port"`
6366
}
6467

6568
// Load parses application configuration from a specified file path.

snapshotter/config/config.toml.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
[snapshotter.metrics]
1010
enable = true
11+
port_range = "9000-9999"
12+
[snapshotter.metrics.service_discovery]
13+
enable = true
14+
port = 8080
1115

1216
[debug]
1317
logLevel = "info"

snapshotter/config/config_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ func parseExampleConfig() error {
8484
address = "localhost:10001"
8585
[snapshotter.metrics]
8686
enable = true
87-
87+
port_range = "9000-9999"
88+
host = "0.0.0.0"
89+
service_discovery_port = 8080
8890
[debug]
8991
logLevel = "debug"
9092
`)
@@ -103,7 +105,10 @@ func parseExampleConfig() error {
103105
},
104106
},
105107
Metrics: metrics{
106-
Enable: true,
108+
Enable: true,
109+
PortRange: "9000-9999",
110+
Host: "0.0.0.0",
111+
ServiceDiscoveryPort: 8080,
107112
},
108113
},
109114
Debug: debug{

snapshotter/demux/cache/cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@ package cache
1616
import (
1717
"context"
1818

19-
"github.com/containerd/containerd/snapshots"
19+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
2020
)
2121

2222
// SnapshotterProvider defines a snapshotter fetch function.
23-
type SnapshotterProvider = func(context.Context, string) (snapshots.Snapshotter, error)
23+
type SnapshotterProvider = func(context.Context, string) (*proxy.RemoteSnapshotter, error)
2424

2525
// Cache defines the interface for a snapshotter caching mechanism.
2626
type Cache interface {
2727
// Retrieves the snapshotter from the underlying cache using the provided
2828
// fetch function if the snapshotter is not currently cached.
29-
Get(ctx context.Context, key string, fetch SnapshotterProvider) (snapshots.Snapshotter, error)
29+
Get(ctx context.Context, key string, fetch SnapshotterProvider) (*proxy.RemoteSnapshotter, error)
3030

3131
// Closes the snapshotter and removes it from the cache.
3232
Evict(key string) error

snapshotter/demux/cache/snapshotter_cache.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,24 @@ import (
1818
"fmt"
1919
"sync"
2020

21-
"github.com/containerd/containerd/snapshots"
21+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
2222
"github.com/hashicorp/go-multierror"
2323
)
2424

2525
// SnapshotterCache implements a read, write protected cache mechanism
2626
// for keyed snapshotters.
2727
type SnapshotterCache struct {
2828
mutex *sync.RWMutex
29-
snapshotters map[string]snapshots.Snapshotter
29+
snapshotters map[string]*proxy.RemoteSnapshotter
3030
}
3131

3232
// NewSnapshotterCache creates a new instance with an empty cache.
3333
func NewSnapshotterCache() *SnapshotterCache {
34-
return &SnapshotterCache{&sync.RWMutex{}, make(map[string]snapshots.Snapshotter)}
34+
return &SnapshotterCache{&sync.RWMutex{}, make(map[string]*proxy.RemoteSnapshotter)}
3535
}
3636

3737
// Get fetches and caches the snapshotter for a given key.
38-
func (cache *SnapshotterCache) Get(ctx context.Context, key string, fetch SnapshotterProvider) (snapshots.Snapshotter, error) {
38+
func (cache *SnapshotterCache) Get(ctx context.Context, key string, fetch SnapshotterProvider) (*proxy.RemoteSnapshotter, error) {
3939
snapshotter, ok := cache.snapshotters[key]
4040

4141
if !ok {
@@ -59,7 +59,7 @@ func (cache *SnapshotterCache) Get(ctx context.Context, key string, fetch Snapsh
5959
// Evict removes a cached snapshotter for a given key.
6060
func (cache *SnapshotterCache) Evict(key string) error {
6161
cache.mutex.RLock()
62-
snapshotter, ok := cache.snapshotters[key]
62+
remoteSnapshotter, ok := cache.snapshotters[key]
6363
cache.mutex.RUnlock()
6464

6565
if !ok {
@@ -68,16 +68,16 @@ func (cache *SnapshotterCache) Evict(key string) error {
6868
cache.mutex.Lock()
6969
defer cache.mutex.Unlock()
7070

71-
err := snapshotter.Close()
71+
err := remoteSnapshotter.Close()
7272
delete(cache.snapshotters, key)
7373
return err
7474
}
7575

7676
// Close calls Close on all cached remote snapshotters.
7777
func (cache *SnapshotterCache) Close() error {
7878
var compiledErr error
79-
for _, snapshotter := range cache.snapshotters {
80-
if err := snapshotter.Close(); err != nil {
79+
for _, remoteSnapshotter := range cache.snapshotters {
80+
if err := remoteSnapshotter.Close(); err != nil {
8181
compiledErr = multierror.Append(compiledErr, err)
8282
}
8383
}

snapshotter/demux/cache/snapshotter_cache_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,23 @@ import (
1717
"context"
1818
"testing"
1919

20-
"github.com/containerd/containerd/snapshots"
2120
"github.com/hashicorp/go-multierror"
2221
"github.com/pkg/errors"
2322

2423
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/internal"
24+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/proxy"
2525
)
2626

27-
func getSnapshotterOkFunction(ctx context.Context, key string) (snapshots.Snapshotter, error) {
28-
return &internal.SuccessfulSnapshotter{}, nil
27+
func getSnapshotterOkFunction(ctx context.Context, key string) (*proxy.RemoteSnapshotter, error) {
28+
return &proxy.RemoteSnapshotter{Snapshotter: &internal.SuccessfulSnapshotter{}}, nil
2929
}
3030

31-
func getSnapshotterErrorFunction(ctx context.Context, key string) (snapshots.Snapshotter, error) {
32-
return &internal.SuccessfulSnapshotter{}, errors.New("MOCK ERROR")
31+
func getSnapshotterErrorFunction(ctx context.Context, key string) (*proxy.RemoteSnapshotter, error) {
32+
return &proxy.RemoteSnapshotter{Snapshotter: &internal.SuccessfulSnapshotter{}}, errors.New("MOCK ERROR")
3333
}
3434

35-
func getFailingSnapshotterOkFunction(ctx context.Context, key string) (snapshots.Snapshotter, error) {
36-
return &internal.FailingSnapshotter{}, nil
35+
func getFailingSnapshotterOkFunction(ctx context.Context, key string) (*proxy.RemoteSnapshotter, error) {
36+
return &proxy.RemoteSnapshotter{Snapshotter: &internal.FailingSnapshotter{}}, nil
3737
}
3838

3939
func getSnapshotterFromEmptyCache(uut *SnapshotterCache) error {
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package discovery
15+
16+
import (
17+
"context"
18+
"encoding/json"
19+
"fmt"
20+
"net/http"
21+
"strconv"
22+
23+
"github.com/containerd/containerd/log"
24+
"github.com/firecracker-microvm/firecracker-containerd/snapshotter/demux/cache"
25+
)
26+
27+
// ServiceDiscovery discovers metrics services in the snapshotter cache
28+
// and serves a []MetricsTarget on an HTTP server.
29+
type ServiceDiscovery struct {
30+
// cache is the shared host-level cache of snapshotters also used by the demux snapshotter.
31+
cache cache.Cache
32+
// server is the HTTP server that returns a list of targets to pull metrics from and apply labels to those metrics.
33+
server *http.Server
34+
}
35+
36+
// NewServiceDiscovery returns a ServiceDiscovery with configured HTTP server and provided cache.
37+
func NewServiceDiscovery(host string, port int, c cache.Cache) *ServiceDiscovery {
38+
return &ServiceDiscovery{
39+
server: &http.Server{
40+
Addr: host + ":" + strconv.Itoa(port),
41+
},
42+
cache: c,
43+
}
44+
}
45+
46+
// Serve starts the HTTP server for receiving service discovery requests.
47+
func (sd *ServiceDiscovery) Serve() error {
48+
sd.server.Handler = http.HandlerFunc(sd.serviceDiscoveryHandler)
49+
err := sd.server.ListenAndServe()
50+
if err != http.ErrServerClosed {
51+
return err
52+
}
53+
54+
return nil
55+
}
56+
57+
// Shutdown shuts down the service discovery HTTP server.
58+
func (sd *ServiceDiscovery) Shutdown(ctx context.Context) error {
59+
return sd.server.Shutdown(ctx)
60+
}
61+
62+
// metricsTarget represents the underlying JSON structure
63+
// that a Prometheus HTTP Service Discovery server is
64+
// expected to return.
65+
//
66+
// https://prometheus.io/docs/prometheus/latest/http_sd/
67+
type metricsTarget struct {
68+
Targets []string `json:"targets"`
69+
Labels map[string]string `json:"labels"`
70+
}
71+
72+
// serviceDiscoveryHandler scans the cache for snapshotters and their proxy server information,
73+
// and builds and returns a JSON response in the format of a Prometheus service discovery endpoint.
74+
func (sd *ServiceDiscovery) serviceDiscoveryHandler(w http.ResponseWriter, req *http.Request) {
75+
ctx := req.Context()
76+
77+
namespaces := sd.cache.List()
78+
services := []metricsTarget{}
79+
for _, ns := range namespaces {
80+
cachedSnapshotter, err := sd.cache.Get(ctx, ns, nil)
81+
if err != nil {
82+
log.G(ctx).Error("unable to retrieve from snapshotter cache: ", err)
83+
continue
84+
}
85+
86+
// make target "localhost:{PORT}" -> the metrics proxy for given snapshotter
87+
targetString := fmt.Sprintf("localhost:%v", cachedSnapshotter.MetricsProxyPort())
88+
target := []string{targetString}
89+
90+
// build list of discovered services
91+
mt := metricsTarget{Targets: target, Labels: cachedSnapshotter.MetricsProxyLabels()}
92+
services = append(services, mt)
93+
}
94+
95+
w.Header().Set("Content-Type", "application/json")
96+
97+
response, err := json.Marshal(services)
98+
if err != nil {
99+
log.G(ctx).Error("unable to marshal service discovery response: ", err)
100+
w.WriteHeader(500)
101+
return
102+
}
103+
104+
w.Write(response)
105+
}

0 commit comments

Comments
 (0)