Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit 6c245e7

Browse files
authored
Merge pull request #1848 from bloomberg/statsd_plugin
Added Statsd metrics sink for Heapster
2 parents be70071 + 1a420d4 commit 6c245e7

13 files changed

+1215
-0
lines changed

docs/sink-configuration.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,51 @@ To use the GCL sink add the following flag:
7777
* Where `project_ID` is the project ID of the Google Cloud Platform project.
7878
* Select `kubernetes.io/events` from the `All logs` drop down menu.
7979

80+
### StatsD
81+
This sink supports monitoring metrics only.
82+
To use the StatsD sink add the following flag:
83+
```
84+
--sink="statsd:udp://<HOST>:<PORT>[?<OPTIONS>]"
85+
```
86+
87+
The following options are available:
88+
89+
* `prefix` - Adds specified prefix to all metrics, default is empty
90+
* `protocolType` - Protocol type specifies the message format, it can be either etsystatsd or influxstatsd, default is etsystatsd
91+
* `numMetricsPerMsg` - number of metrics to be packed in an UDP message, default is 5
92+
* `renameLabels` - renames labels, old and new label separated by ':' and pairs of old and new labels separated by ','
93+
* `allowedLabels` - comma-separated labels that are allowed, default is empty ie all labels are allowed
94+
* `labelStyle` - convert labels from default snake case to other styles, default is no convertion. Styles supported are `lowerCamelCase` and `upperCamelCase`
95+
96+
For example.
97+
```
98+
--sink="statsd:udp://127.0.0.1:4125?prefix=kubernetes.example.&protocolType=influxstatsd&numMetricsPerMsg=10&renameLabels=type:k8s_type,host_name:k8s_host_name&allowedLabels=container_name,namespace_name,type,host_name&labelStyle=lowerCamelCase"
99+
```
100+
101+
#### etsystatsd metrics format
102+
103+
| Metric Set Type | Metrics Format |
104+
|:----------------|:--------------------------------------------------------------------------------------|
105+
| Cluster | `<PREFIX>.<SUFFIX>` |
106+
| Node | `<PREFIX>.node.<NODE>.<SUFFIX>` |
107+
| Namespace | `<PREFIX>.namespace.<NAMESPACE>.<SUFFIX>` |
108+
| Pod | `<PREFIX>.node.<NODE>.namespace.<NAMESPACE>.pod.<POD>.<SUFFIX>` |
109+
| PodContainer | `<PREFIX>.node.<NODE>.namespace.<NAMESPACE>.pod.<POD>.container.<CONTAINER>.<SUFFIX>` |
110+
| SystemContainer | `<PREFIX>.node.<NODE>.sys-container.<SYS-CONTAINER>.<SUFFIX>` |
111+
112+
* `PREFIX` - configured prefix
113+
* `SUFFIX` - `[.<USER_LABELS>].<METRIC>[.<RESOURCE_ID>]`
114+
* `USER_LABELS` - user provided labels `[.<KEY1>.<VAL1>][.<KEY2>.<VAL2>] ...`
115+
* `METRIC` - metric name, eg: filesystem/usage
116+
* `RESOURCE_ID` - An unique identifier used to differentiate multiple metrics of the same type. eg: FS partitions under filesystem/usage
117+
118+
#### influxstatsd metrics format
119+
Influx StatsD is very similar to Etsy StatsD. Tags are supported by adding a comma-separated list of tags in key=value format.
120+
121+
```
122+
<METRIC>[,<KEY1=VAL1>,<KEY2=VAL2>...]:<METRIC_VALUE>|<METRIC_TYPE>
123+
```
124+
80125
### Hawkular-Metrics
81126
This sink supports monitoring metrics only.
82127
To use the Hawkular-Metrics sink add the following flag:

docs/sink-owners.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,4 @@ List of Owners
3838
| Wavefront | :heavy_check_mark: | :x: | @ezeev | :ok: |
3939
| Librato | :heavy_check_mark: | :x: | @johanneswuerbach | :ok: |
4040
| Honeycomb | :heavy_check_mark: | :heavy_check_mark: | @emfree | :new: #1762 |
41+
| StatsD | :heavy_check_mark: | :x: | @yogeswaran | :ok: |

metrics/sinks/factory.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/heapster/metrics/sinks/opentsdb"
3535
"k8s.io/heapster/metrics/sinks/riemann"
3636
"k8s.io/heapster/metrics/sinks/stackdriver"
37+
"k8s.io/heapster/metrics/sinks/statsd"
3738
"k8s.io/heapster/metrics/sinks/wavefront"
3839
)
3940

@@ -48,6 +49,8 @@ func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) {
4849
return gcm.CreateGCMSink(&uri.Val)
4950
case "stackdriver":
5051
return stackdriver.CreateStackdriverSink(&uri.Val)
52+
case "statsd":
53+
return statsd.NewStatsdSink(&uri.Val)
5154
case "graphite":
5255
return graphite.NewGraphiteSink(&uri.Val)
5356
case "hawkular":

metrics/sinks/statsd/driver.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2016 Google Inc. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package statsd
16+
17+
import (
18+
"fmt"
19+
"github.com/golang/glog"
20+
"k8s.io/heapster/metrics/core"
21+
"net/url"
22+
"strconv"
23+
"strings"
24+
"sync"
25+
)
26+
27+
const (
28+
defaultHost = "localhost:8125"
29+
defaultNumMetricsPerMsg = 5
30+
defaultProtocolType = "etsystatsd"
31+
)
32+
33+
type statsdSink struct {
34+
config statsdConfig
35+
formatter Formatter
36+
client statsdClient
37+
sync.RWMutex
38+
}
39+
40+
type statsdConfig struct {
41+
host string
42+
prefix string
43+
numMetricsPerMsg int
44+
protocolType string
45+
renameLabels map[string]string
46+
allowedLabels map[string]string
47+
customizeLabel CustomizeLabel
48+
}
49+
50+
func getConfig(uri *url.URL) (cfg statsdConfig, err error) {
51+
config := statsdConfig{
52+
host: defaultHost,
53+
prefix: "",
54+
numMetricsPerMsg: defaultNumMetricsPerMsg,
55+
protocolType: defaultProtocolType,
56+
renameLabels: make(map[string]string),
57+
allowedLabels: make(map[string]string),
58+
customizeLabel: nil,
59+
}
60+
61+
if len(uri.Host) > 0 {
62+
config.host = uri.Host
63+
}
64+
opts := uri.Query()
65+
if len(opts["numMetricsPerMsg"]) >= 1 {
66+
val, err := strconv.Atoi(opts["numMetricsPerMsg"][0])
67+
if err != nil {
68+
return config, fmt.Errorf("failed to parse `numMetricsPerMsg` field - %v", err)
69+
}
70+
config.numMetricsPerMsg = val
71+
}
72+
if len(opts["protocolType"]) >= 1 {
73+
config.protocolType = strings.ToLower(opts["protocolType"][0])
74+
}
75+
if len(opts["prefix"]) >= 1 {
76+
config.prefix = opts["prefix"][0]
77+
}
78+
if len(opts["renameLabels"]) >= 1 {
79+
renameLabels := strings.Split(opts["renameLabels"][0], ",")
80+
for _, renameLabel := range renameLabels {
81+
kv := strings.SplitN(renameLabel, ":", 2)
82+
config.renameLabels[kv[0]] = kv[1]
83+
}
84+
}
85+
if len(opts["allowedLabels"]) >= 1 {
86+
allowedLabels := strings.Split(opts["allowedLabels"][0], ",")
87+
for _, allowedLabel := range allowedLabels {
88+
config.allowedLabels[allowedLabel] = allowedLabel
89+
}
90+
}
91+
labelStyle := DefaultLabelStyle
92+
if len(opts["labelStyle"]) >= 1 {
93+
switch opts["labelStyle"][0] {
94+
case "lowerCamelCase":
95+
labelStyle = SnakeToLowerCamel
96+
case "upperCamelCase":
97+
labelStyle = SnakeToUpperCamel
98+
default:
99+
glog.Errorf("invalid labelStyle - %s", opts["labelStyle"][0])
100+
}
101+
}
102+
labelCustomizer := LabelCustomizer{config.renameLabels, labelStyle}
103+
config.customizeLabel = labelCustomizer.Customize
104+
glog.Infof("statsd metrics sink using configuration : %+v", config)
105+
return config, nil
106+
}
107+
108+
func (sink *statsdSink) ExportData(dataBatch *core.DataBatch) {
109+
sink.Lock()
110+
defer sink.Unlock()
111+
112+
var metrics []string
113+
var tmpstr string
114+
var err error
115+
allowAllLabels := len(sink.config.allowedLabels) == 0
116+
for _, metricSet := range dataBatch.MetricSets {
117+
var metricSetLabels map[string]string
118+
if allowAllLabels {
119+
metricSetLabels = metricSet.Labels
120+
} else {
121+
metricSetLabels = make(map[string]string)
122+
for k, v := range metricSet.Labels {
123+
_, allowed := sink.config.allowedLabels[k]
124+
if allowed {
125+
metricSetLabels[k] = v
126+
}
127+
}
128+
}
129+
for metricName, metricValue := range metricSet.MetricValues {
130+
tmpstr, err = sink.formatter.Format(sink.config.prefix, metricName, metricSetLabels, sink.config.customizeLabel, metricValue)
131+
if err != nil {
132+
glog.Errorf("statsd metrics sink - failed to format metrics : %s", err.Error())
133+
continue
134+
}
135+
metrics = append(metrics, tmpstr)
136+
}
137+
for _, metric := range metricSet.LabeledMetrics {
138+
labels := make(map[string]string)
139+
for k, v := range metricSetLabels {
140+
labels[k] = v
141+
}
142+
for k, v := range metric.Labels {
143+
_, allowed := sink.config.allowedLabels[k]
144+
if allowed || allowAllLabels {
145+
labels[k] = v
146+
}
147+
}
148+
tmpstr, err = sink.formatter.Format(sink.config.prefix, metric.Name, labels, sink.config.customizeLabel, metric.MetricValue)
149+
if err != nil {
150+
glog.Errorf("statsd metrics sink - failed to format labeled metrics : %v", err)
151+
continue
152+
}
153+
metrics = append(metrics, tmpstr)
154+
}
155+
}
156+
glog.V(5).Infof("Sending metrics --- %s", metrics)
157+
err = sink.client.send(metrics)
158+
if err != nil {
159+
glog.Errorf("statsd metrics sink - failed to send some metrics : %v", err)
160+
}
161+
}
162+
163+
func (sink *statsdSink) Name() string {
164+
return "StatsD Sink"
165+
}
166+
167+
func (sink *statsdSink) Stop() {
168+
glog.V(2).Info("statsd metrics sink is stopping")
169+
sink.client.close()
170+
}
171+
172+
func NewStatsdSinkWithClient(uri *url.URL, client statsdClient) (sink core.DataSink, err error) {
173+
config, err := getConfig(uri)
174+
if err != nil {
175+
return nil, err
176+
}
177+
formatter, err := NewFormatter(config.protocolType)
178+
if err != nil {
179+
return nil, err
180+
}
181+
glog.V(2).Info("statsd metrics sink is created")
182+
return &statsdSink{
183+
config: config,
184+
formatter: formatter,
185+
client: client,
186+
}, nil
187+
}
188+
189+
func NewStatsdSink(uri *url.URL) (sink core.DataSink, err error) {
190+
config, err := getConfig(uri)
191+
if err != nil {
192+
return nil, err
193+
}
194+
client, err := NewStatsdClient(config.host, config.numMetricsPerMsg)
195+
if err != nil {
196+
return nil, err
197+
}
198+
return NewStatsdSinkWithClient(uri, client)
199+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2016 Google Inc. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package statsd
16+
17+
import (
18+
"fmt"
19+
"net/url"
20+
"strings"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
"k8s.io/heapster/metrics/core"
26+
)
27+
28+
const (
29+
driverUrl = "udp://127.0.0.1:4125?protocolType=influxstatsd&allowedLabels=tag1,tag3"
30+
)
31+
32+
func TestDriverName(t *testing.T) {
33+
url, err := url.Parse(driverUrl)
34+
assert.NoError(t, err)
35+
36+
sink, err := NewStatsdSink(url)
37+
assert.NoError(t, err)
38+
assert.NotNil(t, sink)
39+
40+
assert.Equal(t, "StatsD Sink", sink.Name())
41+
}
42+
43+
func TestDriverExportData(t *testing.T) {
44+
url, err := url.Parse(driverUrl)
45+
assert.NoError(t, err)
46+
47+
client := &dummyStatsdClientImpl{messages: nil}
48+
sink, err := NewStatsdSinkWithClient(url, client)
49+
assert.NoError(t, err)
50+
assert.NotNil(t, sink)
51+
52+
timestamp := time.Now()
53+
54+
m1 := "test.metric.1"
55+
m2 := "test.metric.2"
56+
m3 := "test.metric.3"
57+
m4 := "test.metric.4"
58+
59+
var labels = map[string]string{
60+
"tag1": "value1",
61+
"tag2": "value2",
62+
"tag3": "value3",
63+
}
64+
65+
labelStr := "tag1=value1,tag3=value3"
66+
expectedMsgs := [...]string{
67+
fmt.Sprintf("%s,%s:1|g\n", m1, labelStr),
68+
fmt.Sprintf("%s,%s:2|g\n", m2, labelStr),
69+
fmt.Sprintf("%s,%s:3|g\n", m3, labelStr),
70+
fmt.Sprintf("%s,%s:4|g\n", m4, labelStr),
71+
}
72+
metricSet1 := core.MetricSet{
73+
Labels: labels,
74+
MetricValues: map[string]core.MetricValue{
75+
m1: {
76+
ValueType: core.ValueInt64,
77+
MetricType: core.MetricGauge,
78+
IntValue: 1,
79+
},
80+
m2: {
81+
ValueType: core.ValueInt64,
82+
MetricType: core.MetricGauge,
83+
IntValue: 2,
84+
},
85+
},
86+
}
87+
88+
metricSet2 := core.MetricSet{
89+
Labels: labels,
90+
MetricValues: map[string]core.MetricValue{
91+
m3: {
92+
ValueType: core.ValueInt64,
93+
MetricType: core.MetricGauge,
94+
IntValue: 3,
95+
},
96+
m4: {
97+
ValueType: core.ValueInt64,
98+
MetricType: core.MetricGauge,
99+
IntValue: 4,
100+
},
101+
},
102+
}
103+
104+
dataBatch := &core.DataBatch{
105+
Timestamp: timestamp,
106+
MetricSets: map[string]*core.MetricSet{
107+
"pod1": &metricSet1,
108+
"pod2": &metricSet2,
109+
},
110+
}
111+
112+
sink.ExportData(dataBatch)
113+
114+
res := strings.Join(client.messages, "\n") + "\n"
115+
for _, expectedMsg := range expectedMsgs[:] {
116+
assert.Contains(t, res, expectedMsg)
117+
}
118+
}

0 commit comments

Comments
 (0)