Skip to content

Commit 45e8c00

Browse files
authored
Merge pull request #281 from Helene/opentsdb_arrays_true
Support OpenTSDB 2.4 /query/api JSON serializer
2 parents f527cee + 752fe5e commit 45e8c00

File tree

3 files changed

+80
-8
lines changed

3 files changed

+80
-8
lines changed

source/collector.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def _get_sensor_labels(self):
205205

206206
@classattributes(dict(metricsaggr=None, filters=None, grouptags=None,
207207
start='', end='', nsamples=0, duration=0,
208-
dsBucketSize=0, dsOp='', rawData=False),
208+
dsBucketSize=0, dsOp='', rawData=False, dpsArrays=False),
209209
['sensor', 'period'])
210210
class QueryPolicy(object):
211211

@@ -372,10 +372,16 @@ def _collect(self):
372372
list(self.request.metricsaggr.keys())[0]))
373373
rows = res.downsampleResults(self.dsInterval, self.request.dsOp)
374374

375-
columnValues = defaultdict(dict)
376-
for row in rows:
377-
for value, columnInfo in zip(row.values, res.columnInfos):
378-
columnValues[columnInfo][row.tstamp] = value
375+
if self.request.dpsArrays:
376+
columnValues = defaultdict(list)
377+
for row in rows:
378+
for value, columnInfo in zip(row.values, res.columnInfos):
379+
columnValues[columnInfo].append([row.tstamp, value])
380+
else:
381+
columnValues = defaultdict(dict)
382+
for row in rows:
383+
for value, columnInfo in zip(row.values, res.columnInfos):
384+
columnValues[columnInfo][row.tstamp] = value
379385

380386
for columnInfo, dps in columnValues.items():
381387
ts = TimeSeries(columnInfo, dps, self.filtersMap, self.labels)

source/opentsdb.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
from collections import defaultdict
2828
from collector import SensorCollector, QueryPolicy
2929
from utils import getTimeMultiplier, execution_time, cond_execution_time
30-
from typing import List
30+
from typing import List, TypeVar
31+
32+
T = TypeVar('T', dict, list)
3133

3234

3335
class OpenTsdbApi(object):
@@ -158,6 +160,9 @@ def build_collector(self, jreq: dict) -> SensorCollector:
158160
args['filters'] = filters
159161
args['grouptags'] = grouptags
160162

163+
if 'arrays' in jreq:
164+
args['dpsArrays'] = jreq['arrays']
165+
161166
args['rawData'] = q.get('explicitTags', False)
162167

163168
args['sensor'] = sensor
@@ -379,7 +384,7 @@ def GET(self, **params):
379384
@cherrypy.config(**{'tools.json_in.force': False})
380385
@cherrypy.tools.json_in() # @UndefinedVariable
381386
@cherrypy.tools.json_out() # @UndefinedVariable
382-
def POST(self):
387+
def POST(self, **params):
383388
''' Process POST. tools.json_in.force is set to False for
384389
compatability between versions of grafana < 3 and version 3.'''
385390

@@ -399,6 +404,9 @@ def POST(self):
399404
self.logger.error(MSG['QueryError'].format('empty'))
400405
raise cherrypy.HTTPError(400, ERR[400])
401406

407+
if params and params.get('arrays') == 'true':
408+
jreq['arrays'] = True
409+
402410
return self.query(jreq)
403411

404412
def OPTIONS(self):
@@ -446,13 +454,15 @@ def __init__(self, inputQuery, showQuery=False,
446454
self.tags = tags or defaultdict(list)
447455
self.aggregatedTags = aggrTags or []
448456

449-
def to_dict(self, dps: dict = None):
457+
def to_dict(self, dps: T = None):
450458
''' Converts the SingleTimeSeriesResponse object to dict. '''
451459
res = self.__dict__
452460
# Since a single Timeseries might have a huge number of datapoints (dps),
453461
# first convert object to dict and then fetch the dict of dps to it
454462
if dps:
455463
res['dps'] = dps
464+
elif isinstance(dps, list):
465+
res['dps'] = []
456466
return res
457467

458468

tests/test_opentsdb.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,34 @@ def query_last_setup():
7676
'index': 0}}
7777

7878

79+
def query_arrays_setup():
80+
global key1, col1, labels, filtersMap, dps1, ts1, ts2, metricTS, metricTS1, data, data1, jreq
81+
82+
key1 = Key._from_string('scale-11|CPU|cpu_system', '')
83+
col1 = ColumnInfo(name='cpu_user', semType=1, keys=(key1,), column=0)
84+
filtersMap = [{'node': 'scale-11'}, {'node': 'scale-12'}, {'node': 'scale-13'}, {'node': 'scale-14'}, {'node': 'scale-15'}, {'node': 'scale-16'}]
85+
labels = ['node']
86+
dps1 = [[1739214990, 3], [1739215050, 2], [1739215110, 3], [1739215170, 4], [1739215230, 3]]
87+
dps2 = []
88+
ts1 = TimeSeries(col1, dps1, filtersMap, labels)
89+
ts2 = TimeSeries(col1, dps2, filtersMap, labels)
90+
metricTS = MetricTimeSeries('cpu_system', '')
91+
metricTS.timeseries = [ts1]
92+
data = {'cpu_user': metricTS}
93+
metricTS1 = MetricTimeSeries('cpu_system', '')
94+
metricTS1.timeseries = [ts2]
95+
data1 = {'cpu_user': metricTS1}
96+
jreq = {'start': 1739214930519, 'end': 1739215230519, 'arrays': True,
97+
'inputQuery': {'aggregator': 'noop', 'downsample': '1m-avg',
98+
'filters': [
99+
{'filter': 'scale-11', 'groupBy': False,
100+
'tagk': 'node', 'type': 'pm_filter'
101+
}],
102+
'metric': 'cpu_system', 'index': 0
103+
}
104+
}
105+
106+
79107
@with_setup(my_setup)
80108
def test_case01():
81109
ts = TimeSeries(col3, dps2, filtersMap, labels)
@@ -110,6 +138,7 @@ def test_case03():
110138
assert 'gpfs_fs_name' in resp[0].get('tags')
111139
assert 'node' in resp[0].get('tags')
112140
assert 'gpfs_cluster_name' in resp[0].get('tags')
141+
assert isinstance(resp[0].get('dps'), dict)
113142

114143

115144
@with_setup(my_setup)
@@ -139,3 +168,30 @@ def test_case05():
139168
assert resp[0].get('metric') == "cpu_user"
140169
assert 'gpfs_fs_name' not in resp[0].get('tags')
141170
assert 'node' in resp[0].get('tags')
171+
172+
173+
@with_setup(query_arrays_setup)
174+
def test_case06():
175+
with mock.patch('source.metadata.MetadataHandler') as md:
176+
md_instance = md.return_value
177+
logger = logging.getLogger(__name__)
178+
opentsdb = OpenTsdbApi(logger, md_instance, '9999')
179+
resp = opentsdb.format_response(data, jreq)
180+
assert set(resp[0].keys()) == set(['metric', 'dps', 'tags', 'aggregatedTags'])
181+
assert resp[0].get('metric') == "cpu_system"
182+
assert 'node' in resp[0].get('tags')
183+
assert isinstance(resp[0].get('dps'), list)
184+
185+
186+
@with_setup(query_arrays_setup)
187+
def test_case07():
188+
with mock.patch('source.metadata.MetadataHandler') as md:
189+
md_instance = md.return_value
190+
logger = logging.getLogger(__name__)
191+
opentsdb = OpenTsdbApi(logger, md_instance, '9999')
192+
resp = opentsdb.format_response(data1, jreq)
193+
assert set(resp[0].keys()) == set(['metric', 'dps', 'tags', 'aggregatedTags'])
194+
assert resp[0].get('metric') == "cpu_system"
195+
assert 'node' in resp[0].get('tags')
196+
assert isinstance(resp[0].get('dps'), list)
197+
assert len(resp[0].get('dps')) == 0

0 commit comments

Comments
 (0)