Skip to content

Commit 4186f83

Browse files
susodapop, NicolasLM, wwl717195673
authored
New ElasticSearch Query Runner (#5794)
- A runner supporting the newest versions of ES, aggregation, nested aggregations and nested fields.
- A runner for the SQL OpenDistro flavor.
- A runner for the SQL X-Pack flavor.

Co-authored-by: Nicolas Le Manchet <[email protected]>
Co-authored-by: wwl717195673 <[email protected]>
1 parent 0712abb commit 4186f83

File tree

8 files changed

+589
-2
lines changed

8 files changed

+589
-2
lines changed
16.2 KB
Loading
16.2 KB
Loading
16.2 KB
Loading

redash/query_runner/amazon_elasticsearch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .elasticsearch import ElasticSearch
1+
from .elasticsearch2 import ElasticSearch2
22
from . import register
33

44
try:
@@ -10,7 +10,7 @@
1010
enabled = False
1111

1212

13-
class AmazonElasticsearchService(ElasticSearch):
13+
class AmazonElasticsearchService(ElasticSearch2):
1414
@classmethod
1515
def name(cls):
1616
return "Amazon Elasticsearch Service"

redash/query_runner/elasticsearch.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
class BaseElasticSearch(BaseQueryRunner):
4545
should_annotate_query = False
4646
DEBUG_ENABLED = False
47+
deprecated=True
4748

4849
@classmethod
4950
def configuration_schema(cls):
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
import logging
2+
from typing import Tuple, Optional
3+
4+
from redash.query_runner import *
5+
from redash.utils import json_dumps, json_loads
6+
7+
8+
logger = logging.getLogger(__name__)
9+
10+
ELASTICSEARCH_TYPES_MAPPING = {
11+
"integer": TYPE_INTEGER,
12+
"long": TYPE_INTEGER,
13+
"float": TYPE_FLOAT,
14+
"double": TYPE_FLOAT,
15+
"boolean": TYPE_BOOLEAN,
16+
"string": TYPE_STRING,
17+
"date": TYPE_DATE,
18+
"object": TYPE_STRING,
19+
}
20+
21+
22+
TYPES_MAP = {
23+
str: TYPE_STRING,
24+
int: TYPE_INTEGER,
25+
float: TYPE_FLOAT,
26+
bool: TYPE_BOOLEAN,
27+
}
28+
29+
30+
class ElasticSearch2(BaseHTTPQueryRunner):
31+
32+
should_annotate_query = False
33+
34+
@classmethod
35+
def name(cls):
36+
return "ElasticSearch"
37+
38+
def __init__(self, *args, **kwargs):
39+
super().__init__(*args, **kwargs)
40+
self.syntax = 'json'
41+
42+
def get_response(self, url, auth=None, http_method='get', **kwargs):
43+
url = "{}{}".format(self.configuration["url"], url)
44+
headers = kwargs.pop('headers', {})
45+
headers['Accept'] = 'application/json'
46+
return super().get_response(url, auth, http_method, headers=headers, **kwargs)
47+
48+
def test_connection(self):
49+
_, error = self.get_response("/_cluster/health")
50+
if error is not None:
51+
raise Exception(error)
52+
53+
def run_query(self, query, user):
54+
query, url, result_fields = self._build_query(query)
55+
response, error = self.get_response(
56+
url,
57+
http_method='post',
58+
json=query
59+
)
60+
query_results = response.json()
61+
data = self._parse_results(result_fields, query_results)
62+
error = None
63+
json_data = json_dumps(data)
64+
return json_data, error
65+
66+
def _build_query(self, query: str) -> Tuple[dict, str, Optional[list]]:
67+
query = json_loads(query)
68+
index_name = query.pop('index', '')
69+
result_fields = query.pop('result_fields', None)
70+
url = "/{}/_search".format(index_name)
71+
return query, url, result_fields
72+
73+
@classmethod
74+
def _parse_mappings(cls, mappings_data: dict):
75+
mappings = {}
76+
77+
def _parse_properties(prefix: str, properties: dict):
78+
for property_name, property_data in properties.items():
79+
if property_name not in mappings:
80+
property_type = property_data.get('type', None)
81+
nested_properties = property_data.get('properties', None)
82+
if property_type:
83+
mappings[index_name][prefix + property_name] = (
84+
ELASTICSEARCH_TYPES_MAPPING.get(property_type, TYPE_STRING)
85+
)
86+
elif nested_properties:
87+
new_prefix = prefix + property_name + '.'
88+
_parse_properties(new_prefix, nested_properties)
89+
90+
for index_name in mappings_data:
91+
mappings[index_name] = {}
92+
index_mappings = mappings_data[index_name]
93+
try:
94+
for m in index_mappings.get("mappings", {}):
95+
_parse_properties('', index_mappings['mappings'][m]['properties'])
96+
except KeyError:
97+
_parse_properties('', index_mappings['mappings']['properties'])
98+
99+
return mappings
100+
101+
def get_mappings(self):
102+
response, error = self.get_response('/_mappings')
103+
return self._parse_mappings(response.json())
104+
105+
def get_schema(self, *args, **kwargs):
106+
schema = {}
107+
for name, columns in self.get_mappings().items():
108+
schema[name] = {
109+
'name': name,
110+
'columns': list(columns.keys())
111+
}
112+
return list(schema.values())
113+
114+
@classmethod
115+
def _parse_results(cls, result_fields, raw_result):
116+
result_columns = []
117+
result_rows = []
118+
result_columns_index = {c["name"]: c for c in result_columns}
119+
result_fields_index = {}
120+
121+
def add_column_if_needed(column_name, value=None):
122+
if column_name not in result_columns_index:
123+
result_columns.append({
124+
'name': column_name,
125+
'friendly_name': column_name,
126+
'type': TYPES_MAP.get(type(value), TYPE_STRING)
127+
})
128+
result_columns_index[column_name] = result_columns[-1]
129+
130+
def get_row(rows, row):
131+
if row is None:
132+
row = {}
133+
rows.append(row)
134+
return row
135+
136+
def collect_value(row, key, value):
137+
if result_fields and key not in result_fields_index:
138+
return
139+
140+
add_column_if_needed(key, value)
141+
row[key] = value
142+
143+
def parse_bucket_to_row(data, row, agg_key):
144+
sub_agg_key = ""
145+
for key, item in data.items():
146+
if key == 'key_as_string':
147+
continue
148+
if key == 'key':
149+
if 'key_as_string' in data:
150+
collect_value(row, agg_key, data['key_as_string'])
151+
else:
152+
collect_value(row, agg_key, data['key'])
153+
continue
154+
155+
if isinstance(item, (str, int, float)):
156+
collect_value(row, agg_key + '.' + key, item)
157+
elif isinstance(item, dict):
158+
if 'buckets' not in item:
159+
for sub_key, sub_item in item.items():
160+
collect_value(
161+
row,
162+
agg_key + '.' + key + '.' + sub_key,
163+
sub_item,
164+
)
165+
else:
166+
sub_agg_key = key
167+
168+
return sub_agg_key
169+
170+
def parse_buckets_list(rows, parent_key, data, row, depth):
171+
if len(rows) > 0 and depth == 0:
172+
row = rows.pop()
173+
174+
for value in data:
175+
row = row.copy()
176+
sub_agg_key = parse_bucket_to_row(value, row, parent_key)
177+
178+
if sub_agg_key == "":
179+
rows.append(row)
180+
else:
181+
depth += 1
182+
parse_buckets_list(rows, sub_agg_key, value[sub_agg_key]['buckets'], row, depth)
183+
184+
def collect_aggregations(rows, parent_key, data, row, depth):
185+
row = get_row(rows, row)
186+
parse_bucket_to_row(data, row, parent_key)
187+
188+
if 'buckets' in data:
189+
parse_buckets_list(rows, parent_key, data['buckets'], row, depth)
190+
191+
return None
192+
193+
def get_flatten_results(dd, separator='.', prefix=''):
194+
if isinstance(dd, dict):
195+
return {
196+
prefix + separator + k if prefix else k: v
197+
for kk, vv in dd.items()
198+
for k, v in get_flatten_results(vv, separator, kk).items()
199+
}
200+
elif isinstance(dd, list) and len(dd) == 1:
201+
return {prefix: dd[0]}
202+
else:
203+
return {prefix: dd}
204+
205+
if result_fields:
206+
for r in result_fields:
207+
result_fields_index[r] = None
208+
209+
if 'error' in raw_result:
210+
error = raw_result['error']
211+
if len(error) > 10240:
212+
error = error[:10240] + '... continues'
213+
214+
raise Exception(error)
215+
elif 'aggregations' in raw_result:
216+
for key, data in raw_result["aggregations"].items():
217+
collect_aggregations(result_rows, key, data, None, 0)
218+
219+
elif 'hits' in raw_result and 'hits' in raw_result['hits']:
220+
for h in raw_result["hits"]["hits"]:
221+
row = {}
222+
223+
fields_parameter_name = "_source" if "_source" in h else "fields"
224+
for column in h[fields_parameter_name]:
225+
if result_fields and column not in result_fields_index:
226+
continue
227+
228+
unested_results = get_flatten_results({column: h[fields_parameter_name][column]})
229+
230+
for column_name, value in unested_results.items():
231+
add_column_if_needed(column_name, value=value)
232+
row[column_name] = value
233+
234+
result_rows.append(row)
235+
else:
236+
raise Exception("Redash failed to parse the results it got from Elasticsearch.")
237+
238+
return {
239+
'columns': result_columns,
240+
'rows': result_rows
241+
}
242+
243+
244+
class OpenDistroSQLElasticSearch(ElasticSearch2):
245+
246+
def __init__(self, *args, **kwargs):
247+
super().__init__(*args, **kwargs)
248+
self.syntax = 'sql'
249+
250+
def _build_query(self, query: str) -> Tuple[dict, str, Optional[list]]:
251+
sql_query = {
252+
'query': query
253+
}
254+
sql_query_url = '/_opendistro/_sql'
255+
return sql_query, sql_query_url, None
256+
257+
@classmethod
258+
def name(cls):
259+
return cls.__name__
260+
261+
@classmethod
262+
def type(cls):
263+
return "elasticsearch2_OpenDistroSQLElasticSearch"
264+
265+
266+
267+
class XPackSQLElasticSearch(ElasticSearch2):
268+
269+
def __init__(self, *args, **kwargs):
270+
super().__init__(*args, **kwargs)
271+
self.syntax = 'sql'
272+
273+
def _build_query(self, query: str) -> Tuple[dict, str, Optional[list]]:
274+
sql_query = {
275+
'query': query
276+
}
277+
sql_query_url = '/_xpack/sql'
278+
return sql_query, sql_query_url, None
279+
280+
@classmethod
281+
def _parse_results(cls, result_fields, raw_result):
282+
error = raw_result.get('error')
283+
if error:
284+
raise Exception(error)
285+
286+
rv = {
287+
'columns': [
288+
{
289+
'name': c['name'],
290+
'friendly_name': c['name'],
291+
'type': ELASTICSEARCH_TYPES_MAPPING.get(c['type'], 'string')
292+
} for c in raw_result['columns']
293+
],
294+
'rows': []
295+
}
296+
query_results_rows = raw_result['rows']
297+
298+
for query_results_row in query_results_rows:
299+
result_row = dict()
300+
for column, column_value in zip(rv['columns'], query_results_row):
301+
result_row[column['name']] = column_value
302+
rv['rows'].append(result_row)
303+
304+
return rv
305+
306+
@classmethod
307+
def name(cls):
308+
return cls.__name__
309+
310+
@classmethod
311+
def type(cls):
312+
return "elasticsearch2_XPackSQLElasticSearch"
313+
314+
315+
316+
register(ElasticSearch2)
317+
register(OpenDistroSQLElasticSearch)
318+
register(XPackSQLElasticSearch)

redash/settings/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ def email_server_is_configured():
345345
"redash.query_runner.url",
346346
"redash.query_runner.influx_db",
347347
"redash.query_runner.elasticsearch",
348+
"redash.query_runner.elasticsearch2",
348349
"redash.query_runner.amazon_elasticsearch",
349350
"redash.query_runner.trino",
350351
"redash.query_runner.presto",

0 commit comments

Comments
 (0)