Skip to content

Commit 3b2d6a3

Browse files
committed
rebuild and retest
1 parent 3f0d08d commit 3b2d6a3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+74351
-0
lines changed

build/lib/data_algebra/BigQuery.py

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
"""
2+
Adapter for Google BigQuery database
3+
"""
4+
5+
6+
from typing import Optional
7+
8+
import gzip
9+
import os
10+
import os.path
11+
12+
import data_algebra
13+
import data_algebra.data_ops
14+
import data_algebra.db_model
15+
16+
_have_bigquery = False
17+
try:
18+
# noinspection PyUnresolvedReferences
19+
import google.cloud.bigquery
20+
21+
_have_bigquery = True
22+
except ImportError:
23+
pass
24+
25+
26+
def _bigquery_median_expr(dbmodel, expression):
27+
return (
28+
"PERCENTILE_CONT("
29+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
30+
+ ", 0.5)"
31+
)
32+
33+
34+
def _bigquery_std_expr(dbmodel, expression):
35+
return (
36+
"STDDEV_SAMP("
37+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
38+
+ ")"
39+
)
40+
41+
42+
def _bigquery_var_expr(dbmodel, expression):
43+
return (
44+
"VAR_SAMP("
45+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
46+
+ ")"
47+
)
48+
49+
50+
def _bigquery_is_bad_expr(dbmodel, expression):
51+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=True)
52+
return (
53+
"("
54+
+ subexpr
55+
+ " IS NULL OR "
56+
+ "IS_INF("
57+
+ subexpr
58+
+ ")"
59+
+ " OR ("
60+
+ subexpr
61+
+ " != 0 AND "
62+
+ subexpr
63+
+ " = -"
64+
+ subexpr
65+
+ "))"
66+
)
67+
68+
69+
def _bigquery_any_expr(dbmodel, expression):
70+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
71+
return f"LOGICAL_OR({subexpr})"
72+
73+
74+
def _bigquery_all_expr(dbmodel, expression):
75+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
76+
return f"LOGICAL_AND({subexpr})"
77+
78+
79+
def _bigquery_any_value_expr(dbmodel, expression):
80+
return (
81+
"ANY_VALUE("
82+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
83+
+ ")"
84+
)
85+
86+
87+
def _bigquery_ieee_divide_expr(dbmodel, expression):
88+
# don't issue an error
89+
# https://cloud.google.com/bigquery/docs/reference/standard-sql/mathematical_functions#ieee_divide
90+
assert len(expression.args) == 2
91+
e0 = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
92+
e1 = dbmodel.expr_to_sql(expression.args[1], want_inline_parens=True)
93+
return f"IEEE_DIVIDE({e0}, 1.0 * {e1})"
94+
95+
96+
BigQuery_formatters = {
97+
"median": _bigquery_median_expr,
98+
"is_bad": _bigquery_is_bad_expr,
99+
"std": _bigquery_std_expr,
100+
"var": _bigquery_var_expr,
101+
"any": _bigquery_any_expr,
102+
"all": _bigquery_all_expr,
103+
"any_value": _bigquery_any_value_expr,
104+
"%/%": _bigquery_ieee_divide_expr,
105+
}
106+
107+
108+
class BigQueryModel(data_algebra.db_model.DBModel):
109+
"""A model of how SQL should be generated for BigQuery
110+
connection should be google.cloud.bigquery.client.Client"""
111+
112+
def __init__(self, *, table_prefix: Optional[str] = None):
113+
data_algebra.db_model.DBModel.__init__(
114+
self,
115+
identifier_quote="`",
116+
string_quote='"',
117+
sql_formatters=BigQuery_formatters,
118+
on_start="(",
119+
on_end=")",
120+
on_joiner=" AND ",
121+
string_type="STRING",
122+
)
123+
self.table_prefix = table_prefix
124+
125+
def get_table_name(self, table_description):
126+
if not isinstance(table_description, str):
127+
try:
128+
if table_description.node_name == "TableDescription":
129+
table_description = table_description.table_name
130+
else:
131+
raise TypeError(
132+
"Expected table_description to be a string or data_algebra.data_ops.TableDescription)"
133+
)
134+
except KeyError:
135+
raise TypeError(
136+
"Expected table_description to be a string or data_algebra.data_ops.TableDescription)"
137+
)
138+
if self.table_prefix is not None:
139+
table_description = self.table_prefix + "." + table_description
140+
return table_description
141+
142+
def quote_table_name(self, table_description):
143+
table_name = self.get_table_name(table_description)
144+
return self.quote_identifier(table_name)
145+
146+
# noinspection PyMethodMayBeStatic
147+
def execute(self, conn, q):
148+
"""
149+
150+
:param conn: database connection
151+
:param q: sql query
152+
"""
153+
assert _have_bigquery
154+
assert isinstance(conn, google.cloud.bigquery.client.Client)
155+
if isinstance(q, data_algebra.data_ops.ViewRepresentation):
156+
q = q.to_sql(db_model=self)
157+
else:
158+
q = str(q)
159+
assert isinstance(q, str)
160+
conn.query(q).result()
161+
162+
def read_query(self, conn, q):
163+
"""
164+
165+
:param conn: database connection
166+
:param q: sql query
167+
:return: query results as table
168+
"""
169+
assert _have_bigquery
170+
assert isinstance(conn, google.cloud.bigquery.client.Client)
171+
if isinstance(q, data_algebra.data_ops.ViewRepresentation):
172+
q = q.to_sql(db_model=self)
173+
else:
174+
q = str(q)
175+
assert isinstance(q, str)
176+
r = self.local_data_model.pd.DataFrame(conn.query(q).result().to_dataframe())
177+
r.reset_index(drop=True, inplace=True)
178+
return r.copy() # fresh copy
179+
180+
def insert_table(
181+
self, conn, d, table_name, *, qualifiers=None, allow_overwrite=False
182+
):
183+
prepped_table_name = table_name
184+
if self.table_prefix is not None:
185+
prepped_table_name = self.table_prefix + "." + table_name
186+
if allow_overwrite:
187+
self.drop_table(conn, table_name)
188+
else:
189+
table_exists = True
190+
# noinspection PyBroadException
191+
try:
192+
self.read_query(
193+
conn,
194+
"SELECT * FROM " + self.quote_table_name(table_name) + " LIMIT 1",
195+
)
196+
except Exception:
197+
table_exists = False
198+
if table_exists:
199+
raise ValueError("table " + prepped_table_name + " already exists")
200+
job = conn.load_table_from_dataframe(d, prepped_table_name)
201+
job.result()
202+
203+
def db_handle(self, conn, *, db_engine=None):
204+
return BigQuery_DBHandle(db_model=self, conn=conn)
205+
206+
207+
class BigQuery_DBHandle(data_algebra.db_model.DBHandle):
208+
def __init__(self, *, db_model=BigQueryModel(), conn):
209+
assert isinstance(db_model, BigQueryModel)
210+
data_algebra.db_model.DBHandle.__init__(self, db_model=db_model, conn=conn)
211+
212+
def describe_bq_table(
213+
self, *, table_catalog, table_schema, table_name, row_limit=7
214+
) -> data_algebra.data_ops.TableDescription:
215+
full_name = f"{table_catalog}.{table_schema}.{table_name}"
216+
head = self.db_model.read_query(
217+
conn=self.conn,
218+
q="SELECT * FROM "
219+
+ self.db_model.quote_identifier(
220+
full_name
221+
) # don't quote table name: adds more qualifiers
222+
+ " LIMIT "
223+
+ str(row_limit),
224+
)
225+
cat_name = f"{table_catalog}.{table_schema}.INFORMATION_SCHEMA.COLUMNS"
226+
sql_meta = self.db_model.read_query(
227+
self.conn,
228+
f"SELECT * FROM {self.db_model.quote_identifier(cat_name)} "
229+
+ f"WHERE table_name={self.db_model.quote_string(table_name)}",
230+
)
231+
qualifiers = {
232+
"table_catalog": table_catalog,
233+
"table_schema": table_schema,
234+
"table_name": table_name,
235+
"full_name": full_name,
236+
}
237+
td = data_algebra.data_ops.describe_table(
238+
head,
239+
table_name=full_name,
240+
row_limit=row_limit,
241+
qualifiers=qualifiers,
242+
sql_meta=sql_meta,
243+
)
244+
return td
245+
246+
def query_to_csv(self, q, *, res_name) -> None:
247+
"""Write query to csv"""
248+
if isinstance(q, data_algebra.data_ops.ViewRepresentation):
249+
q = q.to_sql(self.db_model)
250+
else:
251+
q = str(q)
252+
253+
def open_regular():
254+
"""open regular"""
255+
return lambda: open(res_name, "w")
256+
257+
def open_gzip():
258+
"""open gzipped"""
259+
return lambda: gzip.open(res_name, "w")
260+
261+
if res_name.endswith(".gz"):
262+
op = open_gzip
263+
else:
264+
op = open_regular()
265+
266+
with op() as res:
267+
res_iter = self.conn.query(q).result().to_dataframe_iterable()
268+
is_first = True
269+
for block in res_iter:
270+
block.to_csv(res, index=False, header=is_first)
271+
is_first = False
272+
273+
274+
def example_handle():
275+
"""
276+
Return an example db handle for testing. Returns None if helper packages not present.
277+
Note: binds in a data_catalog and data schema prefix. So this handle is specific
278+
to one database.
279+
280+
"""
281+
# TODO: parameterize this
282+
assert _have_bigquery
283+
credential_file = "/Users/johnmount/big_query/big_query_jm.json"
284+
# assert os.path.isfile(credential_file)
285+
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file
286+
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] # trigger key error if not present
287+
# noinspection PyBroadException
288+
try:
289+
data_catalog = "data-algebra-test"
290+
data_schema = "test_1"
291+
db_handle = BigQueryModel(
292+
table_prefix=f"{data_catalog}.{data_schema}"
293+
).db_handle(google.cloud.bigquery.Client())
294+
db_handle.db_model.prepare_connection(db_handle.conn)
295+
return db_handle
296+
except Exception:
297+
return None

build/lib/data_algebra/MySQL.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""
2+
Partial adapter of data algebra for MySQL. Not all data algebra operations are supported on this database at this time.
3+
"""
4+
5+
6+
import data_algebra.data_ops
7+
import data_algebra.db_model
8+
9+
10+
have_sqlalchemy = False
11+
try:
12+
# noinspection PyUnresolvedReferences
13+
import sqlalchemy
14+
15+
have_sqlalchemy = True
16+
except ImportError:
17+
have_sqlalchemy = False
18+
19+
20+
def _MySQL_is_bad_expr(dbmodel, expression):
21+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=True)
22+
return (
23+
"("
24+
+ subexpr
25+
+ " IS NULL " # TODO get infinity checks here
26+
+ " OR ("
27+
+ subexpr
28+
+ " != 0 AND "
29+
+ subexpr
30+
+ " = -"
31+
+ subexpr
32+
+ "))"
33+
)
34+
35+
36+
def _MySQL_concat_expr(dbmodel, expression):
37+
return (
38+
"CONCAT(" # TODO: cast each to char on way in
39+
+ ", ".join(
40+
[
41+
dbmodel.expr_to_sql(ai, want_inline_parens=False)
42+
for ai in expression.args
43+
]
44+
)
45+
+ ")"
46+
)
47+
48+
49+
# map from op-name to special SQL formatting code
50+
MySQL_formatters = {
51+
"___": lambda dbmodel, expression: str(expression.to_python()),
52+
"is_bad": _MySQL_is_bad_expr,
53+
"concat": _MySQL_concat_expr,
54+
}
55+
56+
57+
class MySQLModel(data_algebra.db_model.DBModel):
58+
"""A model of how SQL should be generated for MySQL.
59+
Assuming we are using a sqlalchemy engine as our connection.
60+
"""
61+
62+
def __init__(self):
63+
op_replacements = data_algebra.db_model.db_default_op_replacements.copy()
64+
op_replacements["std"] = "STDDEV_SAMP"
65+
op_replacements["var"] = "VAR_SAMP"
66+
data_algebra.db_model.DBModel.__init__(
67+
self,
68+
string_type="CHAR",
69+
identifier_quote="`",
70+
string_quote="'",
71+
op_replacements=op_replacements,
72+
sql_formatters=MySQL_formatters,
73+
)
74+
75+
def quote_identifier(self, identifier):
76+
assert isinstance(identifier, str)
77+
if self.identifier_quote in identifier:
78+
# TODO: escape quotes
79+
raise ValueError(
80+
"did not expect " + self.identifier_quote + " in identifier"
81+
)
82+
return self.identifier_quote + identifier + self.identifier_quote
83+
84+
85+
def example_handle():
86+
"""
87+
Return an example db handle for testing. Returns None if helper packages not present.
88+
89+
"""
90+
# TODO: parameterize this
91+
assert have_sqlalchemy
92+
db_engine = sqlalchemy.engine.create_engine(
93+
"mysql+pymysql://jmount@localhost/jmount"
94+
)
95+
db_handle = MySQLModel().db_handle(conn=db_engine, db_engine=db_engine)
96+
db_handle.db_model.prepare_connection(db_handle.conn)
97+
return db_handle

0 commit comments

Comments
 (0)