@@ -3,17 +3,28 @@ import {
3
3
DataQueryResponse ,
4
4
DataSourceApi ,
5
5
DataSourceInstanceSettings ,
6
- MutableDataFrame ,
7
- FieldType ,
8
6
QueryFixAction ,
7
+ DataSourceWithSupplementaryQueriesSupport ,
8
+ SupplementaryQueryType ,
9
+ LogLevel ,
9
10
} from '@grafana/data' ;
10
-
11
+ import { Observable } from 'rxjs' ;
11
12
import { getBackendSrv } from '@grafana/runtime' ;
13
+ import { queryLogsVolume } from './features/log/LogsModel' ;
12
14
13
- import { MyQuery , MyDataSourceOptions , TimeRange } from './types' ;
14
- import { b64EncodeUnicode , logsErrorMessage } from 'utils/zincutils' ;
15
+ import { MyQuery , MyDataSourceOptions } from './types' ;
16
+ import { logsErrorMessage , getConsumableTime } from 'utils/zincutils' ;
15
17
import { getOrganizations } from 'services/organizations' ;
16
- export class DataSource extends DataSourceApi < MyQuery , MyDataSourceOptions > {
18
+ import { cloneDeep } from 'lodash' ;
19
+ import { getGraphDataFrame , getLogsDataFrame } from 'features/log/queryResponseBuilder' ;
20
+ import { buildQuery } from './features/query/queryBuilder' ;
21
+
22
+ const REF_ID_STARTER_LOG_VOLUME = 'log-volume-' ;
23
+
24
+ export class DataSource
25
+ extends DataSourceApi < MyQuery , MyDataSourceOptions >
26
+ implements DataSourceWithSupplementaryQueriesSupport < MyQuery >
27
+ {
17
28
instanceSettings ?: DataSourceInstanceSettings < MyDataSourceOptions > ;
18
29
url : string ;
19
30
streamFields : any [ ] ;
@@ -25,70 +36,34 @@ export class DataSource extends DataSourceApi<MyQuery, MyDataSourceOptions> {
25
36
this . streamFields = [ ] ;
26
37
}
27
38
28
- getConsumableTime ( range : any ) {
29
- const startTimeInMicro : any = new Date ( new Date ( range ! . from . valueOf ( ) ) . toISOString ( ) ) . getTime ( ) * 1000 ;
30
- const endTimeInMirco : any = new Date ( new Date ( range ! . to . valueOf ( ) ) . toISOString ( ) ) . getTime ( ) * 1000 ;
31
- return {
32
- startTimeInMicro,
33
- endTimeInMirco,
34
- } ;
35
- }
36
-
37
- buildLogsDataFrame ( logs : any [ ] , target : MyQuery ) {
38
- const fieldsMapping : { [ key : string ] : FieldType } = {
39
- Utf8 : FieldType . string ,
40
- Int64 : FieldType . number ,
41
- timestamp : FieldType . time ,
42
- } ;
43
-
44
- const fields = [
45
- { name : 'Time' , type : FieldType . time } ,
46
- { name : 'Content' , type : FieldType . string } ,
47
- ] ;
48
-
49
- this . streamFields . forEach ( ( field : any ) => {
50
- fields . push ( {
51
- name : field . name ,
52
- type : fieldsMapping [ field . type ] ,
53
- } ) ;
54
- } ) ;
55
-
56
- const frame = new MutableDataFrame ( {
57
- refId : target . refId ,
58
- meta : {
59
- preferredVisualisationType : 'logs' ,
60
- } ,
61
- fields,
62
- } ) ;
63
-
64
- logs . forEach ( ( log : any ) => {
65
- frame . add ( { ...log , Content : JSON . stringify ( log ) } ) ;
66
- } ) ;
67
-
68
- return frame ;
69
- }
70
-
71
39
async query ( options : DataQueryRequest < MyQuery > ) : Promise < DataQueryResponse > {
72
- const timestamps = this . getConsumableTime ( options . range ) ;
40
+ const timestamps = getConsumableTime ( options . range ) ;
73
41
const promises = options . targets . map ( ( target ) => {
74
- const reqData = this . buildQuery ( target , timestamps ) ;
42
+ const reqData = buildQuery ( target , timestamps , this . streamFields ) ;
75
43
return this . doRequest ( target , reqData )
76
44
. then ( ( response ) => {
77
- return this . buildLogsDataFrame ( response . hits , target ) ;
45
+ if ( target ?. refId ?. includes ( REF_ID_STARTER_LOG_VOLUME ) ) {
46
+ return getGraphDataFrame ( response , target ) ;
47
+ }
48
+ return getLogsDataFrame ( response , target , this . streamFields ) ;
78
49
} )
79
50
. catch ( ( err ) => {
80
- let error = '' ;
81
- if ( err . response !== undefined ) {
82
- error = err . response . data . error ;
51
+ let error = {
52
+ message : '' ,
53
+ detail : '' ,
54
+ } ;
55
+ if ( err . data ) {
56
+ error . message = err . data ?. message ;
57
+ error . detail = err . data ?. error_detail ;
83
58
} else {
84
- error = err . message ;
59
+ error . message = err . statusText ;
85
60
}
86
61
87
- const customMessage = logsErrorMessage ( err . response . data . code ) ;
62
+ const customMessage = logsErrorMessage ( err . data . code ) ;
88
63
if ( customMessage ) {
89
- error = customMessage ;
64
+ error . message = customMessage ;
90
65
}
91
- throw new Error ( error ) ;
66
+ throw new Error ( error . message + ( error . detail ? ` ( ${ error . detail } ) ` : '' ) ) ;
92
67
} ) ;
93
68
} ) ;
94
69
@@ -146,68 +121,54 @@ export class DataSource extends DataSourceApi<MyQuery, MyDataSourceOptions> {
146
121
return { ...query , query : expression } ;
147
122
}
148
123
149
- buildQuery ( queryData : MyQuery , timestamps : TimeRange ) {
150
- try {
151
- let query : string = queryData . query || '' ;
152
-
153
- let req : any = {
154
- query : {
155
- sql : 'select * from "[INDEX_NAME]" [WHERE_CLAUSE]' ,
156
- start_time : timestamps . startTimeInMicro ,
157
- end_time : timestamps . endTimeInMirco ,
158
- size : 150 ,
159
- sql_mode : 'full' ,
160
- } ,
161
- } ;
162
-
163
- if ( queryData . sqlMode ) {
164
- req . query . sql = queryData . query ;
165
- }
124
+ updateStreamFields ( streamFields : any [ ] ) {
125
+ this . streamFields = [ ...streamFields ] ;
126
+ }
166
127
167
- if ( ! queryData . sqlMode ) {
168
- let whereClause = query ;
169
-
170
- if ( query . trim ( ) . length ) {
171
- whereClause = whereClause
172
- . replace ( / = (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' =' )
173
- . replace ( / > (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' >' )
174
- . replace ( / < (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' <' ) ;
175
-
176
- whereClause = whereClause
177
- . replace ( / ! = (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' !=' )
178
- . replace ( / ! = (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' !=' )
179
- . replace ( / < = (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' <=' )
180
- . replace ( / > = (? = (?: [ ^ " ' ] * " [ ^ " ' ] * " ' ) * [ ^ " ' ] * $ ) / g, ' >=' ) ;
181
-
182
- const parsedSQL = whereClause . split ( ' ' ) ;
183
- this . streamFields . forEach ( ( field : any ) => {
184
- parsedSQL . forEach ( ( node : any , index : any ) => {
185
- if ( node === field . name ) {
186
- node = node . replaceAll ( '"' , '' ) ;
187
- parsedSQL [ index ] = '"' + node + '"' ;
188
- }
189
- } ) ;
190
- } ) ;
191
-
192
- whereClause = parsedSQL . join ( ' ' ) ;
193
-
194
- req . query . sql = req . query . sql . replace ( '[WHERE_CLAUSE]' , ' WHERE ' + whereClause ) ;
195
- } else {
196
- req . query . sql = req . query . sql . replace ( '[WHERE_CLAUSE]' , '' ) ;
197
- }
128
+ getDataProvider (
129
+ type : SupplementaryQueryType ,
130
+ request : DataQueryRequest < MyQuery >
131
+ ) : Observable < DataQueryResponse > | undefined {
132
+ if ( ! this . getSupportedSupplementaryQueryTypes ( ) . includes ( type ) ) {
133
+ return undefined ;
134
+ }
198
135
199
- req . query . sql = req . query . sql . replace ( '[INDEX_NAME]' , queryData . stream ) ;
200
- }
136
+ switch ( type ) {
137
+ case SupplementaryQueryType . LogsVolume :
138
+ return this . getLogsVolumeDataProvider ( request ) ;
139
+ default :
140
+ return undefined ;
141
+ }
142
+ }
201
143
202
- req [ 'encoding' ] = 'base64' ;
203
- req . query . sql = b64EncodeUnicode ( req . query . sql ) ;
144
+ getSupportedSupplementaryQueryTypes ( ) : SupplementaryQueryType [ ] {
145
+ return [ SupplementaryQueryType . LogsVolume ] ;
146
+ // return [SupplementaryQueryType.LogsVolume, SupplementaryQueryType.LogsSample];
147
+ }
204
148
205
- return req ;
206
- } catch ( e ) {
207
- console . log ( 'error in building query:' , e ) ;
208
- }
149
+ getSupplementaryQuery ( type : SupplementaryQueryType , query : MyQuery ) : MyQuery | undefined {
150
+ return undefined ;
209
151
}
210
- updateStreamFields ( streamFields : any [ ] ) {
211
- this . streamFields = [ ...streamFields ] ;
152
+
153
+ getLogsVolumeDataProvider ( request : DataQueryRequest < MyQuery > ) : Observable < DataQueryResponse > | undefined {
154
+ const logsVolumeRequest = cloneDeep ( request ) ;
155
+ const targets = logsVolumeRequest . targets . map ( ( target ) => {
156
+ target [ 'refId' ] = REF_ID_STARTER_LOG_VOLUME + target . refId ;
157
+ return target ;
158
+ } ) ;
159
+
160
+ if ( ! targets . length ) {
161
+ return undefined ;
162
+ }
163
+
164
+ return queryLogsVolume (
165
+ this ,
166
+ { ...logsVolumeRequest , targets } ,
167
+ {
168
+ extractLevel : ( ) => LogLevel . unknown ,
169
+ range : logsVolumeRequest . range ,
170
+ targets : logsVolumeRequest . targets ,
171
+ }
172
+ ) ;
212
173
}
213
174
}
0 commit comments