Skip to content

Commit a863c8c

Browse files
authored
Adding Apache Pinot Query Runner (#5798)
* Adding Apache Pinot integration * address comments
1 parent 71458e5 commit a863c8c

File tree

5 files changed

+137
-0
lines changed

5 files changed

+137
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ Redash supports more than 35 SQL and NoSQL [data sources](https://redash.io/help
7575
- MySQL
7676
- Oracle
7777
- Apache Phoenix
78+
- Apache Pinot
7879
- PostgreSQL
7980
- Presto
8081
- Prometheus
29 KB
Loading

redash/query_runner/pinot.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
try:
2+
import pinotdb
3+
enabled = True
4+
except ImportError:
5+
enabled = False
6+
7+
from redash.query_runner import BaseQueryRunner, register
8+
from redash.query_runner import TYPE_DATETIME, TYPE_FLOAT, TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN
9+
from redash.utils import json_dumps
10+
11+
import requests
12+
from requests.auth import HTTPBasicAuth
13+
import logging
14+
15+
logger = logging.getLogger(__name__)
16+
17+
PINOT_TYPES_MAPPING = {
18+
"BOOLEAN": TYPE_BOOLEAN,
19+
"INT": TYPE_INTEGER,
20+
"LONG": TYPE_INTEGER,
21+
"FLOAT": TYPE_FLOAT,
22+
"DOUBLE": TYPE_FLOAT,
23+
"STRING": TYPE_STRING,
24+
"BYTES": TYPE_STRING,
25+
"JSON": TYPE_STRING,
26+
"TIMESTAMP": TYPE_DATETIME,
27+
}
28+
29+
30+
class Pinot(BaseQueryRunner):
31+
noop_query = "SELECT 1"
32+
username = None
33+
password = None
34+
35+
@classmethod
36+
def configuration_schema(cls):
37+
return {
38+
"type": "object",
39+
"properties": {
40+
"brokerHost": {"type": "string", "default": ""},
41+
"brokerPort": {"type": "number", "default": 8099},
42+
"brokerScheme": {"type": "string", "default": "http"},
43+
"controllerURI": {"type": "string", "default": ""},
44+
"username": {"type": "string"},
45+
"password": {"type": "string"},
46+
},
47+
"order": ["brokerScheme", "brokerHost", "brokerPort", "controllerURI", "username", "password"],
48+
"required": ["brokerHost", "controllerURI"],
49+
"secret": ["password"],
50+
}
51+
52+
@classmethod
53+
def enabled(cls):
54+
return enabled
55+
56+
def __init__(self, configuration):
57+
super(Pinot, self).__init__(configuration)
58+
self.controller_uri = self.configuration.get("controllerURI")
59+
self.username=(self.configuration.get("username") or None)
60+
self.password=(self.configuration.get("password") or None)
61+
62+
def run_query(self, query, user):
63+
logger.debug("Running query %s with username: %s, password: %s", query, self.username, self.password)
64+
connection = pinotdb.connect(
65+
host=self.configuration["brokerHost"],
66+
port=self.configuration["brokerPort"],
67+
path="/query/sql",
68+
scheme=(self.configuration.get("brokerScheme") or "http"),
69+
verify_ssl=False,
70+
username=self.username,
71+
password=self.password,
72+
)
73+
74+
cursor = connection.cursor()
75+
76+
try:
77+
cursor.execute(query)
78+
logger.debug("cursor.schema = %s",cursor.schema)
79+
columns = self.fetch_columns(
80+
[(i["name"], PINOT_TYPES_MAPPING.get(i["type"], None)) for i in cursor.schema]
81+
)
82+
rows = [
83+
dict(zip((column["name"] for column in columns), row)) for row in cursor
84+
]
85+
86+
data = {"columns": columns, "rows": rows}
87+
error = None
88+
json_data = json_dumps(data)
89+
logger.debug("Pinot execute query [%s]", query)
90+
finally:
91+
connection.close()
92+
93+
return json_data, error
94+
95+
def get_schema(self, get_stats=False):
96+
schema = {}
97+
for schema_name in self.get_schema_names():
98+
for table_name in self.get_table_names():
99+
schema_table_name = "{}.{}".format(schema_name, table_name)
100+
if table_name not in schema:
101+
schema[schema_table_name] = {"name": schema_table_name, "columns": []}
102+
table_schema =self.get_pinot_table_schema(table_name)
103+
104+
for column in table_schema.get("dimensionFieldSpecs", []) + table_schema.get(
105+
"metricFieldSpecs", []) + table_schema.get("dateTimeFieldSpecs", []):
106+
c = {
107+
"name": column["name"],
108+
"type": PINOT_TYPES_MAPPING[column["dataType"]],
109+
}
110+
schema[schema_table_name]["columns"].append(c)
111+
return list(schema.values())
112+
113+
def get_schema_names(self):
114+
return ["default"]
115+
116+
def get_pinot_table_schema(self, pinot_table_name):
117+
return self.get_metadata_from_controller("/tables/" + pinot_table_name + "/schema")
118+
119+
def get_table_names(self):
120+
return self.get_metadata_from_controller("/tables")["tables"]
121+
122+
def get_metadata_from_controller(self, path):
123+
url = self.controller_uri + path
124+
r = requests.get(url, headers={"Accept": "application/json"}, auth= HTTPBasicAuth(self.username, self.password))
125+
try:
126+
result = r.json()
127+
logger.debug("get_metadata_from_controller from path %s", path)
128+
except ValueError as e:
129+
raise pinotdb.exceptions.DatabaseError(
130+
f"Got invalid json response from {self.controller_uri}:{path}: {r.text}"
131+
) from e
132+
return result
133+
134+
register(Pinot)

redash/settings/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ def email_server_is_configured():
349349
"redash.query_runner.amazon_elasticsearch",
350350
"redash.query_runner.trino",
351351
"redash.query_runner.presto",
352+
"redash.query_runner.pinot",
352353
"redash.query_runner.databricks",
353354
"redash.query_runner.hive_ds",
354355
"redash.query_runner.impala_ds",

requirements_all_ds.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ pandas==1.3.4
4545
nzpy>=1.15
4646
nzalchemy
4747
python-arango==6.1.0
48+
pinotdb>=0.4.5

0 commit comments

Comments
 (0)