Skip to content

Commit 9abc4f5

Browse files
authored
Clickhouse: Multi-statements support (#5792)
The ClickHouse query runner splits a multi-statement query into individual statements and executes each of them in turn. The result of the last execution is returned. The implementation uses ClickHouse sessions in the HTTP protocol: a `session_id` is generated for the first query and is then reused for the subsequent queries (together with the `session_check` parameter). If the query runner gets a successful response with an empty body from ClickHouse (for example, for a temporary table creation request), it returns an empty response. authored-by: Liubov Ulitina <[email protected]>
1 parent f0a390b commit 9abc4f5

File tree

2 files changed

+257
-20
lines changed

2 files changed

+257
-20
lines changed

redash/query_runner/clickhouse.py

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
import logging
22
import re
33
from urllib.parse import urlparse
4+
from uuid import uuid4
45

56
import requests
67

78
from redash.query_runner import *
9+
from redash.query_runner import split_sql_statements
810
from redash.utils import json_dumps, json_loads
911

1012
logger = logging.getLogger(__name__)
1113

1214

15+
def split_multi_query(query):
    """Split ``query`` into its statements, leaving out empty ones.

    The splitting itself is delegated to ``split_sql_statements``; this
    wrapper only drops the entries that are equal to the empty string.
    """
    statements = []
    for statement in split_sql_statements(query):
        if statement != "":
            statements.append(statement)
    return statements
17+
18+
1319
class ClickHouse(BaseSQLQueryRunner):
1420
noop_query = "SELECT 1"
1521

@@ -87,25 +93,41 @@ def _get_tables(self, schema):
8793

8894
return list(schema.values())
8995

90-
def _send_query(self, data, stream=False):
96+
def _send_query(self, data, session_id=None, session_check=None):
9197
url = self.configuration.get("url", "http://127.0.0.1:8123")
98+
timeout = self.configuration.get("timeout", 30)
99+
100+
params = {
101+
"user": self.configuration.get("user", "default"),
102+
"password": self.configuration.get("password", ""),
103+
"database": self.configuration["dbname"],
104+
"default_format": "JSON",
105+
}
106+
107+
if session_id:
108+
params["session_id"] = session_id
109+
params["session_check"] = "1" if session_check else "0"
110+
params["session_timeout"] = timeout
111+
92112
try:
93113
verify = self.configuration.get("verify", True)
94114
r = requests.post(
95115
url,
96-
data=data.encode("utf-8","ignore"),
97-
stream=stream,
98-
timeout=self.configuration.get("timeout", 30),
99-
params={
100-
"user": self.configuration.get("user", "default"),
101-
"password": self.configuration.get("password", ""),
102-
"database": self.configuration["dbname"],
103-
},
116+
data=data.encode("utf-8", "ignore"),
117+
stream=False,
118+
timeout=timeout,
119+
params=params,
104120
verify=verify,
105121
)
122+
106123
if r.status_code != 200:
107124
raise Exception(r.text)
108-
# logging.warning(r.json())
125+
126+
# In certain situations the response body can be empty even if the query was successful, for example
127+
# when creating temporary tables.
128+
if not r.text:
129+
return {}
130+
109131
return r.json()
110132
except requests.RequestException as e:
111133
if e.response:
@@ -133,14 +155,19 @@ def _define_column_type(column):
133155
else:
134156
return TYPE_STRING
135157

136-
def _clickhouse_query(self, query):
158+
def _clickhouse_query(self, query, session_id=None, session_check=None):
159+
logger.debug("Clickhouse is about to execute query: %s", query)
160+
137161
query += "\nFORMAT JSON"
138-
result = self._send_query(query)
162+
163+
response = self._send_query(query, session_id, session_check)
164+
139165
columns = []
140166
columns_int64 = [] # db converts value to string if its type equals UInt64
141167
columns_totals = {}
142168

143-
for r in result["meta"]:
169+
meta = response.get("meta", [])
170+
for r in meta:
144171
column_name = r["name"]
145172
column_type = self._define_column_type(r["type"])
146173

@@ -155,31 +182,49 @@ def _clickhouse_query(self, query):
155182
{"name": column_name, "friendly_name": column_name, "type": column_type}
156183
)
157184

158-
rows = result["data"]
185+
rows = response.get("data", [])
159186
for row in rows:
160187
for column in columns_int64:
161188
try:
162189
row[column] = int(row[column])
163190
except TypeError:
164191
row[column] = None
165192

166-
if "totals" in result:
167-
totals = result["totals"]
193+
if "totals" in response:
194+
totals = response["totals"]
168195
for column, value in columns_totals.items():
169196
totals[column] = value
170197
rows.append(totals)
171198

172199
return {"columns": columns, "rows": rows}
173200

174201
def run_query(self, query, user):
175-
logger.debug("Clickhouse is about to execute query: %s", query)
176-
if query == "":
202+
queries = split_multi_query(query)
203+
204+
if not queries:
177205
json_data = None
178206
error = "Query is empty"
179207
return json_data, error
208+
180209
try:
181-
q = self._clickhouse_query(query)
182-
data = json_dumps(q)
210+
# If just one query was given no session is needed
211+
if len(queries) == 1:
212+
results = self._clickhouse_query(queries[0])
213+
else:
214+
# If more than one query was given, a session is needed. Parameter session_check must be false
215+
# for the first query
216+
session_id = "redash_{}".format(uuid4().hex)
217+
218+
results = self._clickhouse_query(
219+
queries[0], session_id, session_check=False
220+
)
221+
222+
for query in queries[1:]:
223+
results = self._clickhouse_query(
224+
query, session_id, session_check=True
225+
)
226+
227+
data = json_dumps(results)
183228
error = None
184229
except Exception as e:
185230
data = None
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import json
2+
from unittest import TestCase
3+
from unittest.mock import Mock, patch
4+
5+
from redash.query_runner import TYPE_INTEGER
6+
from redash.query_runner.clickhouse import ClickHouse, split_multi_query
7+
8+
split_multi_query_samples = [
9+
# Regular query
10+
("SELECT 1", ["SELECT 1"]),
11+
# Multiple data queries inlined
12+
("SELECT 1; SELECT 2;", ["SELECT 1", "SELECT 2"]),
13+
# Multiline data queries
14+
(
15+
"""
16+
SELECT 1;
17+
SELECT 2;
18+
""",
19+
["SELECT 1", "SELECT 2"],
20+
),
21+
# Commented data queries
22+
(
23+
"""
24+
-- First query single-line commentary
25+
SELECT 1;
26+
27+
/**
28+
* Second query multi-line commentary
29+
*/
30+
SELECT 2;
31+
32+
-- Tail single-line commentary
33+
34+
/**
35+
* Tail multi-line commentary
36+
*/
37+
""",
38+
[
39+
"-- First query single-line commentary\nSELECT 1",
40+
"/**\n * Second query multi-line commentary\n */\nSELECT 2",
41+
],
42+
),
43+
# Should skip empty statements
44+
(
45+
"""
46+
;;;
47+
;
48+
SELECT 1;
49+
""",
50+
["SELECT 1"],
51+
),
52+
]
53+
54+
55+
class TestClickHouseQueriesSplit(TestCase):
    """Checks ``split_multi_query`` against ``split_multi_query_samples``."""

    def test_split(self):
        # Each sample pairs a raw query with the statements it should yield.
        for query, expected in split_multi_query_samples:
            actual = split_multi_query(query)

            self.assertEqual(actual, expected)
61+
62+
63+
# Canned ClickHouse JSON reply used by the tests below: a single UInt8 column
# named "1" with one row, plus the row count and execution statistics.
simple_query_response = {
    "meta": [{"name": "1", "type": "UInt8"}],
    "data": [{"1": 1}],
    "rows": 1,
    "statistics": {"elapsed": 0.0001278, "rows_read": 1, "bytes_read": 1},
}
73+
74+
75+
class TestClickHouse(TestCase):
76+
@patch("requests.post")
def test_send_single_query(self, post_request):
    """A single statement is posted as-is and without session parameters."""
    # Prepare the canned HTTP reply that the patched ``requests.post`` returns.
    fake_response = Mock()
    fake_response.status_code = 200
    fake_response.text = json.dumps(simple_query_response)
    fake_response.json.return_value = simple_query_response
    post_request.return_value = fake_response

    configuration = {
        "url": "http://clickhouse:8123",
        "dbname": "system",
        "timeout": 60,
    }
    query_runner = ClickHouse(configuration)

    data, error = query_runner.run_query("SELECT 1", None)

    self.assertIsNone(error)

    expected_result = {
        "columns": [
            {"name": "1", "friendly_name": "1", "type": TYPE_INTEGER},
        ],
        "rows": [
            {"1": 1},
        ],
    }
    self.assertEqual(json.loads(data), expected_result)

    # Inspect the recorded HTTP call: one positional argument (the URL) plus
    # keyword arguments carrying the body, query-string params and timeout.
    (url,), kwargs = post_request.call_args
    self.assertEqual(url, "http://clickhouse:8123")
    self.assertEqual(kwargs["data"], b"SELECT 1\nFORMAT JSON")

    expected_params = {
        "user": "default",
        "password": "",
        "database": "system",
        "default_format": "JSON",
    }
    self.assertEqual(kwargs["params"], expected_params)
    self.assertEqual(kwargs["timeout"], 60)
116+
117+
@patch("requests.post")
118+
def test_send_multi_query(self, post_request):
119+
query_runner = ClickHouse(
120+
{"url": "http://clickhouse:8123", "dbname": "system", "timeout": 60}
121+
)
122+
123+
create_table_response = Mock()
124+
create_table_response.status_code = 200
125+
create_table_response.text = ""
126+
127+
select_response = Mock()
128+
select_response.status_code = 200
129+
select_response.text = json.dumps(simple_query_response)
130+
select_response.json.return_value = simple_query_response
131+
132+
post_request.side_effect = [create_table_response, select_response]
133+
134+
data, error = query_runner.run_query(
135+
"""
136+
CREATE
137+
TEMPORARY TABLE test AS
138+
SELECT 1;
139+
SELECT * FROM test;
140+
""",
141+
None,
142+
)
143+
144+
self.assertIsNone(error)
145+
self.assertEqual(
146+
json.loads(data),
147+
{
148+
"columns": [
149+
{"name": "1", "friendly_name": "1", "type": TYPE_INTEGER},
150+
],
151+
"rows": [
152+
{"1": 1},
153+
],
154+
},
155+
)
156+
157+
(url,), kwargs = post_request.call_args_list[0]
158+
self.assertEqual(url, "http://clickhouse:8123")
159+
self.assertEqual(
160+
kwargs["data"],
161+
b"""CREATE
162+
TEMPORARY TABLE test AS
163+
SELECT 1
164+
FORMAT JSON""",
165+
)
166+
self.assert_session_params(kwargs, expected_check="0", expected_timeout=60)
167+
168+
session_id = kwargs["params"]["session_id"]
169+
170+
(url,), kwargs = post_request.call_args_list[1]
171+
self.assertEqual(url, "http://clickhouse:8123")
172+
self.assertEqual(
173+
kwargs["data"],
174+
b"""SELECT * FROM test
175+
FORMAT JSON""",
176+
)
177+
178+
self.assert_session_params(
179+
kwargs, expected_check="1", expected_timeout=60, expected_id=session_id
180+
)
181+
182+
def assert_session_params(
    self, kwargs, expected_check, expected_timeout, expected_id=None
):
    """Assert that a recorded ``requests.post`` call carries session parameters.

    :param kwargs: keyword arguments of the recorded call; the query-string
        parameters are read from ``kwargs["params"]``.
    :param expected_check: value ``session_check`` must be equal to.
    :param expected_timeout: value ``session_timeout`` must be equal to.
    :param expected_id: when given, the exact ``session_id`` the call must
        carry (used to verify that later statements reuse the session).
    :raises AssertionError: if any of the checks above fails.
    """
    params = kwargs["params"]

    self.assertEqual(params["session_check"], expected_check)
    self.assertEqual(params["session_timeout"], expected_timeout)

    session_id = params["session_id"]
    # Anchor the pattern: ``assertRegex`` searches, so without the anchors an
    # id with stray characters around a valid fragment would be accepted.
    self.assertRegex(session_id, r"^redash_[a-f0-9]+$")

    if expected_id is not None:
        # The id must match the expected one, not merely be well-formed.
        self.assertEqual(session_id, expected_id)

0 commit comments

Comments
 (0)