@@ -52,17 +52,25 @@ func NewPIWebAPIDatasource(ctx context.Context, settings backend.DataSourceInsta
5252 return nil , fmt .Errorf ("httpclient new: %w" , err )
5353 }
5454
55- webIDCache := newWebIDCache ()
55+ var maxDuration int
56+ if dataSourceOptions .MaxCacheTime != nil && * dataSourceOptions .MaxCacheTime > 0 {
57+ maxDuration = * dataSourceOptions .MaxCacheTime
58+ } else {
59+ maxDuration = 12
60+ }
61+ webIDCache := newWebIDCache (maxDuration )
62+ webCache := newCache [string , PiBatchData ]()
5663
57- // Create a new scheduler that will be used to clean the webIDCache every 60 minutes .
64+ // Create a new scheduler that will be used to clean the webIDCache every MaxCacheTime hours .
5865 scheduler := gocron .NewScheduler (time .UTC )
59- scheduler .Every (1 ).Hour ().Do (cleanWebIDCache , webIDCache )
66+ scheduler .Every (maxDuration ).Hour ().Do (cleanWebIDCache , webIDCache )
6067 scheduler .StartAsync ()
6168
6269 ds := & Datasource {
6370 settings : settings ,
6471 httpClient : httpClient ,
6572 webIDCache : webIDCache ,
73+ webCache : webCache ,
6674 scheduler : scheduler ,
6775 websocketConnectionsMutex : & sync.Mutex {},
6876 datasourceMutex : & sync.Mutex {},
@@ -79,7 +87,7 @@ func NewPIWebAPIDatasource(ctx context.Context, settings backend.DataSourceInsta
7987 // Create a new query mux and assign it to the datasource.
8088 ds .queryMux = ds .newQueryMux ()
8189
82- backend . Logger .Info ("NewPIWebAPIDatasource Created" )
90+ log . DefaultLogger .Info ("PIWebAPI Datasource Created" , "UID" , settings . UID , "Name" , settings . Name )
8391
8492 return ds , nil
8593}
@@ -103,8 +111,8 @@ func (d *Datasource) updateRate() {
103111 time .Sleep (time .Duration (d .callRate ) * time .Millisecond )
104112 }
105113
106- // reset every 5 minutes (300 s)
107- if time .Since (d .initalTime ).Seconds () > 300 {
114+ // reset every 10 minutes (600 s)
115+ if time .Since (d .initalTime ).Seconds () > 600 {
108116 d .initalTime = time .Now ()
109117 d .totalCalls = 1
110118 d .callRate = float64 (d .totalCalls ) / float64 (time .Now ().Unix ()- d .initalTime .Unix ())
@@ -132,34 +140,40 @@ func (d *Datasource) newQueryMux() *datasource.QueryTypeMux {
132140// The QueryDataResponse contains a map of RefID to the response for each query, and each response
133141// contains Frames ([]*Frame).
134142func (d * Datasource ) QueryData (ctx context.Context , req * backend.QueryDataRequest ) (* backend.QueryDataResponse , error ) {
135- // //TODO: Remove this debug information
136- // jsonReq, err := json.Marshal(req)
137- // if err != nil {
138- // return nil, fmt.Errorf("error marshaling QueryDataRequest: %v", err)
139- // }
140- // backend.Logger.Info("QueryDataRequest: ", "REQUEST", string(jsonReq))
141- // end remove this debug information
142143 // Pass the query to the query muxer.
143144 return d .queryMux .QueryData (ctx , req )
144145}
145146
146147// TODO: Missing functionality: Add Replace Bad Values
147148// QueryTSData is called by Grafana when a user executes a time series data query.
148149func (d * Datasource ) QueryTSData (ctx context.Context , req * backend.QueryDataRequest ) (* backend.QueryDataResponse , error ) {
149- processedPIWebAPIQueries := make (map [string ][]PiProcessedQuery )
150150 datasourceUID := req .PluginContext .DataSourceInstanceSettings .UID
151151
152+ // tracer
153+ ctx , span := tracing .DefaultTracer ().Start (
154+ ctx ,
155+ "New annotation query recieved" ,
156+ )
157+ defer span .End ()
158+
152159 // Process queries generic query objects and turn them into a suitable format for the PI Web API
153- for _ , q := range req .Queries {
154- processedPIWebAPIQueries [q .RefID ] = d .processQuery (q , datasourceUID )
155- }
160+ processedPIWebAPIQueries := d .processQuery (req .Queries , datasourceUID )
161+
162+ // span
163+ span .AddEvent ("Completed processing query request" )
156164
157165 // Send the queries to the PI Web API
158166 processedQueries_temp := d .batchRequest (ctx , processedPIWebAPIQueries )
159167
168+ // span
169+ span .AddEvent ("Completed processing batch request" )
170+
160171 // Convert the PI Web API response into Grafana frames
161172 response := d .processBatchtoFrames (processedQueries_temp )
162173
174+ // span
175+ span .AddEvent ("Completed processing batch to frames" )
176+
163177 // Update rate and do backpressure
164178 d .updateRate ()
165179
@@ -178,7 +192,6 @@ func (d *Datasource) QueryAnnotations(ctx context.Context, req *backend.QueryDat
178192 defer span .End ()
179193
180194 for _ , q := range req .Queries {
181-
182195 span .AddEvent ("Processing annotation query request" ,
183196 trace .WithAttributes (
184197 attribute .String ("query.ref_id" , q .RefID ),
@@ -188,7 +201,6 @@ func (d *Datasource) QueryAnnotations(ctx context.Context, req *backend.QueryDat
188201 ),
189202 )
190203
191- // backend.Logger.Info("Processing Annotation Query", "RefID", q.RefID)
192204 // Process the annotation query request, extracting only the useful information
193205 ProcessedAnnotationQuery := d .processAnnotationQuery (ctx , q )
194206 span .AddEvent ("Completed processing annotation query request" )
@@ -299,12 +311,13 @@ func (d *Datasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequ
299311 }
300312 defer func () {
301313 if err := resp .Body .Close (); err != nil {
302- log .DefaultLogger .Error ("check health: failed to close response body" , "err" , err .Error ())
314+ log .DefaultLogger .Error ("PiWebAPI Check health: failed to close response body" , "err" , err .Error ())
303315 }
304316 }()
305317 if resp .StatusCode != http .StatusOK {
306318 return newHealthCheckErrorf ("got response code %d" , resp .StatusCode ), nil
307319 }
320+ // return good status
308321 return & backend.CheckHealthResult {
309322 Status : backend .HealthStatusOk ,
310323 Message : "Data source is working" ,
@@ -317,6 +330,25 @@ func newHealthCheckErrorf(format string, args ...interface{}) *backend.CheckHeal
317330 return & backend.CheckHealthResult {Status : backend .HealthStatusError , Message : fmt .Sprintf (format , args ... )}
318331}
319332
333+ // isUsingNewFormat checks whether the datasource is configured to use a new format.
334+ // This is determined by the NewFormat option in dataSourceOptions.
335+ // Returns true if NewFormat is set and enabled; otherwise, false.
320336func (d * Datasource ) isUsingNewFormat () bool {
321337 return d .dataSourceOptions .NewFormat != nil && * d .dataSourceOptions .NewFormat
322338}
339+
340+ // isUsingStreaming checks whether the datasource has streaming enabled in experimental mode.
341+ // This requires both the UseExperimental and UseStreaming options to be set and enabled.
342+ // Returns true if both options are enabled; otherwise, false.
343+ func (d * Datasource ) isUsingStreaming () bool {
344+ return d .dataSourceOptions .UseExperimental != nil && * d .dataSourceOptions .UseExperimental &&
345+ d .dataSourceOptions .UseStreaming != nil && * d .dataSourceOptions .UseStreaming
346+ }
347+
348+ // isUsingResponseCache checks if response caching is enabled in experimental mode for the datasource.
349+ // This requires both the UseExperimental and UseResponseCache options to be set and enabled.
350+ // Returns true if both options are enabled; otherwise, false.
351+ func (d * Datasource ) isUsingResponseCache () bool {
352+ return d .dataSourceOptions .UseExperimental != nil && * d .dataSourceOptions .UseExperimental &&
353+ d .dataSourceOptions .UseResponseCache != nil && * d .dataSourceOptions .UseResponseCache
354+ }
0 commit comments