Skip to content

Commit b0d154d

Browse files
authored
Merge pull request grafana#6221 from grafana/alerting_influxdb
Alerting support for influxdb
2 parents 04f417a + 8d96262 commit b0d154d

File tree

20 files changed

+1228
-14
lines changed

20 files changed

+1228
-14
lines changed

docker/blocks/influxdb/fig

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
influxdb:
2-
image: tutum/influxdb:0.12
2+
#image: influxdb/influxdb:1.0-alpine
3+
image: influxdb:latest
4+
container_name: influxdb
35
ports:
46
- "2004:2004"
57
- "8083:8083"

pkg/cmd/grafana-server/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
2121
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
2222
_ "github.com/grafana/grafana/pkg/tsdb/graphite"
23+
_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
2324
_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
2425
_ "github.com/grafana/grafana/pkg/tsdb/testdata"
2526
)

pkg/services/alerting/conditions/evaluator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func NewAlertEvaluator(model *simplejson.Json) (AlertEvaluator, error) {
122122
return &NoDataEvaluator{}, nil
123123
}
124124

125-
return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type"}
125+
return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type: " + typ}
126126
}
127127

128128
func inSlice(a string, list []string) bool {

pkg/services/alerting/conditions/reducer_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package conditions
33
import (
44
"testing"
55

6+
"gopkg.in/guregu/null.v3"
7+
68
"github.com/grafana/grafana/pkg/tsdb"
79
. "github.com/smartystreets/goconvey/convey"
810
)
@@ -43,7 +45,7 @@ func testReducer(typ string, datapoints ...float64) float64 {
4345
}
4446

4547
for idx := range datapoints {
46-
series.Points = append(series.Points, tsdb.NewTimePoint(datapoints[idx], 1234134))
48+
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(datapoints[idx]), 1234134))
4749
}
4850

4951
return reducer.Reduce(series).Float64

pkg/tsdb/influxdb/influxdb.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package influxdb
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"encoding/json"
7+
"fmt"
8+
"net/http"
9+
"net/url"
10+
"path"
11+
"time"
12+
13+
"golang.org/x/net/context/ctxhttp"
14+
15+
"github.com/grafana/grafana/pkg/log"
16+
"github.com/grafana/grafana/pkg/tsdb"
17+
)
18+
19+
type InfluxDBExecutor struct {
20+
*tsdb.DataSourceInfo
21+
QueryParser *InfluxdbQueryParser
22+
QueryBuilder *QueryBuilder
23+
ResponseParser *ResponseParser
24+
}
25+
26+
func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
27+
return &InfluxDBExecutor{
28+
DataSourceInfo: dsInfo,
29+
QueryParser: &InfluxdbQueryParser{},
30+
QueryBuilder: &QueryBuilder{},
31+
ResponseParser: &ResponseParser{},
32+
}
33+
}
34+
35+
var (
36+
glog log.Logger
37+
HttpClient *http.Client
38+
)
39+
40+
func init() {
41+
glog = log.New("tsdb.influxdb")
42+
tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)
43+
44+
tr := &http.Transport{
45+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
46+
}
47+
48+
HttpClient = &http.Client{
49+
Timeout: time.Duration(15 * time.Second),
50+
Transport: tr,
51+
}
52+
}
53+
54+
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
55+
result := &tsdb.BatchResult{}
56+
57+
query, err := e.getQuery(queries, context)
58+
if err != nil {
59+
return result.WithError(err)
60+
}
61+
62+
glog.Debug("Influxdb query", "raw query", query)
63+
64+
req, err := e.createRequest(query)
65+
if err != nil {
66+
return result.WithError(err)
67+
}
68+
69+
resp, err := ctxhttp.Do(ctx, HttpClient, req)
70+
if err != nil {
71+
return result.WithError(err)
72+
}
73+
74+
if resp.StatusCode/100 != 2 {
75+
return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status))
76+
}
77+
78+
var response Response
79+
dec := json.NewDecoder(resp.Body)
80+
dec.UseNumber()
81+
err = dec.Decode(&response)
82+
if err != nil {
83+
return result.WithError(err)
84+
}
85+
86+
result.QueryResults = make(map[string]*tsdb.QueryResult)
87+
result.QueryResults["A"] = e.ResponseParser.Parse(&response)
88+
89+
return result
90+
}
91+
92+
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) {
93+
for _, v := range queries {
94+
query, err := e.QueryParser.Parse(v.Model)
95+
if err != nil {
96+
return "", err
97+
}
98+
99+
rawQuery, err := e.QueryBuilder.Build(query, context)
100+
if err != nil {
101+
return "", err
102+
}
103+
104+
return rawQuery, nil
105+
}
106+
107+
return "", fmt.Errorf("query request contains no queries")
108+
}
109+
110+
func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) {
111+
u, _ := url.Parse(e.Url)
112+
u.Path = path.Join(u.Path, "query")
113+
114+
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
115+
if err != nil {
116+
return nil, err
117+
}
118+
119+
params := req.URL.Query()
120+
params.Set("q", query)
121+
params.Set("db", e.Database)
122+
params.Set("epoch", "s")
123+
req.URL.RawQuery = params.Encode()
124+
125+
req.Header.Set("User-Agent", "Grafana")
126+
if e.BasicAuth {
127+
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
128+
}
129+
130+
glog.Debug("Influxdb request", "url", req.URL.String())
131+
return req, nil
132+
}

pkg/tsdb/influxdb/model_parser.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package influxdb
2+
3+
import (
4+
"strconv"
5+
6+
"github.com/grafana/grafana/pkg/components/simplejson"
7+
)
8+
9+
type InfluxdbQueryParser struct{}
10+
11+
func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json) (*Query, error) {
12+
policy := model.Get("policy").MustString("default")
13+
14+
measurement, err := model.Get("measurement").String()
15+
if err != nil {
16+
return nil, err
17+
}
18+
19+
resultFormat, err := model.Get("resultFormat").String()
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
tags, err := qp.parseTags(model)
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
groupBys, err := qp.parseGroupBy(model)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
selects, err := qp.parseSelects(model)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
return &Query{
40+
Measurement: measurement,
41+
Policy: policy,
42+
ResultFormat: resultFormat,
43+
GroupBy: groupBys,
44+
Tags: tags,
45+
Selects: selects,
46+
}, nil
47+
}
48+
49+
func (qp *InfluxdbQueryParser) parseSelects(model *simplejson.Json) ([]*Select, error) {
50+
var result []*Select
51+
52+
for _, selectObj := range model.Get("select").MustArray() {
53+
selectJson := simplejson.NewFromAny(selectObj)
54+
var parts Select
55+
56+
for _, partObj := range selectJson.MustArray() {
57+
part := simplejson.NewFromAny(partObj)
58+
queryPart, err := qp.parseQueryPart(part)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
parts = append(parts, *queryPart)
64+
}
65+
66+
result = append(result, &parts)
67+
}
68+
69+
return result, nil
70+
}
71+
72+
func (*InfluxdbQueryParser) parseTags(model *simplejson.Json) ([]*Tag, error) {
73+
var result []*Tag
74+
for _, t := range model.Get("tags").MustArray() {
75+
tagJson := simplejson.NewFromAny(t)
76+
tag := &Tag{}
77+
var err error
78+
79+
tag.Key, err = tagJson.Get("key").String()
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
tag.Value, err = tagJson.Get("value").String()
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
operator, err := tagJson.Get("operator").String()
90+
if err == nil {
91+
tag.Operator = operator
92+
}
93+
94+
condition, err := tagJson.Get("condition").String()
95+
if err == nil {
96+
tag.Condition = condition
97+
}
98+
99+
result = append(result, tag)
100+
}
101+
102+
return result, nil
103+
}
104+
105+
func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, error) {
106+
typ, err := model.Get("type").String()
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
var params []string
112+
for _, paramObj := range model.Get("params").MustArray() {
113+
param := simplejson.NewFromAny(paramObj)
114+
115+
stringParam, err := param.String()
116+
if err == nil {
117+
params = append(params, stringParam)
118+
continue
119+
}
120+
121+
intParam, err := param.Int()
122+
if err == nil {
123+
params = append(params, strconv.Itoa(intParam))
124+
continue
125+
}
126+
127+
return nil, err
128+
129+
}
130+
131+
qp, err := NewQueryPart(typ, params)
132+
if err != nil {
133+
return nil, err
134+
}
135+
136+
return qp, nil
137+
}
138+
139+
func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) {
140+
var result []*QueryPart
141+
142+
for _, groupObj := range model.Get("groupBy").MustArray() {
143+
groupJson := simplejson.NewFromAny(groupObj)
144+
queryPart, err := qp.parseQueryPart(groupJson)
145+
146+
if err != nil {
147+
return nil, err
148+
}
149+
result = append(result, queryPart)
150+
}
151+
152+
return result, nil
153+
}

0 commit comments

Comments
 (0)