Skip to content

Commit e9d3457

Browse files
committed
QueryHandler improvements
Added some caching/memoization; the main change is a new intermediate data structure that avoids O(n^2) lookups in that case. Also updated Topo parsing: components were changed from a list to a set, which avoids duplicates and linear searches. Filters previously performed a linear search to avoid duplicates, which turned out to be unnecessary and was removed.
1 parent 0896f15 commit e9d3457

File tree

3 files changed

+118
-137
lines changed

3 files changed

+118
-137
lines changed

source/queryHandler/QueryHandler.py

Lines changed: 75 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
@author: NSCHULD
2121
'''
2222

23-
from collections import namedtuple, defaultdict
24-
import inspect
25-
from itertools import chain
2623
import json
2724
import operator
2825
import socket
2926
import time
27+
from collections import namedtuple, defaultdict
28+
import inspect
29+
from itertools import chain
30+
from typing import NamedTuple, Optional, Tuple
31+
3032
from .PerfmonRESTclient import perfHTTPrequestHelper, createRequestDataObj, getAuthHandler
3133

3234

@@ -53,10 +55,45 @@ def is_empty(self):
5355
# return len(self.values) == self.values.count(None)
5456

5557

56-
class ColumnInfo(namedtuple('_ColumnInfo', 'name, semType, keys, column')):
57-
'''header data for each column string, int, [KEY], int '''
58+
class Key(namedtuple('_Key', 'parent, sensor, identifier, metric, domains')):
59+
'''data structure of a Key string, string, tuple of strings (can be empty), string, tuple of domains
60+
describing parent (node or cluster), sensor and metric for the data
61+
as well as the identifier if the metric applies to multiple items.
62+
Also joined are the aggregation domains of that key which describe the effective bucket size'''
5863
__slots__ = ()
5964

65+
@classmethod
66+
def _from_string(cls, key, domains):
67+
'''Split a string like node.localnet.com|Network|eth0|netdev_bytes_s to its consisting parts'''
68+
items = key.split('|')
69+
return Key(items[0], items[1], tuple(items[2:-1]), items[-1], domains)
70+
71+
def __str__(self):
72+
return '|'.join([self.parent, self.sensor, '|'.join(self.identifier), self.metric]).replace('||', '|')
73+
74+
def shortKey_str(self):
75+
return '|'.join([self.parent, self.sensor, '|'.join(self.identifier)])
76+
77+
def __repr__(self):
78+
return self.__str__()
79+
80+
def __hash__(self):
81+
return hash((self.parent, self.sensor, self.identifier, self.metric))
82+
83+
def __eq__(self, other):
84+
return (self.parent, self.sensor, self.identifier, self.metric) == (other.parent, other.sensor, other.identifier, other.metric)
85+
86+
def __ne__(self, other):
87+
return not(self == other)
88+
89+
90+
class ColumnInfo(NamedTuple):
91+
'''header data for each column string, int, [KEY], int '''
92+
name: str
93+
semType: int
94+
keys: Tuple[Key]
95+
column: int
96+
6097
@property
6198
def key_str(self):
6299
'''string which includes all keys'''
@@ -82,50 +119,18 @@ def parents(self):
82119
def flat_keys(self):
83120
'''for computed columns we have multiple lists of keys, flatten it to a simple list'''
84121
if len(self.keys) > 1 or isinstance(self.keys[0], list):
85-
flat_keys = list(chain.from_iterable(self.keys))
122+
flat_keys = tuple(chain.from_iterable(self.keys))
86123
if not isinstance(flat_keys[0], Key):
87124
flat_keys = self.keys
88125
else:
89126
flat_keys = self.keys
90127
return flat_keys
91128

92129
def __hash__(self):
93-
return hash((self.name, self.key_str))
94-
95-
def __eq__(self, other):
96-
return (self.name, self.key_str) == (other.name, other.key_str)
97-
98-
def __ne__(self, other):
99-
return not(self == other)
100-
101-
102-
class Key(namedtuple('_Key', 'parent, sensor, identifier, metric, domains')):
103-
'''data structure of a Key string, string, tuple of strings (can be empty), string, tuple of domains
104-
describing parent (node or cluster), sensor and metric for the data
105-
as well as the identifier if the metric applies to multiple items.
106-
Also joined are the aggregation domains of that key which describe the effective bucket size'''
107-
__slots__ = ()
108-
109-
@classmethod
110-
def _from_string(cls, key, domains):
111-
'''Split a string like node.localnet.com|Network|eth0|netdev_bytes_s to its consisting parts'''
112-
items = key.split('|')
113-
return Key(items[0], items[1], tuple(items[2:-1]), items[-1], domains)
114-
115-
def __str__(self):
116-
return '|'.join([self.parent, self.sensor, '|'.join(self.identifier), self.metric]).replace('||', '|')
117-
118-
def shortKey_str(self):
119-
return '|'.join([self.parent, self.sensor, '|'.join(self.identifier)])
120-
121-
def __repr__(self):
122-
return self.__str__()
123-
124-
def __hash__(self):
125-
return hash((self.parent, self.sensor, self.identifier, self.metric))
130+
return hash((self.name, self.keys))
126131

127132
def __eq__(self, other):
128-
return (self.parent, self.sensor, self.identifier, self.metric) == (other.parent, other.sensor, other.identifier, other.metric)
133+
return (self.name, self.keys) == (other.name, other.keys)
129134

130135
def __ne__(self, other):
131136
return not(self == other)
@@ -162,6 +167,7 @@ def __init__(self, query, res_json):
162167
self.ids = self._findIdentifiers()
163168

164169
if self.query and len(self.query.measurements) > 0:
170+
self._populate_index_cache()
165171
self._add_calculated_colunm_headers()
166172

167173
calc = Calculator()
@@ -196,32 +202,30 @@ def __parseRows(self):
196202
return [Row(**item) for item in self.json['rows']]
197203

198204
def _findIdentifiers(self):
199-
ids = [] # not a set or dict because order matters
205+
ids = {} # using dict as a ordered set, order matters!
200206
for ci in self.columnInfos:
201207
p = set(key.parent for key in ci.keys)
202208
if len(p) == 1:
203209
parents = p.pop()
204210
else:
205211
parents = tuple(p)
206-
207212
id_item = (parents, ci.identifiers)
208-
if id_item not in ids:
209-
ids.append(id_item)
210-
return ids
213+
ids[id_item] = 1
214+
return ids.keys()
211215

212216
def _add_calculated_colunm_headers(self):
213217
'''for each measurement create a result column for each ID '''
218+
nextidx = max(ci.column for ci in self.columnInfos)
219+
214220
for q_name, prg in self.query.measurements.items():
221+
metrics = [step for step in prg if step not in Calculator.OPS and not step.isnumeric()]
215222
for parent, myid in self.ids:
216223
key_aq = []
217-
for step in prg:
218-
if step not in Calculator.OPS and not is_number(step):
219-
metric = step
220-
idx = self._index_by_metric_id(metric, parent, myid)
221-
if idx != -1:
222-
# key_aq.append([key for key in self.columnInfos[idx].keys if key])
223-
key_aq.extend(key for key in self.columnInfos[idx].keys if key)
224-
nextidx = max(ci.column for ci in self.columnInfos) + 1
224+
for metric in metrics:
225+
idx = self.index_cache.get((metric, parent, myid), -1)
226+
if idx != -1:
227+
key_aq.extend(key for key in self.columnInfos[idx].keys if key)
228+
nextidx += 1
225229
self.columnInfos.append(ColumnInfo(q_name, 15, (tuple(key_aq)), nextidx))
226230

227231
def _add_calculated_row_data(self, calc, row):
@@ -232,10 +236,10 @@ def _add_calculated_row_data(self, calc, row):
232236
for step in prg:
233237
if step in Calculator.OPS:
234238
calc.op(step)
235-
elif is_number(step):
239+
elif step.isnumeric(): #is_number(step):
236240
calc.push(float(step))
237241
else:
238-
idx = self._index_by_metric_id(step, parent, myid)
242+
idx = self.index_cache.get((step, parent, myid), -1)
239243
if idx != -1:
240244
value = row.values[idx]
241245
if value is None:
@@ -251,18 +255,12 @@ def _add_calculated_row_data(self, calc, row):
251255
def __getitem__(self, index):
252256
return self.rows[index]
253257

254-
def _index_by_metric_id(self, metric, parent, identifier):
255-
'''get index to columInfo / values for a given metric and identifier'''
256-
idx = self.index_cache.get((metric, parent, identifier), -1)
257-
if idx != -1:
258-
return idx
259-
258+
def _populate_index_cache(self):
260259
for ci in self.columnInfos:
261-
# check for k.name too? handle parent?
262-
if ci.keys[0].metric == metric and ci.identifiers == identifier and ci.parents == parent:
263-
self.index_cache[(metric, parent, identifier)] = ci.column
264-
return ci.column
265-
return -1
260+
key = (ci.keys[0].metric, ci.parents, ci.identifiers)
261+
if key in self.index_cache:
262+
SysmonLogger.getLogger(__name__).error("hash collision in _populate_index_cache")
263+
self.index_cache[key] = ci.column
266264

267265
def drop_base_metrics(self):
268266
'''remove all headers and data columns which were used to compute a measurement'''
@@ -303,9 +301,9 @@ def downsampleResults(self, interval, aggregator='avg'):
303301
''' Performs downsampling of QueryResult.rows with specified aggregation method and interval'''
304302
try:
305303
func = __builtins__[aggregator]
306-
return self.__downsample(func, interval)
304+
return self.__downsample(func, int(interval))
307305
except Exception:
308-
return self.__downsample(self.dAVG, interval)
306+
return self.__downsample(self.dAVG, int(interval))
309307

310308
def max(self, column):
311309
''' get maximum value of a column'''
@@ -339,7 +337,7 @@ def __colstat(self, column, fn, reverse=False):
339337
else:
340338
data = (row.values[idx] for row in self.rows)
341339
try:
342-
return fn(list(filter(lambda x: x is not None, data)))
340+
return fn([x for x in data if x is not None])
343341
except Exception:
344342
return None
345343

@@ -357,7 +355,7 @@ def __downsample(self, fn, interval, column='all'):
357355
if len(column_values) == column_values.count(None):
358356
aggr_value = None
359357
else:
360-
aggr_value = fn(list(filter(lambda x: x is not None, column_values)))
358+
aggr_value = fn([x for x in column_values if x is not None])
361359
except Exception:
362360
aggr_value = None
363361
aggr_values[idx] = aggr_value
@@ -385,9 +383,7 @@ def div(a, b): # defined anew because of py 2/3 difference
385383

386384
class Calculator(object):
387385
'''simple UPN calculator'''
388-
389-
OPS = {"+": operator.add, "-": operator.sub, '*': operator.mul, '/': div,
390-
">=": operator.ge, ">": operator.gt, "<=": operator.le, "<": operator.lt, "==": operator.eq}
386+
OPS = {"+": operator.add, "-": operator.sub, '*': operator.mul, '/': div, ">=": operator.ge, ">": operator.gt, "<=": operator.le, "<": operator.lt, "==": operator.eq }
391387

392388
def __init__(self):
393389
self.stack = []
@@ -396,38 +392,19 @@ def push(self, arg):
396392
self.stack.append(arg)
397393
return self
398394

399-
def peek(self):
400-
return self.stack[0]
401-
402395
def pop(self):
403396
return self.stack.pop()
404397

405398
def clear(self):
406-
del self.stack[:]
399+
self.stack.clear()
407400

408401
def op(self, operation):
409-
if (operation in Calculator.OPS):
410-
operation = Calculator.OPS[operation]
402+
operation = Calculator.OPS[operation]
411403

412-
if (inspect.isbuiltin(operation)):
413-
if operation.__name__ in ('neg', 'pos', 'abs', 'not_', 'inv'):
414-
ary = 1
415-
else:
416-
ary = 2
417-
else:
418-
ary = len(inspect.getargspec(operation)[0])
419-
420-
if ary == 1:
421-
a = self.stack.pop()
422-
c = operation(a)
423-
self.push(c)
424-
elif ary == 2:
425-
a = self.stack.pop()
426-
b = self.stack.pop()
427-
c = operation(b, a)
428-
self.push(c)
429-
else:
430-
raise ValueError('unknown operator')
404+
a = self.stack.pop()
405+
b = self.stack.pop()
406+
c = operation(b, a)
407+
self.push(c)
431408
return self
432409

433410

0 commit comments

Comments
 (0)