Skip to content

Commit 5814195

Browse files
author
Xuewei Zhang
committed
Move apiserver-reporting logic into k8s_exporter.
Added CLI option "enable-k8s-exporter" (default to true). Users can use this option to enable/disable exporting to Kubernetes control plane. This commit also removes all the apiserver-specific logic from package problemdetector. Future exporters (e.g. to local journald, Prometheus, other control planes) should implement types.Exporter interface.
1 parent df2bc3d commit 5814195

File tree

10 files changed

+130
-63
lines changed

10 files changed

+130
-63
lines changed

cmd/node_problem_detector.go

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,43 +17,20 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"net"
21-
"net/http"
22-
_ "net/http/pprof"
2320
"os"
24-
"strconv"
2521

2622
"github.com/golang/glog"
2723
"github.com/spf13/pflag"
2824

2925
"k8s.io/node-problem-detector/cmd/options"
3026
"k8s.io/node-problem-detector/pkg/custompluginmonitor"
31-
"k8s.io/node-problem-detector/pkg/problemclient"
27+
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter"
3228
"k8s.io/node-problem-detector/pkg/problemdetector"
3329
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
3430
"k8s.io/node-problem-detector/pkg/types"
3531
"k8s.io/node-problem-detector/pkg/version"
3632
)
3733

38-
func startHTTPServer(p problemdetector.ProblemDetector, npdo *options.NodeProblemDetectorOptions) {
39-
// Add healthz http request handler. Always return ok now, add more health check
40-
// logic in the future.
41-
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
42-
w.WriteHeader(http.StatusOK)
43-
w.Write([]byte("ok"))
44-
})
45-
// Add the http handlers in problem detector.
46-
p.RegisterHTTPHandlers()
47-
48-
addr := net.JoinHostPort(npdo.ServerAddress, strconv.Itoa(npdo.ServerPort))
49-
go func() {
50-
err := http.ListenAndServe(addr, nil)
51-
if err != nil {
52-
glog.Fatalf("Failed to start server: %v", err)
53-
}
54-
}()
55-
}
56-
5734
func main() {
5835
npdo := options.NewNodeProblemDetectorOptions()
5936
npdo.AddFlags(pflag.CommandLine)
@@ -87,14 +64,19 @@ func main() {
8764
}
8865
monitors[config] = custompluginmonitor.NewCustomPluginMonitorOrDie(config)
8966
}
90-
c := problemclient.NewClientOrDie(npdo)
91-
p := problemdetector.NewProblemDetector(monitors, c)
9267

93-
// Start http server.
94-
if npdo.ServerPort > 0 {
95-
startHTTPServer(p, npdo)
68+
// Initialize exporters.
69+
exporters := []types.Exporter{}
70+
if ke := k8sexporter.NewExporterOrDie(npdo); ke != nil {
71+
exporters = append(exporters, ke)
72+
glog.Info("K8s exporter started.")
73+
}
74+
if len(exporters) == 0 {
75+
glog.Fatalf("No exporter is successfully setup")
9676
}
9777

78+
// Initialize NPD core.
79+
p := problemdetector.NewProblemDetector(monitors, exporters)
9880
if err := p.Run(); err != nil {
9981
glog.Fatalf("Problem detector failed with error: %v", err)
10082
}

cmd/options/options.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ type NodeProblemDetectorOptions struct {
3636
// CustomPluginMonitorConfigPaths specifies the list of paths to custom plugin monitor configuration
3737
// files.
3838
CustomPluginMonitorConfigPaths []string
39-
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
40-
ApiServerOverride string
4139
// PrintVersion is the flag determining whether version information is printed.
4240
PrintVersion bool
4341
// HostnameOverride specifies custom node name used to override hostname.
@@ -47,6 +45,14 @@ type NodeProblemDetectorOptions struct {
4745
// ServerAddress is the address to bind the node problem detector server.
4846
ServerAddress string
4947

48+
// exporter options
49+
50+
// k8sExporter options
51+
// EnableK8sExporter is the flag determining whether to report to Kubernetes.
52+
EnableK8sExporter bool
53+
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
54+
ApiServerOverride string
55+
5056
// application options
5157

5258
// NodeName is the node name used to communicate with Kubernetes ApiServer.
@@ -63,8 +69,9 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
6369
[]string{}, "List of paths to system log monitor config files, comma separated.")
6470
fs.StringSliceVar(&npdo.CustomPluginMonitorConfigPaths, "custom-plugin-monitors",
6571
[]string{}, "List of paths to custom plugin monitor config files, comma separated.")
72+
fs.BoolVar(&npdo.EnableK8sExporter, "enable-k8s-exporter", true, "Enables reporting to Kubernetes API server.")
6673
fs.StringVar(&npdo.ApiServerOverride, "apiserver-override",
67-
"", "Custom URI used to connect to Kubernetes ApiServer")
74+
"", "Custom URI used to connect to Kubernetes ApiServer. This is ignored if --enable-k8s-exporter is false.")
6875
fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit")
6976
fs.StringVar(&npdo.HostnameOverride, "hostname-override",
7077
"", "Custom node name used to override hostname")
@@ -76,7 +83,7 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
7683

7784
// ValidOrDie validates node problem detector command line options.
7885
func (npdo *NodeProblemDetectorOptions) ValidOrDie() {
79-
if _, err := url.Parse(npdo.ApiServerOverride); err != nil {
86+
if _, err := url.Parse(npdo.ApiServerOverride); npdo.EnableK8sExporter && err != nil {
8087
panic(fmt.Sprintf("apiserver-override %q is not a valid HTTP URI: %v",
8188
npdo.ApiServerOverride, err))
8289
}

pkg/condition/manager.go renamed to pkg/exporters/k8sexporter/condition/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"sync"
2222
"time"
2323

24-
"k8s.io/node-problem-detector/pkg/problemclient"
24+
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
2525
"k8s.io/node-problem-detector/pkg/types"
2626
problemutil "k8s.io/node-problem-detector/pkg/util"
2727

pkg/condition/manager_test.go renamed to pkg/exporters/k8sexporter/condition/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
"github.com/stretchr/testify/assert"
2525

26-
"k8s.io/node-problem-detector/pkg/problemclient"
26+
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
2727
"k8s.io/node-problem-detector/pkg/types"
2828
problemutil "k8s.io/node-problem-detector/pkg/util"
2929

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package k8sexporter
18+
19+
import (
20+
"net"
21+
"net/http"
22+
_ "net/http/pprof"
23+
"strconv"
24+
25+
"github.com/golang/glog"
26+
27+
"k8s.io/apimachinery/pkg/util/clock"
28+
29+
"k8s.io/node-problem-detector/cmd/options"
30+
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/condition"
31+
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
32+
"k8s.io/node-problem-detector/pkg/types"
33+
"k8s.io/node-problem-detector/pkg/util"
34+
)
35+
36+
type k8sExporter struct {
37+
client problemclient.Client
38+
conditionManager condition.ConditionManager
39+
}
40+
41+
// NewExporterOrDie creates a exporter for Kubernetes apiserver exporting, panics if error occurs.
42+
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter {
43+
if !npdo.EnableK8sExporter {
44+
return nil
45+
}
46+
47+
c := problemclient.NewClientOrDie(npdo)
48+
ke := k8sExporter{
49+
client: c,
50+
conditionManager: condition.NewConditionManager(c, clock.RealClock{}),
51+
}
52+
53+
ke.startHTTPReporting(npdo)
54+
ke.conditionManager.Start()
55+
56+
return &ke
57+
}
58+
59+
func (ke *k8sExporter) ExportProblems(status *types.Status) {
60+
for _, event := range status.Events {
61+
ke.client.Eventf(util.ConvertToAPIEventType(event.Severity), status.Source, event.Reason, event.Message)
62+
}
63+
for _, cdt := range status.Conditions {
64+
ke.conditionManager.UpdateCondition(cdt)
65+
}
66+
}
67+
68+
func (ke *k8sExporter) startHTTPReporting(npdo *options.NodeProblemDetectorOptions) {
69+
if npdo.ServerPort <= 0 {
70+
return
71+
}
72+
mux := http.NewServeMux()
73+
74+
// Add healthz http request handler. Always return ok now, add more health check
75+
// logic in the future.
76+
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
77+
w.WriteHeader(http.StatusOK)
78+
w.Write([]byte("ok"))
79+
})
80+
81+
// Add the handler to serve condition http request.
82+
mux.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) {
83+
util.ReturnHTTPJson(w, ke.conditionManager.GetConditions())
84+
})
85+
86+
addr := net.JoinHostPort(npdo.ServerAddress, strconv.Itoa(npdo.ServerPort))
87+
go func() {
88+
err := http.ListenAndServe(addr, mux)
89+
if err != nil {
90+
glog.Fatalf("Failed to start server: %v", err)
91+
}
92+
}()
93+
}

pkg/problemdetector/problem_detector.go

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,33 @@ package problemdetector
1818

1919
import (
2020
"fmt"
21-
"net/http"
2221

2322
"github.com/golang/glog"
2423

25-
"k8s.io/apimachinery/pkg/util/clock"
26-
27-
"k8s.io/node-problem-detector/pkg/condition"
28-
"k8s.io/node-problem-detector/pkg/problemclient"
2924
"k8s.io/node-problem-detector/pkg/types"
30-
"k8s.io/node-problem-detector/pkg/util"
3125
)
3226

3327
// ProblemDetector collects statuses from all problem daemons and update the node condition and send node event.
3428
type ProblemDetector interface {
3529
Run() error
36-
RegisterHTTPHandlers()
3730
}
3831

3932
type problemDetector struct {
40-
client problemclient.Client
41-
conditionManager condition.ConditionManager
42-
monitors map[string]types.Monitor
33+
monitors map[string]types.Monitor
34+
exporters []types.Exporter
4335
}
4436

4537
// NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but
4638
// in the future we may want to let the problem daemons register themselves.
47-
func NewProblemDetector(monitors map[string]types.Monitor, client problemclient.Client) ProblemDetector {
39+
func NewProblemDetector(monitors map[string]types.Monitor, exporters []types.Exporter) ProblemDetector {
4840
return &problemDetector{
49-
client: client,
50-
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
51-
monitors: monitors,
41+
monitors: monitors,
42+
exporters: exporters,
5243
}
5344
}
5445

5546
// Run starts the problem detector.
5647
func (p *problemDetector) Run() error {
57-
p.conditionManager.Start()
5848
// Start the log monitors one by one.
5949
var chans []<-chan *types.Status
6050
for cfg, m := range p.monitors {
@@ -75,24 +65,13 @@ func (p *problemDetector) Run() error {
7565
for {
7666
select {
7767
case status := <-ch:
78-
for _, event := range status.Events {
79-
p.client.Eventf(util.ConvertToAPIEventType(event.Severity), status.Source, event.Reason, event.Message)
80-
}
81-
for _, cdt := range status.Conditions {
82-
p.conditionManager.UpdateCondition(cdt)
68+
for _, exporter := range p.exporters {
69+
exporter.ExportProblems(status)
8370
}
8471
}
8572
}
8673
}
8774

88-
// RegisterHTTPHandlers registers http handlers of node problem detector.
89-
func (p *problemDetector) RegisterHTTPHandlers() {
90-
// Add the handler to serve condition http request.
91-
http.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) {
92-
util.ReturnHTTPJson(w, p.conditionManager.GetConditions())
93-
})
94-
}
95-
9675
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
9776
statuses := make(chan *types.Status)
9877
for _, ch := range chans {

pkg/types/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,9 @@ type Monitor interface {
107107
// Stop stops the log monitor.
108108
Stop()
109109
}
110+
111+
// Exporter exports machine health data to certain control plane.
112+
type Exporter interface {
113+
// Export problems to the control plane.
114+
ExportProblems(*Status)
115+
}

0 commit comments

Comments
 (0)