Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit 84da166

Browse files
add more metrics to Stackdriver sink
1 parent b15cdc0 commit 84da166

File tree

2 files changed

+137
-9
lines changed

2 files changed

+137
-9
lines changed

docs/sink-owners.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ List of Owners
2727
| --------------- | ------------------ | -------------------| --------------------------------------------- | -------------- |
2828
| ElasticSearch | :heavy_check_mark: | :heavy_check_mark: | @AlmogBaku / @andyxning / @huangyuqi | :ok: |
2929
| GCM | :heavy_check_mark: | :x: | @kubernetes/heapster-maintainers | :ok: |
30+
| Stackdriver | :heavy_check_mark: | :x: | @kubernetes/heapster-maintainers | :ok: |
3031
| Hawkular | :heavy_check_mark: | :x: | @burmanm / @mwringe | :ok: |
3132
| InfluxDB | :heavy_check_mark: | :heavy_check_mark: | @kubernetes/heapster-maintainers / @andyxning | :ok: |
3233
| Metric (memory) | :heavy_check_mark: | :x: | @kubernetes/heapster-maintainers | :ok: |

metrics/sinks/stackdriver/stackdriver.go

Lines changed: 136 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,59 @@ type metricMetadata struct {
4545
}
4646

4747
var (
48+
cpuReservedCoresMD = &metricMetadata{
49+
MetricKind: "GAUGE",
50+
ValueType: "DOUBLE",
51+
Name: "container.googleapis.com/container/cpu/reserved_cores",
52+
}
53+
54+
cpuUsageTimeMD = &metricMetadata{
55+
MetricKind: "CUMULATIVE",
56+
ValueType: "DOUBLE",
57+
Name: "container.googleapis.com/container/cpu/usage_time",
58+
}
59+
4860
uptimeMD = &metricMetadata{
4961
MetricKind: "CUMULATIVE",
5062
ValueType: "DOUBLE",
5163
Name: "container.googleapis.com/container/uptime",
5264
}
65+
66+
utilizationMD = &metricMetadata{
67+
MetricKind: "GAUGE",
68+
ValueType: "DOUBLE",
69+
Name: "container.googleapis.com/container/cpu/utilization",
70+
}
71+
72+
networkRxMD = &metricMetadata{
73+
MetricKind: "CUMULATIVE",
74+
ValueType: "INT64",
75+
Name: "container.googleapis.com/container/network/received_bytes_count",
76+
}
77+
78+
networkTxMD = &metricMetadata{
79+
MetricKind: "CUMULATIVE",
80+
ValueType: "INT64",
81+
Name: "container.googleapis.com/container/network/sent_bytes_count",
82+
}
83+
84+
memoryLimitMD = &metricMetadata{
85+
MetricKind: "GAUGE",
86+
ValueType: "INT64",
87+
Name: "container.googleapis.com/container/memory/bytes_total",
88+
}
89+
90+
memoryBytesUsedMD = &metricMetadata{
91+
MetricKind: "GAUGE",
92+
ValueType: "INT64",
93+
Name: "container.googleapis.com/container/memory/bytes_used",
94+
}
95+
96+
memoryPageFaultsMD = &metricMetadata{
97+
MetricKind: "CUMULATIVE",
98+
ValueType: "INT64",
99+
Name: "container.googleapis.com/container/memory/page_fault_count",
100+
}
53101
)
54102

55103
func (sink *stackdriverSink) Name() string {
@@ -62,8 +110,19 @@ func (sink *stackdriverSink) Stop() {
62110

63111
func (sink *stackdriverSink) ExportData(dataBatch *core.DataBatch) {
64112
req := getReq()
65-
66113
for _, metricSet := range dataBatch.MetricSets {
114+
switch metricSet.Labels["type"] {
115+
case core.MetricSetTypeNode, core.MetricSetTypePod, core.MetricSetTypePodContainer, core.MetricSetTypeSystemContainer:
116+
default:
117+
continue
118+
}
119+
120+
if metricSet.Labels["type"] == core.MetricSetTypeNode {
121+
metricSet.Labels[core.LabelContainerName.Key] = "machine"
122+
}
123+
124+
sink.preprocessMemoryMetrics(metricSet)
125+
67126
for name, value := range metricSet.MetricValues {
68127
point := sink.translateMetric(dataBatch.Timestamp, metricSet.Labels, name, value, metricSet.CreateTime)
69128

@@ -76,6 +135,10 @@ func (sink *stackdriverSink) ExportData(dataBatch *core.DataBatch) {
76135
}
77136
}
78137
}
138+
139+
if len(req.TimeSeries) > 0 {
140+
sink.sendRequest(req)
141+
}
79142
}
80143

81144
func CreateStackdriverSink(uri *url.URL) (core.DataSink, error) {
@@ -124,17 +187,66 @@ func (sink *stackdriverSink) sendRequest(req *sd_api.CreateTimeSeriesRequest) {
124187
_, err := sink.stackdriverClient.Projects.TimeSeries.Create(fullProjectName(sink.project), req).Do()
125188
if err != nil {
126189
glog.Errorf("Error while sending request to Stackdriver %v", err)
127-
} else {
128-
glog.V(4).Infof("Successfully sent %v timeseries to Stackdriver", len(req.TimeSeries))
129190
}
130191
}
131192

193+
func (sink *stackdriverSink) preprocessMemoryMetrics(metricSet *core.MetricSet) {
194+
usage := metricSet.MetricValues[core.MetricMemoryUsage.MetricDescriptor.Name].IntValue
195+
workingSet := metricSet.MetricValues[core.MetricMemoryWorkingSet.MetricDescriptor.Name].IntValue
196+
bytesUsed := core.MetricValue{
197+
IntValue: usage - workingSet,
198+
}
199+
200+
metricSet.MetricValues["memory/bytes_used"] = bytesUsed
201+
202+
memoryFaults := metricSet.MetricValues[core.MetricMemoryPageFaults.MetricDescriptor.Name].IntValue
203+
majorMemoryFaults := metricSet.MetricValues[core.MetricMemoryMajorPageFaults.MetricDescriptor.Name].IntValue
204+
205+
minorMemoryFaults := core.MetricValue{
206+
IntValue: memoryFaults - majorMemoryFaults,
207+
}
208+
metricSet.MetricValues["memory/minor_page_faults"] = minorMemoryFaults
209+
}
210+
132211
func (sink *stackdriverSink) translateMetric(timestamp time.Time, labels map[string]string, name string, value core.MetricValue, createTime time.Time) *sd_api.TimeSeries {
212+
resourceLabels := sink.getResourceLabels(labels)
133213
switch name {
134214
case core.MetricUptime.MetricDescriptor.Name:
135-
point := sink.uptimePoint(timestamp, createTime, value)
136-
resourceLabels := sink.getResourceLabels(labels)
215+
doubleValue := float64(value.IntValue) / float64(time.Second/time.Millisecond)
216+
point := sink.doublePoint(timestamp, createTime, doubleValue)
137217
return createTimeSeries(resourceLabels, uptimeMD, point)
218+
case core.MetricCpuLimit.MetricDescriptor.Name:
219+
point := sink.doublePoint(timestamp, timestamp, float64(value.FloatValue))
220+
return createTimeSeries(resourceLabels, cpuReservedCoresMD, point)
221+
case core.MetricCpuUsage.MetricDescriptor.Name:
222+
point := sink.doublePoint(timestamp, createTime, float64(value.FloatValue))
223+
return createTimeSeries(resourceLabels, cpuUsageTimeMD, point)
224+
case core.MetricNetworkRx.MetricDescriptor.Name:
225+
point := sink.intPoint(timestamp, createTime, value.IntValue)
226+
return createTimeSeries(resourceLabels, networkRxMD, point)
227+
case core.MetricNetworkTx.MetricDescriptor.Name:
228+
point := sink.intPoint(timestamp, createTime, value.IntValue)
229+
return createTimeSeries(resourceLabels, networkTxMD, point)
230+
case core.MetricMemoryLimit.MetricDescriptor.Name:
231+
point := sink.intPoint(timestamp, timestamp, value.IntValue)
232+
return createTimeSeries(resourceLabels, memoryLimitMD, point)
233+
case core.MetricMemoryMajorPageFaults.MetricDescriptor.Name:
234+
point := sink.intPoint(timestamp, createTime, value.IntValue)
235+
ts := createTimeSeries(resourceLabels, memoryPageFaultsMD, point)
236+
ts.Metric.Labels = map[string]string{
237+
"fault_type": "major",
238+
}
239+
return ts
240+
case "memory/bytes_used":
241+
point := sink.intPoint(timestamp, timestamp, value.IntValue)
242+
return createTimeSeries(resourceLabels, memoryBytesUsedMD, point)
243+
case "memory/minor_page_faults":
244+
point := sink.intPoint(timestamp, createTime, value.IntValue)
245+
ts := createTimeSeries(resourceLabels, memoryPageFaultsMD, point)
246+
ts.Metric.Labels = map[string]string{
247+
"fault_type": "minor",
248+
}
249+
return ts
138250
default:
139251
return nil
140252
}
@@ -167,14 +279,29 @@ func createTimeSeries(resourceLabels map[string]string, metadata *metricMetadata
167279
}
168280
}
169281

170-
func (sink *stackdriverSink) uptimePoint(timestamp time.Time, createTime time.Time, value core.MetricValue) *sd_api.Point {
282+
func (sink *stackdriverSink) doublePoint(endTime time.Time, startTime time.Time, value float64) *sd_api.Point {
283+
return &sd_api.Point{
284+
Interval: &sd_api.TimeInterval{
285+
EndTime: endTime.Format(time.RFC3339),
286+
StartTime: startTime.Format(time.RFC3339),
287+
},
288+
Value: &sd_api.TypedValue{
289+
DoubleValue: value,
290+
ForceSendFields: []string{"DoubleValue"},
291+
},
292+
}
293+
294+
}
295+
296+
func (sink *stackdriverSink) intPoint(endTime time.Time, startTime time.Time, value int64) *sd_api.Point {
171297
return &sd_api.Point{
172298
Interval: &sd_api.TimeInterval{
173-
EndTime: timestamp.Format(time.RFC3339),
174-
StartTime: createTime.Format(time.RFC3339),
299+
EndTime: endTime.Format(time.RFC3339),
300+
StartTime: startTime.Format(time.RFC3339),
175301
},
176302
Value: &sd_api.TypedValue{
177-
DoubleValue: float64(value.IntValue) / float64(time.Second/time.Millisecond),
303+
Int64Value: value,
304+
ForceSendFields: []string{"Int64Value"},
178305
},
179306
}
180307
}

0 commit comments

Comments
 (0)