@@ -55,7 +55,29 @@ func Run(config config.Config) error {
55
55
group , ctx := errgroup .WithContext (ctx )
56
56
57
57
cache := cache .NewSnapshotterCache ()
58
- snapshotter , err := initSnapshotter (ctx , config , cache )
58
+
59
+ var (
60
+ monitor * metrics.Monitor
61
+ serviceDiscovery * discovery.ServiceDiscovery
62
+ )
63
+ if config .Snapshotter .Metrics .Enable {
64
+ sdHost := config .Snapshotter .Metrics .Host
65
+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
66
+ serviceDiscovery := discovery .NewServiceDiscovery (sdHost , sdPort , cache )
67
+ monitor , err := initMetricsProxyMonitor (config .Snapshotter .Metrics .PortRange )
68
+ if err != nil {
69
+ log .G (ctx ).WithError (err ).Fatal ("failed creating metrics proxy monitor" )
70
+ return err
71
+ }
72
+ group .Go (func () error {
73
+ return serviceDiscovery .Serve ()
74
+ })
75
+ group .Go (func () error {
76
+ return monitor .Start ()
77
+ })
78
+ }
79
+
80
+ snapshotter , err := initSnapshotter (ctx , config , cache , monitor )
59
81
if err != nil {
60
82
log .G (ctx ).WithFields (
61
83
logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
@@ -79,15 +101,6 @@ func Run(config config.Config) error {
79
101
return err
80
102
}
81
103
82
- var serviceDiscovery * discovery.ServiceDiscovery
83
- if config .Snapshotter .Metrics .Enable {
84
- sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
85
- serviceDiscovery = discovery .NewServiceDiscovery (config .Snapshotter .Metrics .Host , sdPort , cache )
86
- group .Go (func () error {
87
- return serviceDiscovery .Serve ()
88
- })
89
- }
90
-
91
104
group .Go (func () error {
92
105
return grpcServer .Serve (listener )
93
106
})
@@ -100,10 +113,13 @@ func Run(config config.Config) error {
100
113
if err := snapshotter .Close (); err != nil {
101
114
log .G (ctx ).WithError (err ).Error ("failed to close snapshotter" )
102
115
}
103
- if serviceDiscovery != nil {
116
+ if config . Snapshotter . Metrics . Enable {
104
117
if err := serviceDiscovery .Shutdown (ctx ); err != nil {
105
118
log .G (ctx ).WithError (err ).Error ("failed to shutdown service discovery server" )
106
119
}
120
+ // Senders to this channel would panic if it is closed. However snapshotter.Close() will
121
+ // shutdown all metrics proxies and ensure there are no more senders over the channel.
122
+ monitor .Stop ()
107
123
}
108
124
}()
109
125
@@ -140,13 +156,13 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
140
156
const base10 = 10
141
157
const bits32 = 32
142
158
143
- func initSnapshotter (ctx context.Context , config config.Config , cache cache.Cache ) (snapshots.Snapshotter , error ) {
159
+ func initSnapshotter (ctx context.Context , config config.Config , cache cache.Cache , monitor * metrics. Monitor ) (snapshots.Snapshotter , error ) {
144
160
resolver , err := initResolver (config )
145
161
if err != nil {
146
162
return nil , err
147
163
}
148
164
149
- newProxySnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
165
+ newRemoteSnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
150
166
r := resolver
151
167
response , err := r .Get (namespace )
152
168
if err != nil {
@@ -162,57 +178,48 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
162
178
}
163
179
164
180
var metricsProxy * metrics.Proxy
165
- // TODO (ginglis13): port management and lifecycle ties in to overall metrics proxy
166
- // server lifecycle. tracked here: https://github.com/firecracker-microvm/firecracker-containerd/issues/607
167
- portMap := make (map [int ]bool )
168
181
if config .Snapshotter .Metrics .Enable {
169
- metricsPort , err := strconv . ParseUint ( response .MetricsPort , base10 , bits32 )
182
+ metricsProxy , err = initMetricsProxy ( config , monitor , host , response .MetricsPort , response . Labels )
170
183
if err != nil {
171
184
return nil , err
172
185
}
186
+ }
173
187
174
- metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
175
- return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
176
- }
188
+ return proxy .NewRemoteSnapshotter (ctx , host , snapshotterDialer , metricsProxy )
189
+ }
177
190
178
- portRange := config . Snapshotter . Metrics . PortRange
179
- metricsHost := config . Snapshotter . Metrics . Host
191
+ return demux . NewSnapshotter ( cache , newRemoteSnapshotterFunc ), nil
192
+ }
180
193
181
- // Assign a port for metrics proxy server.
182
- ports := strings .Split (portRange , "-" )
183
- portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
184
- if len (ports ) < 2 {
185
- return nil , portRangeError
186
- }
187
- lower , err := strconv .Atoi (ports [0 ])
188
- if err != nil {
189
- return nil , portRangeError
190
- }
191
- upper , err := strconv .Atoi (ports [1 ])
192
- if err != nil {
193
- return nil , portRangeError
194
- }
195
- port := - 1
196
- for p := lower ; p <= upper ; p ++ {
197
- if _ , ok := portMap [p ]; ! ok {
198
- port = p
199
- portMap [p ] = true
200
- break
201
- }
202
- }
203
- if port < 0 {
204
- return nil , fmt .Errorf ("invalid port: %d" , port )
205
- }
194
+ func initMetricsProxyMonitor (portRange string ) (* metrics.Monitor , error ) {
195
+ ports := strings .Split (portRange , "-" )
196
+ portRangeError := fmt .Errorf ("invalid port range %s" , portRange )
197
+ if len (ports ) < 2 {
198
+ return nil , portRangeError
199
+ }
200
+ lower , err := strconv .Atoi (ports [0 ])
201
+ if err != nil {
202
+ return nil , portRangeError
203
+ }
204
+ upper , err := strconv .Atoi (ports [1 ])
205
+ if err != nil {
206
+ return nil , portRangeError
207
+ }
206
208
207
- metricsProxy , err = metrics .NewProxy (metricsHost , port , response .Labels , metricsDialer )
208
- if err != nil {
209
- return nil , err
210
- }
209
+ return metrics .NewMonitor (lower , upper )
210
+ }
211
211
212
- }
212
+ func initMetricsProxy (config config.Config , monitor * metrics.Monitor , host , port string , labels map [string ]string ) (* metrics.Proxy , error ) {
213
+ metricsPort , err := strconv .ParseUint (port , base10 , bits32 )
214
+ if err != nil {
215
+ return nil , err
216
+ }
213
217
214
- return proxy .NewProxySnapshotter (ctx , host , snapshotterDialer , metricsProxy )
218
+ metricsDialer := func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
219
+ return vsock .DialContext (ctx , host , uint32 (metricsPort ), vsock .WithLogger (log .G (ctx )))
215
220
}
216
221
217
- return demux .NewSnapshotter (cache , newProxySnapshotterFunc ), nil
222
+ metricsHost := config .Snapshotter .Metrics .Host
223
+
224
+ return metrics .NewProxy (metricsHost , monitor , labels , metricsDialer )
218
225
}
0 commit comments