Skip to content

Commit 23e87c0

Browse files
committed
feat: add powerbi query runner [WIP]
1 parent 5cf13af commit 23e87c0

File tree

2 files changed

+211
-0
lines changed

2 files changed

+211
-0
lines changed

redash/query_runner/powerbi.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
# TODO: test
2+
import logging
3+
from typing import Optional, Tuple
4+
5+
from redash.query_runner import (
6+
TYPE_BOOLEAN,
7+
TYPE_DATETIME,
8+
TYPE_FLOAT,
9+
TYPE_INTEGER,
10+
TYPE_STRING,
11+
BaseHTTPQueryRunner,
12+
register,
13+
)
14+
from redash.utils import json_dumps, json_loads
15+
from redash.utils.requests_session import UnacceptableAddressException
16+
17+
try:
18+
import msal
19+
import numpy
20+
import pandas
21+
22+
enabled = True
23+
except ImportError:
24+
enabled = False
25+
26+
CONVERSIONS = [
27+
{"pandas_type": numpy.bool_, "redash_type": TYPE_BOOLEAN},
28+
{
29+
"pandas_type": numpy.datetime64,
30+
"redash_type": TYPE_DATETIME,
31+
"to_redash": lambda x: x.strftime("%Y-%m-%d %H:%M:%S"),
32+
},
33+
{"pandas_type": numpy.inexact, "redash_type": TYPE_FLOAT},
34+
{"pandas_type": numpy.integer, "redash_type": TYPE_INTEGER},
35+
{"pandas_type": numpy.object, "redash_type": TYPE_STRING},
36+
]
37+
38+
39+
logger = logging.getLogger(__name__)
40+
41+
42+
class PowerBIDAX(BaseHTTPQueryRunner):
43+
noop_query = """
44+
EVALUATE
45+
DATATABLE(
46+
"Name", STRING, "Region", STRING,
47+
{
48+
{"User1", "East"},
49+
{"User2", "East"},
50+
{"User3", "West"},
51+
{"User4", "West"},
52+
{"User4", "East"}
53+
}
54+
)
55+
"""
56+
response_error = "Power BI returned unexpected status code"
57+
client_id_title = "Client ID"
58+
authority_url_title = "Authority URL"
59+
scope_title = "Scope"
60+
# client_id = "Enter_the_Application_Id_here"
61+
# authority_url = 'https://login.microsoftonline.com/yourdomain.com'
62+
scope = ["https://analysis.windows.net/powerbi/api/.default"]
63+
64+
requires_authentication = True
65+
requires_url = False
66+
url_title = "Power BI URL"
67+
url = "https://api.powerbi.com/v1.0/myorg"
68+
# should_annotate_query = False
69+
username_title = "Username"
70+
password_title = "Password"
71+
72+
@classmethod
73+
def configuration_schema(cls):
74+
schema = super().configuration_schema()
75+
properties: dict = schema["properties"]
76+
properties.update(
77+
{
78+
"client_id": {"type": "string", "title": cls.client_id_title},
79+
"authority_url": {
80+
"type": "string",
81+
"title": cls.authority_url_title,
82+
"default": "https://login.microsoftonline.com/",
83+
},
84+
"scope": {
85+
"type": "list",
86+
"title": cls.scope_title,
87+
# "default": ["https://analysis.windows.net/powerbi/api/.default"],
88+
},
89+
}
90+
)
91+
schema["required"] = schema.get("required", []) + [
92+
"client_id",
93+
"authority_url",
94+
"scope",
95+
]
96+
return schema
97+
98+
@classmethod
99+
def name(cls):
100+
return "Power BI (DAX)"
101+
102+
@classmethod
103+
def enabled(cls):
104+
return enabled
105+
106+
def __init__(self, *args, **kwargs):
107+
super().__init__(*args, **kwargs)
108+
self.syntax = "yaml"
109+
110+
def test_connection(self):
111+
_, error = self.get_response("/availableFeatures")
112+
if error is not None:
113+
raise Exception(error)
114+
115+
def get_auth(self):
116+
client_id = self.configuration["client_id"]
117+
authority_url = self.configuration["authority_url"]
118+
scope = self.configuration["scope"]
119+
# username = self.configuration["username"]
120+
# password = self.configuration["password"]
121+
username, password = super().get_auth()
122+
app = msal.PublicClientApplication(client_id=client_id, authority=authority_url)
123+
result = app.acquire_token_by_username_password(
124+
username=username,
125+
password=password,
126+
scopes=scope,
127+
)
128+
access_token = result["access_token"]
129+
return f"Bearer {access_token}"
130+
131+
def get_response(self, url, auth=None, http_method="get", **kwargs):
132+
url = "{}{}".format(self.configuration["url"], url)
133+
headers = kwargs.pop("headers", {})
134+
headers["Accept"] = "application/json"
135+
headers["Content-Type"] = "application/json"
136+
# access_token = self._get_access_token()
137+
# headers["Authorization"] = f"Bearer {access_token}"
138+
return super().get_response(url, auth, http_method, headers=headers, **kwargs)
139+
140+
def _build_query(self, query: str) -> Tuple[dict, str, Optional[list]]:
141+
query: dict = json_loads(query)
142+
group_id = query.pop("group_id", "")
143+
dataset_id = query.pop("dataset_id", "")
144+
query = (
145+
{
146+
"queries": [{"query": query.pop("query", "")}],
147+
"serializerSettings": {"includeNulls": True},
148+
# "impersonatedUserName": email,
149+
},
150+
)
151+
url = "/groups/{groupId}/datasets/{datasetId}/executeQueries".format_map(
152+
{
153+
"groupId": group_id,
154+
"datasetId": dataset_id,
155+
}
156+
)
157+
return url, query
158+
159+
@classmethod
160+
def _parse_results(cls, query_results: dict):
161+
try:
162+
rows = query_results.get("results", {}).get("tables", [{}]).get("rows", [])
163+
df = pandas.from_records(data=rows)
164+
data = {"columns": [], "rows": []}
165+
conversions = CONVERSIONS
166+
labels = []
167+
for dtype, label in zip(df.dtypes, df.columns):
168+
for conversion in conversions:
169+
if issubclass(dtype.type, conversion["pandas_type"]):
170+
data["columns"].append(
171+
{
172+
"name": label,
173+
"friendly_name": label,
174+
"type": conversion["redash_type"],
175+
}
176+
)
177+
labels.append(label)
178+
func = conversion.get("to_redash")
179+
if func:
180+
df[label] = df[label].apply(func)
181+
break
182+
data["rows"] = (
183+
df[labels].replace({numpy.nan: None}).to_dict(orient="records")
184+
)
185+
json_data = json_dumps(data)
186+
error = None
187+
except KeyboardInterrupt:
188+
error = "Query cancelled by user."
189+
json_data = None
190+
except UnacceptableAddressException:
191+
error = "Can't query private addresses."
192+
json_data = None
193+
except Exception as e:
194+
error = f"{e}"
195+
json_data = None
196+
return json_data, error
197+
198+
def run_query(self, query, user):
199+
url, query = self._build_query(query)
200+
response, error = self.get_response(
201+
http_method="post",
202+
url=url,
203+
json=query,
204+
)
205+
query_results = response.json()
206+
json_data, error = self._parse_results(query_results)
207+
return json_data, error
208+
209+
210+
register(PowerBIDAX)

tests/query_runner/test_powerbi.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# TODO: add

0 commit comments

Comments
 (0)