@@ -54,15 +54,29 @@ def TOPO(self):
5454 def format_response (self , data : dict , jreq : dict ) -> List [dict ]:
5555 respList = []
5656 metrics = set (data .values ())
57- for metric in metrics :
58- for st in metric .timeseries :
59- res = SingleTimeSeriesResponse (jreq .get ('inputQuery' ),
60- jreq .get ('showQuery' ),
61- jreq .get ('globalAnnotations' ),
62- st .tags , st .aggregatedTags )
63- # self.logger.trace(f'OpenTSDB queryResponse for :
64- # {data.keys()[0]} with {len(st.dps)} datapoints')
65- respList .append (res .to_dict (st .dps ))
57+ if jreq .get ('start' ) == 'last' :
58+ for metric in metrics :
59+ for st in metric .timeseries :
60+ timestmp = ''
61+ val = 'null'
62+ if len (st .dps ) > 0 :
63+ timestmp = list (st .dps .keys ())[0 ]
64+ val = st .dps [timestmp ]
65+ res = LastSingleTimeSeriesResponse (jreq .get ('inputQuery' ),
66+ timestmp ,
67+ val ,
68+ st .tags )
69+ respList .append (res .to_dict ())
70+ else :
71+ for metric in metrics :
72+ for st in metric .timeseries :
73+ res = SingleTimeSeriesResponse (jreq .get ('inputQuery' ),
74+ jreq .get ('showQuery' ),
75+ jreq .get ('globalAnnotations' ),
76+ st .tags , st .aggregatedTags )
77+ # self.logger.trace(f'OpenTSDB queryResponse for :
78+ # {data.keys()[0]} with {len(st.dps)} datapoints')
79+ respList .append (res .to_dict (st .dps ))
6680 return respList
6781
6882 @execution_time ()
@@ -115,29 +129,34 @@ def build_collector(self, jreq: dict) -> SensorCollector:
115129
116130 q = jreq .get ('inputQuery' )
117131
118- period = self .md .getSensorPeriodForMetric (q .get ('metric' ))
132+ sensor = self .TOPO .getSensorForMetric (q .get ('metric' ))
133+ period = self .md .getSensorPeriod (sensor )
119134 if period < 1 :
120135 self .logger .error (MSG ['SensorDisabled' ].format (q .get ('metric' )))
121136 raise cherrypy .HTTPError (
122137 400 , MSG ['SensorDisabled' ].format (q .get ('metric' )))
123138
124- sensor = self .TOPO .getSensorForMetric (q .get ('metric' ))
125-
126139 args = {}
127140 args ['metricsaggr' ] = {q .get ('metric' ): q .get ('aggregator' )}
128- args ['start' ] = str (int (int (str (jreq .get ('start' ))) / 1000 ))
129- if jreq .get ('end' ) is not None :
130- args ['end' ] = str (int (int (str (jreq .get ('end' ))) / 1000 ))
131141
132- if q .get ('downsample' ):
133- args ['dsOp' ] = self ._get_downsmpl_op (q .get ('downsample' ))
134- args ['dsBucketSize' ] = self ._calc_bucket_size (q .get ('downsample' ))
142+ if jreq .get ('start' ) == 'last' :
143+ args ['nsamples' ] = 1
144+ if q .get ('tags' ):
145+ args ['filters' ] = q .get ('tags' )
146+ else :
147+ args ['start' ] = str (int (int (str (jreq .get ('start' ))) / 1000 ))
148+ if jreq .get ('end' ) is not None :
149+ args ['end' ] = str (int (int (str (jreq .get ('end' ))) / 1000 ))
150+
151+ if q .get ('downsample' ):
152+ args ['dsOp' ] = self ._get_downsmpl_op (q .get ('downsample' ))
153+ args ['dsBucketSize' ] = self ._calc_bucket_size (q .get ('downsample' ))
135154
136- if q .get ('filters' ):
137- filters , grouptags = self ._parse_input_query_filters (
138- q .get ('filters' ))
139- args ['filters' ] = filters
140- args ['grouptags' ] = grouptags
155+ if q .get ('filters' ):
156+ filters , grouptags = self ._parse_input_query_filters (
157+ q .get ('filters' ))
158+ args ['filters' ] = filters
159+ args ['grouptags' ] = grouptags
141160
142161 args ['rawData' ] = q .get ('explicitTags' , False )
143162
@@ -296,6 +315,38 @@ def GET(self, **params):
296315 elif 'lookup' in cherrypy .request .script_name :
297316 resp = self .lookup (params )
298317
318+ # /api/query/last
319+ elif '/api/query/last' == cherrypy .request .script_name :
320+ jreq = {}
321+
322+ if params .get ('timeseries' ) is None :
323+ self .logger .error (MSG ['QueryError' ].format ('empty' ))
324+ raise cherrypy .HTTPError (400 , ERR [400 ])
325+
326+ queries = []
327+ timeseries = params .get ('timeseries' )
328+ if not isinstance (timeseries , list ):
329+ timeseries = [timeseries ]
330+ for timeserie in timeseries :
331+ try :
332+ metricDict = {}
333+ params_list = re .split (r'\{(.*)\}' , timeserie .strip ())
334+ metricDict ['metric' ] = params_list [0 ]
335+
336+ if len (params_list ) > 1 :
337+ attr = params_list [1 ]
338+ filterBy = dict (x .split ('=' ) for x in attr .split (',' ))
339+ metricDict ['tags' ] = filterBy
340+ queries .append (metricDict )
341+
342+ except Exception as e :
343+ self .logger .exception (MSG ['IntError' ].format (str (e )))
344+ raise cherrypy .HTTPError (500 , MSG [500 ])
345+ jreq ['start' ] = 'last'
346+ jreq ['queries' ] = queries
347+
348+ resp = self .query (jreq )
349+
299350 elif 'aggregators' in cherrypy .request .script_name :
300351 resp = ["noop" , "sum" , "avg" , "max" , "min" , "rate" ]
301352
@@ -336,7 +387,7 @@ def POST(self):
336387 raise cherrypy .HTTPError (400 , ERR [400 ])
337388
338389 # /api/query
339- if 'query' in cherrypy .request .script_name :
390+ if '/api/ query' == cherrypy .request .script_name :
340391
341392 # read query request parameters
342393 jreq = cherrypy .request .json
@@ -399,3 +450,17 @@ def to_dict(self, dps: dict = None):
399450 if dps :
400451 res ['dps' ] = dps
401452 return res
453+
454+
455+ class LastSingleTimeSeriesResponse (object ):
456+
457+ def __init__ (self , inputQuery , timestmp , value , tags : dict = None ):
458+ self .metric = inputQuery .get ('metric' )
459+ self .timestamp = timestmp
460+ self .value = value
461+ self .tags = tags or defaultdict (list )
462+
463+ def to_dict (self ):
464+ ''' Converts the LastSingleTimeSeriesResponse object to dict. '''
465+ res = self .__dict__
466+ return res
0 commit comments