Skip to content

Commit 7ad5dec

Browse files
author
Xuewei Zhang
committed
Add disk metrics support.
1 parent 23dc265 commit 7ad5dec

File tree

9 files changed

+604
-2
lines changed

9 files changed

+604
-2
lines changed

cmd/plugins.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ package main
2020
import (
2121
_ "k8s.io/node-problem-detector/pkg/custompluginmonitor"
2222
_ "k8s.io/node-problem-detector/pkg/systemlogmonitor"
23+
_ "k8s.io/node-problem-detector/pkg/systemstatsmonitor"
2324
)

config/system-stats-monitor.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"disk": {
3+
"metricsConfigs": {
4+
"disk/io_time": {
5+
"displayName": "disk/io_time"
6+
},
7+
"disk/weighted_io": {
8+
"displayName": "disk/weighted_io"
9+
},
10+
"disk/avg_queue_len": {
11+
"displayName": "disk/avg_queue_len"
12+
}
13+
},
14+
"includeRootBlk": true,
15+
"includeAllAttachedBlk": true,
16+
"lsblkTimeout": "5s"
17+
},
18+
"invokeInterval": "60s"
19+
}

pkg/custompluginmonitor/types/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,11 @@ func TestCustomPluginConfigValidate(t *testing.T) {
261261
err := utMeta.Conf.Validate()
262262
if err != nil && !utMeta.IsError {
263263
t.Error(desp)
264-
t.Errorf("Error in validating custom plugin configuration %+v. Want an error got nil", utMeta)
264+
t.Errorf("Error in validating custom plugin configuration %+v. Wanted nil got an error", utMeta)
265265
}
266266
if err == nil && utMeta.IsError {
267267
t.Error(desp)
268-
t.Errorf("Error in validating custom plugin configuration %+v. Want nil got an error", utMeta)
268+
t.Errorf("Error in validating custom plugin configuration %+v. Wanted an error got nil", utMeta)
269269
}
270270
}
271271
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package systemstatsmonitor
18+
19+
import (
20+
"context"
21+
"os/exec"
22+
"strings"
23+
"time"
24+
25+
"github.com/golang/glog"
26+
"github.com/shirou/gopsutil/disk"
27+
"go.opencensus.io/stats"
28+
"go.opencensus.io/stats/view"
29+
"go.opencensus.io/tag"
30+
ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
31+
)
32+
33+
type diskCollector struct {
34+
keyDevice tag.Key
35+
mIOTime *stats.Int64Measure
36+
mWeightedIO *stats.Int64Measure
37+
mAvgQueueLen *stats.Float64Measure
38+
39+
config *ssmtypes.DiskStatsConfig
40+
41+
historyIOTime map[string]uint64
42+
historyWeightedIO map[string]uint64
43+
}
44+
45+
func NewDiskCollectorOrDie(diskConfig *ssmtypes.DiskStatsConfig) *diskCollector {
46+
dc := diskCollector{config: diskConfig}
47+
dc.keyDevice, _ = tag.NewKey("device")
48+
49+
dc.mIOTime = newInt64Metric(
50+
diskConfig.MetricsConfigs["disk/io_time"].DisplayName,
51+
"The IO time spent on the disk",
52+
"second",
53+
view.LastValue(),
54+
[]tag.Key{dc.keyDevice})
55+
56+
dc.mWeightedIO = newInt64Metric(
57+
diskConfig.MetricsConfigs["disk/weighted_io"].DisplayName,
58+
"The weighted IO on the disk",
59+
"second",
60+
view.LastValue(),
61+
[]tag.Key{dc.keyDevice})
62+
63+
dc.mAvgQueueLen = newFloat64Metric(
64+
diskConfig.MetricsConfigs["disk/avg_queue_len"].DisplayName,
65+
"The average queue length on the disk",
66+
"second",
67+
view.LastValue(),
68+
[]tag.Key{dc.keyDevice})
69+
70+
dc.historyIOTime = make(map[string]uint64)
71+
dc.historyWeightedIO = make(map[string]uint64)
72+
73+
return &dc
74+
}
75+
76+
func (dc *diskCollector) collect() {
77+
if dc == nil {
78+
return
79+
}
80+
81+
blks := []string{}
82+
if dc.config.IncludeRootBlk {
83+
blks = append(blks, listRootBlockDevices(dc.config.LsblkTimeout)...)
84+
}
85+
if dc.config.IncludeAllAttachedBlk {
86+
blks = append(blks, listAttachedBlockDevices()...)
87+
}
88+
89+
ioCountersStats, _ := disk.IOCounters(blks...)
90+
91+
for deviceName, ioCountersStat := range ioCountersStats {
92+
// Calculate average IO queue length since last measurement.
93+
lastIOTime := dc.historyIOTime[deviceName]
94+
lastWeightedIO := dc.historyWeightedIO[deviceName]
95+
96+
dc.historyIOTime[deviceName] = ioCountersStat.IoTime
97+
dc.historyWeightedIO[deviceName] = ioCountersStat.WeightedIO
98+
99+
avg_queue_len := float64(0.0)
100+
if lastIOTime != ioCountersStat.IoTime {
101+
avg_queue_len = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime)
102+
}
103+
104+
// Attach label {"device": deviceName} to the metrics.
105+
device_ctx, _ := tag.New(context.Background(), tag.Upsert(dc.keyDevice, deviceName))
106+
if dc.mIOTime != nil {
107+
stats.Record(device_ctx, dc.mIOTime.M(int64(ioCountersStat.IoTime)))
108+
}
109+
if dc.mWeightedIO != nil {
110+
stats.Record(device_ctx, dc.mWeightedIO.M(int64(ioCountersStat.WeightedIO)))
111+
}
112+
if dc.mAvgQueueLen != nil {
113+
stats.Record(device_ctx, dc.mAvgQueueLen.M(avg_queue_len))
114+
}
115+
}
116+
}
117+
118+
// listRootBlockDevices lists all block devices that's not a slave or holder.
119+
func listRootBlockDevices(timeout time.Duration) []string {
120+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
121+
defer cancel()
122+
123+
// "-d" prevents printing slave or holder devices. i.e. /dev/sda1, /dev/sda2...
124+
// "-n" prevents printing the headings.
125+
// "-p NAME" specifies to only print the device name.
126+
cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME")
127+
stdout, err := cmd.Output()
128+
if err != nil {
129+
glog.Errorf("Error calling lsblk")
130+
}
131+
return strings.Split(strings.TrimSpace(string(stdout)), "\n")
132+
}
133+
134+
// listAttachedBlockDevices lists all currently attached block devices.
135+
func listAttachedBlockDevices() []string {
136+
partitions, _ := disk.Partitions(false)
137+
blks := []string{}
138+
for _, partition := range partitions {
139+
blks = append(blks, partition.Device)
140+
}
141+
return blks
142+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package systemstatsmonitor
18+
19+
import (
20+
"go.opencensus.io/stats"
21+
"go.opencensus.io/stats/view"
22+
"go.opencensus.io/tag"
23+
)
24+
25+
// newInt64Metric create a stats.Int64 metrics, returns nil when name is empty.
26+
func newInt64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Int64Measure {
27+
if name == "" {
28+
return nil
29+
}
30+
measure := stats.Int64(name, description, unit)
31+
newView := &view.View{
32+
Name: name,
33+
Measure: measure,
34+
Description: description,
35+
Aggregation: aggregation,
36+
TagKeys: tagKeys,
37+
}
38+
view.Register(newView)
39+
return measure
40+
}
41+
42+
// newFloat64Metric create a stats.Float64 metrics, returns nil when name is empty.
43+
func newFloat64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Float64Measure {
44+
if name == "" {
45+
return nil
46+
}
47+
measure := stats.Float64(name, description, unit)
48+
newView := &view.View{
49+
Name: name,
50+
Measure: measure,
51+
Description: description,
52+
Aggregation: aggregation,
53+
TagKeys: tagKeys,
54+
}
55+
view.Register(newView)
56+
return measure
57+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package systemstatsmonitor
18+
19+
import (
20+
"encoding/json"
21+
"io/ioutil"
22+
"time"
23+
24+
"github.com/golang/glog"
25+
"k8s.io/node-problem-detector/pkg/problemdaemon"
26+
ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
27+
"k8s.io/node-problem-detector/pkg/types"
28+
"k8s.io/node-problem-detector/pkg/util/tomb"
29+
)
30+
31+
const SystemStatsMonitorName = "system-stats-monitor"
32+
33+
func init() {
34+
problemdaemon.Register(SystemStatsMonitorName, types.ProblemDaemonHandler{
35+
CreateProblemDaemonOrDie: NewSystemStatsMonitorOrDie,
36+
CmdOptionDescription: "Set to config file paths."})
37+
}
38+
39+
type systemStatsMonitor struct {
40+
config ssmtypes.SystemStatsConfig
41+
diskCollector *diskCollector
42+
tomb *tomb.Tomb
43+
}
44+
45+
// NewSystemStatsMonitorOrDie creates a system stats monitor.
46+
func NewSystemStatsMonitorOrDie(configPath string) types.Monitor {
47+
ssm := systemStatsMonitor{
48+
tomb: tomb.NewTomb(),
49+
}
50+
51+
// Apply configurations.
52+
f, err := ioutil.ReadFile(configPath)
53+
if err != nil {
54+
glog.Fatalf("Failed to read configuration file %q: %v", configPath, err)
55+
}
56+
err = json.Unmarshal(f, &ssm.config)
57+
if err != nil {
58+
glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err)
59+
}
60+
61+
err = ssm.config.ApplyConfiguration()
62+
if err != nil {
63+
glog.Fatalf("Failed to apply configuration for %q: %v", configPath, err)
64+
}
65+
66+
err = ssm.config.Validate()
67+
if err != nil {
68+
glog.Fatalf("Failed to validate configuration %+v: %v", ssm.config, err)
69+
}
70+
71+
// Initialize diskCollector if needed.
72+
if len(ssm.config.DiskConfig.MetricsConfigs) > 0 {
73+
ssm.diskCollector = NewDiskCollectorOrDie(&ssm.config.DiskConfig)
74+
}
75+
return &ssm
76+
}
77+
78+
func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) {
79+
glog.Info("Start system stats monitor")
80+
go ssm.monitorLoop()
81+
return nil, nil
82+
}
83+
84+
func (ssm *systemStatsMonitor) monitorLoop() {
85+
defer ssm.tomb.Done()
86+
87+
runTicker := time.NewTicker(ssm.config.InvokeInterval)
88+
defer runTicker.Stop()
89+
90+
select {
91+
case <-ssm.tomb.Stopping():
92+
glog.Infof("System stats monitor stopped")
93+
return
94+
default:
95+
ssm.diskCollector.collect()
96+
}
97+
98+
for {
99+
select {
100+
case <-runTicker.C:
101+
ssm.diskCollector.collect()
102+
case <-ssm.tomb.Stopping():
103+
glog.Infof("System stats monitor stopped")
104+
return
105+
}
106+
}
107+
}
108+
109+
func (ssm *systemStatsMonitor) Stop() {
110+
glog.Info("Stop system stats monitor")
111+
ssm.tomb.Stop()
112+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package systemstatsmonitor
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
"k8s.io/node-problem-detector/pkg/problemdaemon"
25+
)
26+
27+
func TestRegistration(t *testing.T) {
28+
assert.NotPanics(t,
29+
func() { problemdaemon.GetProblemDaemonHandlerOrDie(SystemStatsMonitorName) },
30+
"System stats monitor failed to register itself as a problem daemon.")
31+
}

0 commit comments

Comments
 (0)