Skip to content

Commit b0addbd

Browse files
committed
feat(influxdb): support multi row results
1 parent d8aa38f commit b0addbd

File tree

3 files changed

+93
-50
lines changed

3 files changed

+93
-50
lines changed

pkg/tsdb/influxdb/influxdb.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ import (
1818

1919
type InfluxDBExecutor struct {
2020
*tsdb.DataSourceInfo
21-
QueryParser *InfluxdbQueryParser
22-
QueryBuilder *QueryBuilder
21+
QueryParser *InfluxdbQueryParser
22+
QueryBuilder *QueryBuilder
23+
ResponseParser *ResponseParser
2324
}
2425

2526
func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
2627
return &InfluxDBExecutor{
2728
DataSourceInfo: dsInfo,
2829
QueryParser: &InfluxdbQueryParser{},
2930
QueryBuilder: &QueryBuilder{},
31+
ResponseParser: &ResponseParser{},
3032
}
3133
}
3234

@@ -124,7 +126,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
124126
}
125127

126128
result.QueryResults = make(map[string]*tsdb.QueryResult)
127-
result.QueryResults["A"] = ParseQueryResult(&response)
129+
result.QueryResults["A"] = e.ResponseParser.Parse(&response)
128130

129131
return result
130132
}

pkg/tsdb/influxdb/response_parser.go

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,79 @@ package influxdb
22

33
import (
44
"encoding/json"
5+
"fmt"
56

67
"github.com/grafana/grafana/pkg/tsdb"
78
"gopkg.in/guregu/null.v3"
89
)
910

10-
func ParseQueryResult(response *Response) *tsdb.QueryResult {
11+
type ResponseParser struct{}
12+
13+
func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult {
1114
queryRes := tsdb.NewQueryResult()
1215

13-
for _, v := range response.Results {
14-
for _, r := range v.Series {
15-
serie := tsdb.TimeSeries{Name: r.Name}
16-
var points tsdb.TimeSeriesPoints
16+
for _, result := range response.Results {
17+
rp.parseResult(result.Series, queryRes)
18+
}
1719

20+
for _, serie := range queryRes.Series {
21+
glog.Debug("result", "name", serie.Name, "points", serie.Points)
22+
}
23+
24+
return queryRes
25+
}
26+
27+
func (rp *ResponseParser) parseResult(result []Row, queryResult *tsdb.QueryResult) {
28+
for _, r := range result {
29+
for columnIndex, column := range r.Columns {
30+
if column == "time" {
31+
continue
32+
}
33+
34+
var points tsdb.TimeSeriesPoints
1835
for _, k := range r.Values {
19-
var value null.Float
20-
var err error
21-
num, ok := k[1].(json.Number)
22-
if !ok {
23-
value = null.FloatFromPtr(nil)
24-
} else {
25-
fvalue, err := num.Float64()
26-
if err == nil {
27-
value = null.FloatFrom(fvalue)
28-
}
29-
}
30-
31-
pos0, ok := k[0].(json.Number)
32-
timestamp, err := pos0.Float64()
33-
if err == nil && ok {
34-
points = append(points, tsdb.NewTimePoint(value, timestamp))
35-
} else {
36-
//glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64)
37-
}
38-
serie.Points = points
36+
points = append(points, rp.parseTimepoint(k, columnIndex))
3937
}
40-
queryRes.Series = append(queryRes.Series, &serie)
38+
39+
queryResult.Series = append(queryResult.Series, &tsdb.TimeSeries{
40+
Name: rp.formatName(r, column),
41+
Points: points,
42+
})
4143
}
4244
}
45+
}
4346

44-
for _, v := range queryRes.Series {
45-
glog.Info("result", "name", v.Name, "points", v.Points)
47+
func (rp *ResponseParser) formatName(row Row, column string) string {
48+
return fmt.Sprintf("%s.%s", row.Name, column)
49+
}
50+
51+
func (rp *ResponseParser) parseTimepoint(k []interface{}, valuePosition int) tsdb.TimePoint {
52+
var value null.Float = rp.parseValue(k[valuePosition])
53+
54+
timestampNumber, _ := k[0].(json.Number)
55+
timestamp, err := timestampNumber.Float64()
56+
if err != nil {
57+
glog.Error("Invalid timestamp format. This should never happen!")
4658
}
4759

48-
return queryRes
60+
return tsdb.NewTimePoint(value, timestamp)
61+
}
62+
63+
func (rp *ResponseParser) parseValue(value interface{}) null.Float {
64+
num, ok := value.(json.Number)
65+
if !ok {
66+
return null.FloatFromPtr(nil)
67+
}
68+
69+
fvalue, err := num.Float64()
70+
if err == nil {
71+
return null.FloatFrom(fvalue)
72+
}
73+
74+
ivalue, err := num.Int64()
75+
if err == nil {
76+
return null.FloatFrom(float64(ivalue))
77+
}
78+
79+
return null.FloatFromPtr(nil)
4980
}

pkg/tsdb/influxdb/response_parser_test.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@ import (
44
"encoding/json"
55
"testing"
66

7-
"github.com/grafana/grafana/pkg/setting"
87
. "github.com/smartystreets/goconvey/convey"
98
)
109

1110
func TestInfluxdbResponseParser(t *testing.T) {
1211
Convey("Influxdb response parser", t, func() {
1312

14-
setting.NewConfigContext(&setting.CommandLineArgs{
15-
HomePath: "../../../",
16-
})
13+
parser := &ResponseParser{}
1714

1815
response := &Response{
1916
Results: []Result{
@@ -22,28 +19,41 @@ func TestInfluxdbResponseParser(t *testing.T) {
2219
{
2320
Name: "cpu",
2421
Columns: []string{"time", "mean", "sum"},
22+
Tags: map[string]string{"datacenter": "America"},
2523
Values: [][]interface{}{
26-
{json.Number("123"), json.Number("123"), json.Number("123")},
27-
{json.Number("123"), json.Number("123"), json.Number("123")},
28-
{json.Number("123"), json.Number("123"), json.Number("123")},
29-
{json.Number("123"), json.Number("123"), json.Number("123")},
30-
{json.Number("123"), json.Number("123"), json.Number("123")},
31-
{json.Number("123"), json.Number("123"), json.Number("123")},
32-
{json.Number("123"), json.Number("123"), json.Number("123")},
33-
{json.Number("123"), json.Number("123"), json.Number("123")},
34-
{json.Number("123"), json.Number("123"), json.Number("123")},
35-
{json.Number("123"), json.Number("123"), json.Number("123")},
24+
{json.Number("111"), json.Number("222"), json.Number("333")},
25+
{json.Number("111"), json.Number("222"), json.Number("333")},
26+
{json.Number("111"), json.Number("null"), json.Number("333")},
3627
},
3728
},
3829
},
3930
},
4031
},
4132
}
4233

43-
Convey("can parse response", func() {
44-
result := ParseQueryResult(response)
45-
So(len(result.Series), ShouldEqual, 1)
46-
So(len(result.Series[0].Points), ShouldEqual, 10)
34+
result := parser.Parse(response)
35+
36+
Convey("can parse all series", func() {
37+
So(len(result.Series), ShouldEqual, 2)
38+
})
39+
40+
Convey("can parse all points", func() {
41+
So(len(result.Series[0].Points), ShouldEqual, 3)
42+
So(len(result.Series[1].Points), ShouldEqual, 3)
43+
})
44+
45+
Convey("can parse multi row result", func() {
46+
So(result.Series[0].Points[1][0].Float64, ShouldEqual, float64(222))
47+
So(result.Series[1].Points[1][0].Float64, ShouldEqual, float64(333))
48+
})
49+
50+
Convey("can parse null points", func() {
51+
So(result.Series[0].Points[2][0].Valid, ShouldBeFalse)
52+
})
53+
54+
Convey("can format serie names", func() {
55+
So(result.Series[0].Name, ShouldEqual, "cpu.mean")
56+
So(result.Series[1].Name, ShouldEqual, "cpu.sum")
4757
})
4858
})
4959
}

0 commit comments

Comments
 (0)