@@ -10,8 +10,6 @@ import (
10
10
"path"
11
11
"time"
12
12
13
- "gopkg.in/guregu/null.v3"
14
-
15
13
"golang.org/x/net/context/ctxhttp"
16
14
17
15
"github.com/grafana/grafana/pkg/log"
@@ -21,14 +19,14 @@ import (
21
19
type InfluxDBExecutor struct {
22
20
* tsdb.DataSourceInfo
23
21
QueryParser * InfluxdbQueryParser
24
- QueryBuilder * QueryBuild
22
+ QueryBuilder * QueryBuilder
25
23
}
26
24
27
25
func NewInfluxDBExecutor (dsInfo * tsdb.DataSourceInfo ) tsdb.Executor {
28
26
return & InfluxDBExecutor {
29
27
DataSourceInfo : dsInfo ,
30
28
QueryParser : & InfluxdbQueryParser {},
31
- QueryBuilder : & QueryBuild {},
29
+ QueryBuilder : & QueryBuilder {},
32
30
}
33
31
}
34
32
@@ -66,126 +64,67 @@ func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.Query
66
64
return rawQuery , nil
67
65
}
68
66
69
- return "" , fmt .Errorf ("Tsdb request contains no queries" )
67
+ return "" , fmt .Errorf ("query request contains no queries" )
70
68
}
71
69
72
- func (e * InfluxDBExecutor ) Execute (ctx context.Context , queries tsdb.QuerySlice , context * tsdb.QueryContext ) * tsdb.BatchResult {
73
- result := & tsdb.BatchResult {}
74
-
75
- query , err := e .getQuery (queries , context )
76
- if err != nil {
77
- result .Error = err
78
- return result
79
- }
80
-
81
- glog .Info ("Influxdb" , "query" , query )
82
-
70
+ func (e * InfluxDBExecutor ) createRequest (query string ) (* http.Request , error ) {
83
71
u , _ := url .Parse (e .Url )
84
72
u .Path = path .Join (u .Path , "query" )
85
73
86
74
req , err := http .NewRequest (http .MethodGet , u .String (), nil )
87
75
if err != nil {
88
- result .Error = err
89
- return result
76
+ return nil , err
90
77
}
91
78
92
79
params := req .URL .Query ()
93
80
params .Set ("q" , query )
94
81
params .Set ("db" , e .Database )
95
82
params .Set ("epoch" , "s" )
96
-
97
83
req .URL .RawQuery = params .Encode ()
98
84
99
- req .Header .Set ("Content-Type" , "" )
100
85
req .Header .Set ("User-Agent" , "Grafana" )
101
86
if e .BasicAuth {
102
87
req .SetBasicAuth (e .BasicAuthUser , e .BasicAuthPassword )
103
88
}
104
89
105
- glog .Info ("influxdb request" , "url" , req .URL .String ())
90
+ glog .Debug ("influxdb request" , "url" , req .URL .String ())
91
+ return req , nil
92
+ }
93
+
94
+ func (e * InfluxDBExecutor ) Execute (ctx context.Context , queries tsdb.QuerySlice , context * tsdb.QueryContext ) * tsdb.BatchResult {
95
+ result := & tsdb.BatchResult {}
96
+
97
+ query , err := e .getQuery (queries , context )
98
+ if err != nil {
99
+ return result .WithError (err )
100
+ }
101
+
102
+ glog .Debug ("Influxdb query" , "raw query" , query )
103
+
104
+ req , err := e .createRequest (query )
105
+ if err != nil {
106
+ return result .WithError (err )
107
+ }
108
+
106
109
resp , err := ctxhttp .Do (ctx , HttpClient , req )
107
110
if err != nil {
108
- result .Error = err
109
- return result
111
+ return result .WithError (err )
110
112
}
111
113
112
114
if resp .StatusCode / 100 != 2 {
113
- result .Error = fmt .Errorf ("Influxdb returned statuscode %v body %v" , resp .Status )
114
- return result
115
+ return result .WithError (fmt .Errorf ("Influxdb returned statuscode invalid status code: %v" , resp .Status ))
115
116
}
116
117
117
118
var response Response
118
119
dec := json .NewDecoder (resp .Body )
119
120
dec .UseNumber ()
120
121
err = dec .Decode (& response )
121
122
if err != nil {
122
- glog .Error ("Influxdb decode failed" , "err" , err )
123
- result .Error = err
124
- return result
123
+ return result .WithError (err )
125
124
}
126
125
127
126
result .QueryResults = make (map [string ]* tsdb.QueryResult )
128
- queryRes := tsdb .NewQueryResult ()
129
-
130
- for _ , v := range response .Results {
131
- for _ , r := range v .Series {
132
- serie := tsdb.TimeSeries {Name : r .Name }
133
- var points tsdb.TimeSeriesPoints
134
-
135
- for _ , k := range r .Values {
136
- var value null.Float
137
- var err error
138
- num , ok := k [1 ].(json.Number )
139
- if ! ok {
140
- value = null .FloatFromPtr (nil )
141
- } else {
142
- fvalue , err := num .Float64 ()
143
- if err == nil {
144
- value = null .FloatFrom (fvalue )
145
- }
146
- }
147
-
148
- pos0 , ok := k [0 ].(json.Number )
149
- timestamp , err := pos0 .Float64 ()
150
- if err == nil && ok {
151
- points = append (points , tsdb .NewTimePoint (value , timestamp ))
152
- } else {
153
- glog .Error ("Failed to convert response" , "err1" , err , "ok" , ok , "timestamp" , timestamp , "value" , value .Float64 )
154
- }
155
- serie .Points = points
156
- }
157
- queryRes .Series = append (queryRes .Series , & serie )
158
- }
159
- }
160
-
161
- for _ , v := range queryRes .Series {
162
- glog .Info ("result" , "name" , v .Name , "points" , v .Points )
163
- }
164
-
165
- result .QueryResults ["A" ] = queryRes
127
+ result .QueryResults ["A" ] = ParseQueryResult (& response )
166
128
167
129
return result
168
130
}
169
-
170
- type Response struct {
171
- Results []Result
172
- Err error
173
- }
174
-
175
- type Result struct {
176
- Series []Row
177
- Messages []* Message
178
- Err error
179
- }
180
-
181
- type Message struct {
182
- Level string `json:"level,omitempty"`
183
- Text string `json:"text,omitempty"`
184
- }
185
-
186
- type Row struct {
187
- Name string `json:"name,omitempty"`
188
- Tags map [string ]string `json:"tags,omitempty"`
189
- Columns []string `json:"columns,omitempty"`
190
- Values [][]interface {} `json:"values,omitempty"`
191
- }
0 commit comments