Skip to content

Commit ab87517

Browse files
committed
feat(influxdb): send request and parse response
1 parent b519d5e commit ab87517

File tree

9 files changed

+181
-22
lines changed

9 files changed

+181
-22
lines changed

pkg/tsdb/influxdb/influxdb.go

Lines changed: 135 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@ package influxdb
33
import (
44
"context"
55
"crypto/tls"
6+
"encoding/json"
7+
"fmt"
68
"net/http"
9+
"net/url"
10+
"path"
711
"time"
812

13+
"gopkg.in/guregu/null.v3"
14+
15+
"golang.org/x/net/context/ctxhttp"
16+
917
"github.com/grafana/grafana/pkg/log"
1018
"github.com/grafana/grafana/pkg/tsdb"
1119
)
@@ -43,22 +51,141 @@ func init() {
4351
}
4452
}
4553

46-
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
47-
result := &tsdb.BatchResult{}
54+
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) {
4855
for _, v := range queries {
49-
5056
query, err := e.QueryParser.Parse(v.Model)
5157
if err != nil {
52-
result.Error = err
53-
return result
58+
return "", err
5459
}
5560

56-
glog.Info("Influxdb executor", "query", query)
61+
rawQuery, err := e.QueryBuilder.Build(query, context)
62+
if err != nil {
63+
return "", err
64+
}
65+
66+
return rawQuery, nil
67+
}
68+
69+
return "", fmt.Errorf("Tsdb request contains no queries")
70+
}
71+
72+
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
73+
result := &tsdb.BatchResult{}
74+
75+
query, err := e.getQuery(queries, context)
76+
if err != nil {
77+
result.Error = err
78+
return result
79+
}
80+
81+
glog.Info("Influxdb", "query", query)
82+
83+
u, _ := url.Parse(e.Url)
84+
u.Path = path.Join(u.Path, "query")
85+
86+
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
87+
if err != nil {
88+
result.Error = err
89+
return result
90+
}
91+
92+
params := req.URL.Query()
93+
params.Set("q", query)
94+
params.Set("db", e.Database)
95+
params.Set("epoch", "s")
96+
97+
req.URL.RawQuery = params.Encode()
5798

58-
rawQuery, err := e.QueryBuilder.Build(query)
99+
req.Header.Set("Content-Type", "")
100+
req.Header.Set("User-Agent", "Grafana")
101+
if e.BasicAuth {
102+
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
103+
}
104+
105+
glog.Info("influxdb request", "url", req.URL.String())
106+
resp, err := ctxhttp.Do(ctx, HttpClient, req)
107+
if err != nil {
108+
result.Error = err
109+
return result
110+
}
111+
112+
if resp.StatusCode/100 != 2 {
113+
result.Error = fmt.Errorf("Influxdb returned statuscode %v body %v", resp.Status)
114+
return result
115+
}
116+
117+
var response Response
118+
dec := json.NewDecoder(resp.Body)
119+
dec.UseNumber()
120+
err = dec.Decode(&response)
121+
if err != nil {
122+
glog.Error("Influxdb decode failed", "err", err)
123+
result.Error = err
124+
return result
125+
}
126+
127+
result.QueryResults = make(map[string]*tsdb.QueryResult)
128+
queryRes := tsdb.NewQueryResult()
129+
130+
for _, v := range response.Results {
131+
for _, r := range v.Series {
132+
serie := tsdb.TimeSeries{Name: r.Name}
133+
var points tsdb.TimeSeriesPoints
134+
135+
for _, k := range r.Values {
136+
var value null.Float
137+
var err error
138+
num, ok := k[1].(json.Number)
139+
if !ok {
140+
value = null.FloatFromPtr(nil)
141+
} else {
142+
fvalue, err := num.Float64()
143+
if err == nil {
144+
value = null.FloatFrom(fvalue)
145+
}
146+
}
147+
148+
pos0, ok := k[0].(json.Number)
149+
timestamp, err := pos0.Float64()
150+
if err == nil && ok {
151+
points = append(points, tsdb.NewTimePoint(value, timestamp))
152+
} else {
153+
glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64)
154+
}
155+
serie.Points = points
156+
}
157+
queryRes.Series = append(queryRes.Series, &serie)
158+
}
159+
}
59160

60-
glog.Info("Influxdb", "error", err, "rawQuery", rawQuery)
161+
for _, v := range queryRes.Series {
162+
glog.Info("result", "name", v.Name, "points", v.Points)
61163
}
62164

165+
result.QueryResults["A"] = queryRes
166+
63167
return result
64168
}
169+
170+
type Response struct {
171+
Results []Result
172+
Err error
173+
}
174+
175+
type Result struct {
176+
Series []Row
177+
Messages []*Message
178+
Err error
179+
}
180+
181+
type Message struct {
182+
Level string `json:"level,omitempty"`
183+
Text string `json:"text,omitempty"`
184+
}
185+
186+
type Row struct {
187+
Name string `json:"name,omitempty"`
188+
Tags map[string]string `json:"tags,omitempty"`
189+
Columns []string `json:"columns,omitempty"`
190+
Values [][]interface{} `json:"values,omitempty"`
191+
}

pkg/tsdb/influxdb/models.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ type Query struct {
77
Tags []*Tag
88
GroupBy []*QueryPart
99
Selects []*Select
10+
11+
Interval string
1012
}
1113

1214
type Tag struct {

pkg/tsdb/influxdb/query_builder.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package influxdb
33
import (
44
"fmt"
55
"strings"
6+
7+
"github.com/grafana/grafana/pkg/tsdb"
68
)
79

810
type QueryBuild struct{}
@@ -27,7 +29,7 @@ func renderTags(query *Query) []string {
2729
return res
2830
}
2931

30-
func (*QueryBuild) Build(query *Query) (string, error) {
32+
func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) {
3133
res := "SELECT "
3234

3335
var selectors []string
@@ -42,7 +44,9 @@ func (*QueryBuild) Build(query *Query) (string, error) {
4244
res += strings.Join(selectors, ", ")
4345

4446
policy := ""
45-
if query.Policy != "" {
47+
if query.Policy == "" || query.Policy == "default" {
48+
policy = ""
49+
} else {
4650
policy = `"` + query.Policy + `".`
4751
}
4852
res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement)
@@ -54,7 +58,8 @@ func (*QueryBuild) Build(query *Query) (string, error) {
5458
res += " AND "
5559
}
5660

57-
res += "$timeFilter"
61+
//res += "$timeFilter"
62+
res += "time > " + strings.Replace(queryContext.TimeRange.From, "now", "now()", 1)
5863

5964
var groupBy []string
6065
for _, group := range query.GroupBy {

pkg/tsdb/influxdb/query_builder_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package influxdb
33
import (
44
"testing"
55

6+
"github.com/grafana/grafana/pkg/tsdb"
67
. "github.com/smartystreets/goconvey/convey"
78
)
89

@@ -19,17 +20,22 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
1920
tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="}
2021
tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}
2122

23+
queryContext := &tsdb.QueryContext{
24+
TimeRange: tsdb.NewTimeRange("now-5h", "now"),
25+
}
26+
2227
Convey("can build query", func() {
2328
query := &Query{
2429
Selects: []*Select{{*qp1, *qp2}},
2530
Measurement: "cpu",
2631
Policy: "policy",
2732
GroupBy: []*QueryPart{groupBy1, groupBy2},
33+
Interval: "10s",
2834
}
2935

30-
rawQuery, err := builder.Build(query)
36+
rawQuery, err := builder.Build(query, queryContext)
3137
So(err, ShouldBeNil)
32-
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`)
38+
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now()-5h GROUP BY time(10s) fill(null)`)
3339
})
3440

3541
Convey("can asd query", func() {
@@ -38,11 +44,12 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
3844
Measurement: "cpu",
3945
GroupBy: []*QueryPart{groupBy1},
4046
Tags: []*Tag{tag1, tag2},
47+
Interval: "5s",
4148
}
4249

43-
rawQuery, err := builder.Build(query)
50+
rawQuery, err := builder.Build(query, queryContext)
4451
So(err, ShouldBeNil)
45-
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND $timeFilter GROUP BY time($interval)`)
52+
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now()-5h GROUP BY time(10s)`)
4653
})
4754
})
4855
}

pkg/tsdb/influxdb/query_part.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ func fieldRenderer(part *QueryPart, innerExpr string) string {
9191
}
9292

9393
func functionRenderer(part *QueryPart, innerExpr string) string {
94+
for i, v := range part.Params {
95+
if v == "$interval" {
96+
part.Params[i] = "10s"
97+
}
98+
}
99+
94100
if innerExpr != "" {
95101
part.Params = append([]string{innerExpr}, part.Params...)
96102
}

pkg/tsdb/influxdb/query_part_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ func TestInfluxdbQueryPart(t *testing.T) {
3333
So(res, ShouldEqual, "bottom(value, 3)")
3434
})
3535

36+
Convey("time", func() {
37+
part, err := NewQueryPart("time", []string{"$interval"})
38+
So(err, ShouldBeNil)
39+
40+
res := part.Render("")
41+
So(res, ShouldEqual, "time(10s)")
42+
})
43+
3644
Convey("should nest spread function", func() {
3745
part, err := NewQueryPart("spread", []string{})
3846
So(err, ShouldBeNil)

pkg/tsdb/models.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,15 @@ func NewQueryResult() *QueryResult {
7373
}
7474
}
7575

76-
func NewTimePoint(value float64, timestamp float64) TimePoint {
77-
return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)}
76+
func NewTimePoint(value null.Float, timestamp float64) TimePoint {
77+
return TimePoint{value, null.FloatFrom(timestamp)}
7878
}
7979

8080
func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints {
8181
points := make(TimeSeriesPoints, 0)
8282

8383
for i := 0; i < len(values); i += 2 {
84-
points = append(points, NewTimePoint(values[i], values[i+1]))
84+
points = append(points, NewTimePoint(null.FloatFrom(values[i]), values[i+1]))
8585
}
8686

8787
return points

pkg/tsdb/prometheus/prometheus.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"strings"
99
"time"
1010

11+
"gopkg.in/guregu/null.v3"
12+
1113
"github.com/grafana/grafana/pkg/log"
1214
"github.com/grafana/grafana/pkg/tsdb"
1315
"github.com/prometheus/client_golang/api/prometheus"
@@ -145,7 +147,7 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
145147
}
146148

147149
for _, k := range v.Values {
148-
series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000)))
150+
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
149151
}
150152

151153
queryRes.Series = append(queryRes.Series, &series)

pkg/tsdb/testdata/scenarios.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"strings"
77
"time"
88

9+
"gopkg.in/guregu/null.v3"
10+
911
"github.com/grafana/grafana/pkg/log"
1012
"github.com/grafana/grafana/pkg/tsdb"
1113
)
@@ -42,7 +44,7 @@ func init() {
4244
walker := rand.Float64() * 100
4345

4446
for i := int64(0); i < 10000 && timeWalkerMs < to; i++ {
45-
points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs)))
47+
points = append(points, tsdb.NewTimePoint(null.FloatFrom(walker), float64(timeWalkerMs)))
4648

4749
walker += rand.Float64() - 0.5
4850
timeWalkerMs += query.IntervalMs
@@ -73,7 +75,7 @@ func init() {
7375
series := newSeriesForQuery(query)
7476
outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000
7577

76-
series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime)))
78+
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(10), float64(outsideTime)))
7779
queryRes.Series = append(queryRes.Series, series)
7880

7981
return queryRes
@@ -105,7 +107,7 @@ func init() {
105107
step := (endTime - startTime) / int64(len(values)-1)
106108

107109
for _, val := range values {
108-
series.Points = append(series.Points, tsdb.NewTimePoint(val, float64(startTime)))
110+
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(val), float64(startTime)))
109111
startTime += step
110112
}
111113

0 commit comments

Comments
 (0)