Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit 8c6c154

Browse files
authored
Merge pull request #1550 from aleksandra-malinowska/stackdriver-v3-all
add more metrics to Stackdriver sink
2 parents 87ad7ab + b3bf2f1 commit 8c6c154

File tree

2 files changed

+183
-9
lines changed

2 files changed

+183
-9
lines changed

docs/sink-owners.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ List of Owners
2727
| --------------- | ------------------ | -------------------| --------------------------------------------- | -------------- |
2828
| ElasticSearch | :heavy_check_mark: | :heavy_check_mark: | @AlmogBaku / @andyxning / @huangyuqi | :ok: |
2929
| GCM | :heavy_check_mark: | :x: | @kubernetes/heapster-maintainers | :ok: |
30+
| Stackdriver | :heavy_check_mark: | :x: | @kubernetes/heapster-maintainers | :ok: |
3031
| Hawkular | :heavy_check_mark: | :x: | @burmanm / @mwringe | :ok: |
3132
| InfluxDB | :heavy_check_mark: | :heavy_check_mark: | @kubernetes/heapster-maintainers / @andyxning | :ok: |
3233
| Metric (memory) | :heavy_check_mark: | :x: | @kubernetes/heapster-maintainers | :ok: |

metrics/sinks/stackdriver/stackdriver.go

Lines changed: 182 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,71 @@ type metricMetadata struct {
4545
}
4646

4747
var (
48+
cpuReservedCoresMD = &metricMetadata{
49+
MetricKind: "GAUGE",
50+
ValueType: "DOUBLE",
51+
Name: "container.googleapis.com/container/cpu/reserved_cores",
52+
}
53+
54+
cpuUsageTimeMD = &metricMetadata{
55+
MetricKind: "CUMULATIVE",
56+
ValueType: "DOUBLE",
57+
Name: "container.googleapis.com/container/cpu/usage_time",
58+
}
59+
4860
uptimeMD = &metricMetadata{
4961
MetricKind: "CUMULATIVE",
5062
ValueType: "DOUBLE",
5163
Name: "container.googleapis.com/container/uptime",
5264
}
65+
66+
utilizationMD = &metricMetadata{
67+
MetricKind: "GAUGE",
68+
ValueType: "DOUBLE",
69+
Name: "container.googleapis.com/container/cpu/utilization",
70+
}
71+
72+
networkRxMD = &metricMetadata{
73+
MetricKind: "CUMULATIVE",
74+
ValueType: "INT64",
75+
Name: "container.googleapis.com/container/network/received_bytes_count",
76+
}
77+
78+
networkTxMD = &metricMetadata{
79+
MetricKind: "CUMULATIVE",
80+
ValueType: "INT64",
81+
Name: "container.googleapis.com/container/network/sent_bytes_count",
82+
}
83+
84+
memoryLimitMD = &metricMetadata{
85+
MetricKind: "GAUGE",
86+
ValueType: "INT64",
87+
Name: "container.googleapis.com/container/memory/bytes_total",
88+
}
89+
90+
memoryBytesUsedMD = &metricMetadata{
91+
MetricKind: "GAUGE",
92+
ValueType: "INT64",
93+
Name: "container.googleapis.com/container/memory/bytes_used",
94+
}
95+
96+
memoryPageFaultsMD = &metricMetadata{
97+
MetricKind: "CUMULATIVE",
98+
ValueType: "INT64",
99+
Name: "container.googleapis.com/container/memory/page_fault_count",
100+
}
101+
102+
diskBytesUsedMD = &metricMetadata{
103+
MetricKind: "GAUGE",
104+
ValueType: "INT64",
105+
Name: "container.googleapis.com/container/disk/bytes_used",
106+
}
107+
108+
diskBytesTotalMD = &metricMetadata{
109+
MetricKind: "GAUGE",
110+
ValueType: "INT64",
111+
Name: "container.googleapis.com/container/disk/bytes_total",
112+
}
53113
)
54114

55115
func (sink *stackdriverSink) Name() string {
@@ -62,8 +122,19 @@ func (sink *stackdriverSink) Stop() {
62122

63123
func (sink *stackdriverSink) ExportData(dataBatch *core.DataBatch) {
64124
req := getReq()
65-
66125
for _, metricSet := range dataBatch.MetricSets {
126+
switch metricSet.Labels["type"] {
127+
case core.MetricSetTypeNode, core.MetricSetTypePod, core.MetricSetTypePodContainer, core.MetricSetTypeSystemContainer:
128+
default:
129+
continue
130+
}
131+
132+
if metricSet.Labels["type"] == core.MetricSetTypeNode {
133+
metricSet.Labels[core.LabelContainerName.Key] = "machine"
134+
}
135+
136+
sink.preprocessMemoryMetrics(metricSet)
137+
67138
for name, value := range metricSet.MetricValues {
68139
point := sink.translateMetric(dataBatch.Timestamp, metricSet.Labels, name, value, metricSet.CreateTime)
69140

@@ -75,6 +146,22 @@ func (sink *stackdriverSink) ExportData(dataBatch *core.DataBatch) {
75146
req = getReq()
76147
}
77148
}
149+
150+
for _, metric := range metricSet.LabeledMetrics {
151+
point := sink.translateLabeledMetric(dataBatch.Timestamp, metricSet.Labels, metric, metricSet.CreateTime)
152+
153+
if point != nil {
154+
req.TimeSeries = append(req.TimeSeries, point)
155+
}
156+
if len(req.TimeSeries) >= maxTimeseriesPerRequest {
157+
sink.sendRequest(req)
158+
req = getReq()
159+
}
160+
}
161+
}
162+
163+
if len(req.TimeSeries) > 0 {
164+
sink.sendRequest(req)
78165
}
79166
}
80167

@@ -124,17 +211,88 @@ func (sink *stackdriverSink) sendRequest(req *sd_api.CreateTimeSeriesRequest) {
124211
_, err := sink.stackdriverClient.Projects.TimeSeries.Create(fullProjectName(sink.project), req).Do()
125212
if err != nil {
126213
glog.Errorf("Error while sending request to Stackdriver %v", err)
127-
} else {
128-
glog.V(4).Infof("Successfully sent %v timeseries to Stackdriver", len(req.TimeSeries))
214+
}
215+
}
216+
217+
func (sink *stackdriverSink) preprocessMemoryMetrics(metricSet *core.MetricSet) {
218+
usage := metricSet.MetricValues[core.MetricMemoryUsage.MetricDescriptor.Name].IntValue
219+
workingSet := metricSet.MetricValues[core.MetricMemoryWorkingSet.MetricDescriptor.Name].IntValue
220+
bytesUsed := core.MetricValue{
221+
IntValue: usage - workingSet,
222+
}
223+
224+
metricSet.MetricValues["memory/bytes_used"] = bytesUsed
225+
226+
memoryFaults := metricSet.MetricValues[core.MetricMemoryPageFaults.MetricDescriptor.Name].IntValue
227+
majorMemoryFaults := metricSet.MetricValues[core.MetricMemoryMajorPageFaults.MetricDescriptor.Name].IntValue
228+
229+
minorMemoryFaults := core.MetricValue{
230+
IntValue: memoryFaults - majorMemoryFaults,
231+
}
232+
metricSet.MetricValues["memory/minor_page_faults"] = minorMemoryFaults
233+
}
234+
235+
func (sink *stackdriverSink) translateLabeledMetric(timestamp time.Time, labels map[string]string, metric core.LabeledMetric, createTime time.Time) *sd_api.TimeSeries {
236+
resourceLabels := sink.getResourceLabels(labels)
237+
switch metric.Name {
238+
case core.MetricFilesystemUsage.MetricDescriptor.Name:
239+
point := sink.intPoint(timestamp, timestamp, metric.MetricValue.IntValue)
240+
ts := createTimeSeries(resourceLabels, diskBytesUsedMD, point)
241+
ts.Metric.Labels = map[string]string{
242+
"device_name": metric.Labels[core.LabelResourceID.Key],
243+
}
244+
return ts
245+
case core.MetricFilesystemLimit.MetricDescriptor.Name:
246+
point := sink.intPoint(timestamp, timestamp, metric.MetricValue.IntValue)
247+
ts := createTimeSeries(resourceLabels, diskBytesTotalMD, point)
248+
ts.Metric.Labels = map[string]string{
249+
"device_name": metric.Labels[core.LabelResourceID.Key],
250+
}
251+
return ts
252+
default:
253+
return nil
129254
}
130255
}
131256

132257
func (sink *stackdriverSink) translateMetric(timestamp time.Time, labels map[string]string, name string, value core.MetricValue, createTime time.Time) *sd_api.TimeSeries {
258+
resourceLabels := sink.getResourceLabels(labels)
133259
switch name {
134260
case core.MetricUptime.MetricDescriptor.Name:
135-
point := sink.uptimePoint(timestamp, createTime, value)
136-
resourceLabels := sink.getResourceLabels(labels)
261+
doubleValue := float64(value.IntValue) / float64(time.Second/time.Millisecond)
262+
point := sink.doublePoint(timestamp, createTime, doubleValue)
137263
return createTimeSeries(resourceLabels, uptimeMD, point)
264+
case core.MetricCpuLimit.MetricDescriptor.Name:
265+
point := sink.doublePoint(timestamp, timestamp, float64(value.FloatValue))
266+
return createTimeSeries(resourceLabels, cpuReservedCoresMD, point)
267+
case core.MetricCpuUsage.MetricDescriptor.Name:
268+
point := sink.doublePoint(timestamp, createTime, float64(value.FloatValue))
269+
return createTimeSeries(resourceLabels, cpuUsageTimeMD, point)
270+
case core.MetricNetworkRx.MetricDescriptor.Name:
271+
point := sink.intPoint(timestamp, createTime, value.IntValue)
272+
return createTimeSeries(resourceLabels, networkRxMD, point)
273+
case core.MetricNetworkTx.MetricDescriptor.Name:
274+
point := sink.intPoint(timestamp, createTime, value.IntValue)
275+
return createTimeSeries(resourceLabels, networkTxMD, point)
276+
case core.MetricMemoryLimit.MetricDescriptor.Name:
277+
point := sink.intPoint(timestamp, timestamp, value.IntValue)
278+
return createTimeSeries(resourceLabels, memoryLimitMD, point)
279+
case core.MetricMemoryMajorPageFaults.MetricDescriptor.Name:
280+
point := sink.intPoint(timestamp, createTime, value.IntValue)
281+
ts := createTimeSeries(resourceLabels, memoryPageFaultsMD, point)
282+
ts.Metric.Labels = map[string]string{
283+
"fault_type": "major",
284+
}
285+
return ts
286+
case "memory/bytes_used":
287+
point := sink.intPoint(timestamp, timestamp, value.IntValue)
288+
return createTimeSeries(resourceLabels, memoryBytesUsedMD, point)
289+
case "memory/minor_page_faults":
290+
point := sink.intPoint(timestamp, createTime, value.IntValue)
291+
ts := createTimeSeries(resourceLabels, memoryPageFaultsMD, point)
292+
ts.Metric.Labels = map[string]string{
293+
"fault_type": "minor",
294+
}
295+
return ts
138296
default:
139297
return nil
140298
}
@@ -167,14 +325,29 @@ func createTimeSeries(resourceLabels map[string]string, metadata *metricMetadata
167325
}
168326
}
169327

170-
func (sink *stackdriverSink) uptimePoint(timestamp time.Time, createTime time.Time, value core.MetricValue) *sd_api.Point {
328+
func (sink *stackdriverSink) doublePoint(endTime time.Time, startTime time.Time, value float64) *sd_api.Point {
329+
return &sd_api.Point{
330+
Interval: &sd_api.TimeInterval{
331+
EndTime: endTime.Format(time.RFC3339),
332+
StartTime: startTime.Format(time.RFC3339),
333+
},
334+
Value: &sd_api.TypedValue{
335+
DoubleValue: value,
336+
ForceSendFields: []string{"DoubleValue"},
337+
},
338+
}
339+
340+
}
341+
342+
func (sink *stackdriverSink) intPoint(endTime time.Time, startTime time.Time, value int64) *sd_api.Point {
171343
return &sd_api.Point{
172344
Interval: &sd_api.TimeInterval{
173-
EndTime: timestamp.Format(time.RFC3339),
174-
StartTime: createTime.Format(time.RFC3339),
345+
EndTime: endTime.Format(time.RFC3339),
346+
StartTime: startTime.Format(time.RFC3339),
175347
},
176348
Value: &sd_api.TypedValue{
177-
DoubleValue: float64(value.IntValue) / float64(time.Second/time.Millisecond),
349+
Int64Value: value,
350+
ForceSendFields: []string{"Int64Value"},
178351
},
179352
}
180353
}

0 commit comments

Comments
 (0)