Skip to content

Commit d1cbd70

Browse files
authored
add match rule (#64)
1 parent 732b9f6 commit d1cbd70

File tree

6 files changed

+110
-3
lines changed

6 files changed

+110
-3
lines changed

config.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
package main
1616

1717
import (
18+
"fmt"
19+
dto "github.com/prometheus/client_model/go"
20+
"github.com/prometheus/common/expfmt"
21+
"gopkg.in/yaml.v2"
1822
"os"
1923
"strings"
2024
"text/template"
@@ -26,6 +30,7 @@ var (
2630
kafkaBrokerList = "kafka:9092"
2731
kafkaTopic = "metrics"
2832
topicTemplate *template.Template
33+
match = make(map[string]*dto.MetricFamily, 0)
2934
basicauth = false
3035
basicauthUsername = ""
3136
basicauthPassword = ""
@@ -107,6 +112,14 @@ func init() {
107112
kafkaSaslPassword = value
108113
}
109114

115+
if value := os.Getenv("MATCH"); value != "" {
116+
matchList, err := parseMatchList(value)
117+
if err != nil {
118+
logrus.WithError(err).Fatalln("couldn't parse the match rules")
119+
}
120+
match = matchList
121+
}
122+
110123
var err error
111124
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
112125
if err != nil {
@@ -119,6 +132,27 @@ func init() {
119132
}
120133
}
121134

135+
func parseMatchList(text string) (map[string]*dto.MetricFamily, error) {
136+
var matchRules []string
137+
err := yaml.Unmarshal([]byte(text), &matchRules)
138+
if err != nil {
139+
return nil, err
140+
}
141+
var metricsList []string
142+
for _, v := range matchRules {
143+
metricsList = append(metricsList, fmt.Sprintf("%s 0\n", v))
144+
}
145+
146+
metricsText := strings.Join(metricsList, "")
147+
148+
var parser expfmt.TextParser
149+
metricFamilies, err := parser.TextToMetricFamilies(strings.NewReader(metricsText))
150+
if err != nil {
151+
return nil, fmt.Errorf("couldn't parse match rules: %s", err)
152+
}
153+
return metricFamilies, nil
154+
}
155+
122156
func parseLogLevel(value string) logrus.Level {
123157
level, err := logrus.ParseLevel(value)
124158

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/linkedin/goavro v2.1.0+incompatible
1414
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
1515
github.com/prometheus/client_golang v0.8.0
16-
github.com/prometheus/client_model v0.2.0 // indirect
16+
github.com/prometheus/client_model v0.2.0
1717
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
1818
github.com/prometheus/procfs v0.1.3 // indirect
1919
github.com/prometheus/prometheus v2.4.2+incompatible
@@ -23,6 +23,7 @@ require (
2323
google.golang.org/grpc v1.15.0 // indirect
2424
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
2525
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
26+
gopkg.in/yaml.v2 v2.2.8
2627
)
2728

2829
go 1.13

helm/prometheus-kafka-adapter/templates/deployment.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ spec:
6464
name: {{ include "prometheus-kafka-adapter.fullname" . }}
6565
key: BASIC_AUTH_PASSWORD
6666
{{- end }}
67+
{{- if .Values.environment.MATCH }}
68+
- name: MATCH
69+
value: {{ .Values.environment.MATCH | quote }}
70+
{{- end }}
6771
- name: LOG_LEVEL
6872
value: {{ .Values.environment.LOG_LEVEL | quote }}
6973
- name: GIN_MODE

helm/prometheus-kafka-adapter/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ environment:
6363
# Kafka SSL broker CA certificate file, defaults to "/ca_cert/ssl_ca_cert.pem" if
6464
# KAFKA_SSL_CA_CERT is provided, "" otherwise
6565
KAFKA_SSL_CA_CERT_FILE:
66+
# defines the match rules, simple metric name match and label match
67+
MATCH:
6668

6769
serviceAccount:
6870
# Specifies whether a service account should be created

serializers.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,16 @@ func Serialize(s Serializer, req *prompb.WriteRequest) (map[string][][]byte, err
4747
t := topic(labels)
4848

4949
for _, sample := range ts.Samples {
50-
epoch := time.Unix(sample.Timestamp/1000, 0).UTC()
50+
name := string(labels["__name__"])
51+
if !filter(name, labels) {
52+
continue
53+
}
5154

55+
epoch := time.Unix(sample.Timestamp/1000, 0).UTC()
5256
m := map[string]interface{}{
5357
"timestamp": epoch.Format(time.RFC3339),
5458
"value": strconv.FormatFloat(sample.Value, 'f', -1, 64),
55-
"name": string(labels["__name__"]),
59+
"name": name,
5660
"labels": labels,
5761
}
5862

@@ -115,3 +119,33 @@ func topic(labels map[string]string) string {
115119
}
116120
return buf.String()
117121
}
122+
123+
func filter(name string, labels map[string]string) bool {
124+
if len(match) == 0 {
125+
return true
126+
}
127+
mf, ok := match[name]
128+
if !ok {
129+
return false
130+
}
131+
132+
for _, m := range mf.Metric {
133+
if len(m.Label) == 0 {
134+
return true
135+
}
136+
137+
labelMatch := true
138+
for _, label := range m.Label {
139+
val, ok := labels[label.GetName()]
140+
if !ok || val != label.GetValue() {
141+
labelMatch = false
142+
break
143+
}
144+
}
145+
146+
if labelMatch {
147+
return true
148+
}
149+
}
150+
return false
151+
}

serializers_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,38 @@ func TestTemplatedTopic(t *testing.T) {
100100
}
101101
}
102102

103+
func TestFilter(t *testing.T) {
104+
rulesText := `['foo{y="2"}','foo', 'bar{x="1"}',
105+
'up{x="1",y="2"}', 'baz{key="valu
106+
e1;value2"}','bar{y="2"}']`
107+
108+
rules, _ := parseMatchList(rulesText)
109+
for _, mf := range rules {
110+
match[mf.GetName()] = mf
111+
}
112+
type TestCase struct {
113+
Name string
114+
Labels map[string]string
115+
Expect bool
116+
}
117+
118+
testList := []TestCase{
119+
{Name: "foo", Labels: map[string]string{"z": "3"}, Expect: true},
120+
{Name: "bar", Labels: map[string]string{"x": "1"}, Expect: true},
121+
{Name: "bar", Labels: map[string]string{"x": "2"}, Expect: false},
122+
{Name: "bar", Labels: map[string]string{"y": "2"}, Expect: true},
123+
{Name: "bar", Labels: map[string]string{"y": "1"}, Expect: false},
124+
{Name: "up", Labels: map[string]string{"x": "1", "y": "2"}, Expect: true},
125+
{Name: "up", Labels: map[string]string{"x": "1", "y": "2", "z": "3"}, Expect: true},
126+
{Name: "up", Labels: map[string]string{"x": "2", "y": "1"}, Expect: false},
127+
{Name: "go", Labels: map[string]string{"x": "1", "y": "2"}, Expect: false},
128+
}
129+
130+
for _, tcase := range testList {
131+
assert.Equal(t, tcase.Expect, filter(tcase.Name, tcase.Labels))
132+
}
133+
}
134+
103135
func BenchmarkSerializeToAvroJSON(b *testing.B) {
104136
serializer, _ := NewAvroJSONSerializer("schemas/metric.avsc")
105137
writeRequest := NewWriteRequest()

0 commit comments

Comments
 (0)