@@ -20,7 +20,7 @@ import (
20
20
"encoding/json"
21
21
"fmt"
22
22
"maps"
23
- "net"
23
+ "net/http "
24
24
"os"
25
25
"path"
26
26
"path/filepath"
@@ -31,10 +31,9 @@ import (
31
31
"time"
32
32
33
33
"github.com/google/uuid"
34
+ "github.com/prometheus/client_golang/prometheus"
35
+ "github.com/prometheus/client_golang/prometheus/promhttp"
34
36
"golang.org/x/net/context"
35
- "google.golang.org/grpc"
36
- "google.golang.org/grpc/health"
37
- "google.golang.org/grpc/health/grpc_health_v1"
38
37
corev1 "k8s.io/api/core/v1"
39
38
apiequality "k8s.io/apimachinery/pkg/api/equality"
40
39
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -115,18 +114,14 @@ type ConfigOverrideArgs struct {
115
114
116
115
// Args holds command line arguments
117
116
type Args struct {
118
- ConfigFile string
119
- Instance string
120
- Klog map [string ]* utils.KlogFlagVal
121
- Kubeconfig string
122
- Port int
123
- // GrpcHealthPort is only needed to avoid races between tests (by skipping the health server).
124
- // Could be removed when gRPC labler service is dropped (when nfd-worker tests stop running nfd-master).
125
- GrpcHealthPort int
117
+ ConfigFile string
118
+ Instance string
119
+ Klog map [string ]* utils.KlogFlagVal
120
+ Kubeconfig string
121
+ Port int
126
122
Prune bool
127
123
Options string
128
124
EnableLeaderElection bool
129
- MetricsPort int
130
125
131
126
Overrides ConfigOverrideArgs
132
127
}
@@ -139,7 +134,6 @@ type deniedNs struct {
139
134
type NfdMaster interface {
140
135
Run () error
141
136
Stop ()
142
- WaitForReady (time.Duration ) bool
143
137
}
144
138
145
139
type nfdMaster struct {
@@ -149,10 +143,7 @@ type nfdMaster struct {
149
143
namespace string
150
144
nodeName string
151
145
configFilePath string
152
- server * grpc.Server
153
- healthServer * grpc.Server
154
146
stop chan struct {}
155
- ready chan struct {}
156
147
kubeconfig * restclient.Config
157
148
k8sClient k8sclient.Interface
158
149
nfdClient nfdclientset.Interface
@@ -166,7 +157,6 @@ func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
166
157
nfd := & nfdMaster {
167
158
nodeName : utils .NodeName (),
168
159
namespace : utils .GetKubernetesNamespace (),
169
- ready : make (chan struct {}),
170
160
stop : make (chan struct {}),
171
161
}
172
162
@@ -298,22 +288,22 @@ func (m *nfdMaster) Run() error {
298
288
}
299
289
}
300
290
291
+ httpMux := http .NewServeMux ()
292
+
301
293
// Register to metrics server
302
- if m .args .MetricsPort > 0 {
303
- m := utils .CreateMetricsServer (m .args .MetricsPort ,
304
- buildInfo ,
305
- nodeUpdateRequests ,
306
- nodeUpdates ,
307
- nodeUpdateFailures ,
308
- nodeLabelsRejected ,
309
- nodeERsRejected ,
310
- nodeTaintsRejected ,
311
- nfrProcessingTime ,
312
- nfrProcessingErrors )
313
- go m .Run ()
314
- registerVersion (version .Get ())
315
- defer m .Stop ()
316
- }
294
+ promRegistry := prometheus .NewRegistry ()
295
+ promRegistry .MustRegister (
296
+ buildInfo ,
297
+ nodeUpdateRequests ,
298
+ nodeUpdates ,
299
+ nodeUpdateFailures ,
300
+ nodeLabelsRejected ,
301
+ nodeERsRejected ,
302
+ nodeTaintsRejected ,
303
+ nfrProcessingTime ,
304
+ nfrProcessingErrors )
305
+ httpMux .Handle ("/metrics" , promhttp .HandlerFor (promRegistry , promhttp.HandlerOpts {}))
306
+ registerVersion (version .Get ())
317
307
318
308
// Run updater that handles events from the nfd CRD API.
319
309
if m .nfdController != nil {
@@ -324,60 +314,29 @@ func (m *nfdMaster) Run() error {
324
314
}
325
315
}
326
316
327
- // Start gRPC server for liveness probe (at this point we're "live")
328
- grpcErr := make (chan error )
329
- if m .args .GrpcHealthPort != 0 {
330
- if err := m .startGrpcHealthServer (grpcErr ); err != nil {
331
- return fmt .Errorf ("failed to start gRPC health server: %w" , err )
332
- }
333
- }
334
-
335
- // Notify that we're ready to accept connections
336
- close (m .ready )
337
-
338
- // NFD-Master main event loop
339
- for {
340
- select {
341
- case err := <- grpcErr :
342
- return fmt .Errorf ("error in serving gRPC: %w" , err )
343
-
344
- case <- m .stop :
345
- klog .InfoS ("shutting down nfd-master" )
346
- return nil
347
- }
348
- }
349
- }
350
-
351
- // startGrpcHealthServer starts a gRPC health server for Kubernetes readiness/liveness probes.
352
- // TODO: improve status checking e.g. with watchdog in the main event loop and
353
- // cheking that node updater pool is alive.
354
- func (m * nfdMaster ) startGrpcHealthServer (errChan chan <- error ) error {
355
- lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , m .args .GrpcHealthPort ))
356
- if err != nil {
357
- return fmt .Errorf ("failed to listen: %w" , err )
358
- }
359
-
360
- s := grpc .NewServer ()
361
- grpc_health_v1 .RegisterHealthServer (s , health .NewServer ())
362
- klog .InfoS ("gRPC health server serving" , "port" , m .args .GrpcHealthPort )
317
+ // Register health probe (at this point we're "ready and live")
318
+ httpMux .HandleFunc ("/healthz" , m .Healthz )
363
319
320
+ // Start HTTP server
321
+ httpServer := http.Server {Addr : fmt .Sprintf (":%d" , m .args .Port ), Handler : httpMux }
364
322
go func () {
365
- defer func () {
366
- lis .Close ()
367
- }()
368
- if err := s .Serve (lis ); err != nil {
369
- errChan <- fmt .Errorf ("gRPC health server exited with an error: %w" , err )
370
- }
371
- klog .InfoS ("gRPC health server stopped" )
323
+ klog .InfoS ("http server starting" , "port" , httpServer .Addr )
324
+ klog .InfoS ("http server stopped" , "exitCode" , httpServer .ListenAndServe ())
372
325
}()
373
- m .healthServer = s
326
+ defer httpServer .Close ()
327
+
328
+ <- m .stop
329
+ klog .InfoS ("shutting down nfd-master" )
374
330
return nil
375
331
}
376
332
333
+ func (m * nfdMaster ) Healthz (writer http.ResponseWriter , _ * http.Request ) {
334
+ writer .WriteHeader (http .StatusOK )
335
+ }
336
+
377
337
// nfdAPIUpdateHandler handles events from the nfd API controller.
378
338
func (m * nfdMaster ) nfdAPIUpdateHandler () {
379
- // We want to unconditionally update all nodes at startup if gRPC is
380
- // disabled (i.e. NodeFeature API is enabled)
339
+ // We want to unconditionally update all nodes at startup
381
340
updateAll := true
382
341
updateNodes := make (map [string ]struct {})
383
342
nodeFeatureGroup := make (map [string ]struct {})
@@ -431,13 +390,6 @@ func (m *nfdMaster) nfdAPIUpdateHandler() {
431
390
432
391
// Stop NfdMaster
433
392
func (m * nfdMaster ) Stop () {
434
- if m .server != nil {
435
- m .server .GracefulStop ()
436
- }
437
- if m .healthServer != nil {
438
- m .healthServer .GracefulStop ()
439
- }
440
-
441
393
if m .nfdController != nil {
442
394
m .nfdController .stop ()
443
395
}
@@ -447,16 +399,6 @@ func (m *nfdMaster) Stop() {
447
399
close (m .stop )
448
400
}
449
401
450
- // Wait until NfdMaster is able able to accept connections.
451
- func (m * nfdMaster ) WaitForReady (timeout time.Duration ) bool {
452
- select {
453
- case <- m .ready :
454
- return true
455
- case <- time .After (timeout ):
456
- }
457
- return false
458
- }
459
-
460
402
// Prune erases all NFD related properties from the node objects of the cluster.
461
403
func (m * nfdMaster ) prune () error {
462
404
if m .config .NoPublish {
0 commit comments