Skip to content

Commit c15ad2a

Browse files
committed
rebuild and retest
1 parent ff733fc commit c15ad2a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+21579
-4
lines changed

build/lib/data_algebra/BigQuery.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
"""
2+
Adapter for Google BigQuery database
3+
"""
4+
5+
6+
from typing import Optional
7+
8+
import gzip
9+
import os
10+
import os.path
11+
12+
import data_algebra
13+
import data_algebra.data_ops
14+
import data_algebra.db_model
15+
16+
_have_bigquery = False
17+
try:
18+
# noinspection PyUnresolvedReferences
19+
import google.cloud.bigquery
20+
21+
_have_bigquery = True
22+
except ImportError:
23+
pass
24+
25+
26+
def _bigquery_median_expr(dbmodel, expression):
27+
return (
28+
"PERCENTILE_CONT("
29+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
30+
+ ", 0.5)"
31+
)
32+
33+
34+
def _bigquery_std_expr(dbmodel, expression):
35+
return (
36+
"STDDEV_SAMP("
37+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
38+
+ ")"
39+
)
40+
41+
42+
def _bigquery_var_expr(dbmodel, expression):
43+
return (
44+
"VAR_SAMP("
45+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
46+
+ ")"
47+
)
48+
49+
50+
def _bigquery_is_bad_expr(dbmodel, expression):
51+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=True)
52+
return (
53+
"("
54+
+ subexpr
55+
+ " IS NULL OR "
56+
+ "IS_INF("
57+
+ subexpr
58+
+ ")"
59+
+ " OR ("
60+
+ subexpr
61+
+ " != 0 AND "
62+
+ subexpr
63+
+ " = -"
64+
+ subexpr
65+
+ "))"
66+
)
67+
68+
69+
def _bigquery_any_expr(dbmodel, expression):
70+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
71+
return f"LOGICAL_OR({subexpr})"
72+
73+
74+
def _bigquery_all_expr(dbmodel, expression):
75+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
76+
return f"LOGICAL_AND({subexpr})"
77+
78+
79+
def _bigquery_any_value_expr(dbmodel, expression):
80+
return (
81+
"ANY_VALUE("
82+
+ dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
83+
+ ")"
84+
)
85+
86+
87+
def _bigquery_ieee_divide_expr(dbmodel, expression):
88+
# don't issue an error
89+
# https://cloud.google.com/bigquery/docs/reference/standard-sql/mathematical_functions#ieee_divide
90+
assert len(expression.args) == 2
91+
e0 = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=False)
92+
e1 = dbmodel.expr_to_sql(expression.args[1], want_inline_parens=True)
93+
return f"IEEE_DIVIDE({e0}, 1.0 * {e1})"
94+
95+
96+
BigQuery_formatters = {
97+
"median": _bigquery_median_expr,
98+
"is_bad": _bigquery_is_bad_expr,
99+
"std": _bigquery_std_expr,
100+
"var": _bigquery_var_expr,
101+
"any": _bigquery_any_expr,
102+
"all": _bigquery_all_expr,
103+
"any_value": _bigquery_any_value_expr,
104+
"%/%": _bigquery_ieee_divide_expr,
105+
}
106+
107+
108+
class BigQueryModel(data_algebra.db_model.DBModel):
109+
"""A model of how SQL should be generated for BigQuery
110+
connection should be google.cloud.bigquery.client.Client"""
111+
112+
def __init__(self, *, table_prefix: Optional[str] = None):
113+
data_algebra.db_model.DBModel.__init__(
114+
self,
115+
identifier_quote="`",
116+
string_quote='"',
117+
sql_formatters=BigQuery_formatters,
118+
on_start="(",
119+
on_end=")",
120+
on_joiner=" AND ",
121+
string_type="STRING",
122+
)
123+
self.table_prefix = table_prefix
124+
125+
def get_table_name(self, table_description):
126+
if not isinstance(table_description, str):
127+
try:
128+
if table_description.node_name == "TableDescription":
129+
table_description = table_description.table_name
130+
else:
131+
raise TypeError(
132+
"Expected table_description to be a string or data_algebra.data_ops.TableDescription)"
133+
)
134+
except KeyError:
135+
raise TypeError(
136+
"Expected table_description to be a string or data_algebra.data_ops.TableDescription)"
137+
)
138+
if self.table_prefix is not None:
139+
table_description = self.table_prefix + "." + table_description
140+
return table_description
141+
142+
def quote_table_name(self, table_description):
143+
table_name = self.get_table_name(table_description)
144+
return self.quote_identifier(table_name)
145+
146+
# noinspection PyMethodMayBeStatic
147+
def execute(self, conn, q):
148+
"""
149+
150+
:param conn: database connection
151+
:param q: sql query
152+
"""
153+
assert _have_bigquery
154+
assert isinstance(conn, google.cloud.bigquery.client.Client)
155+
if isinstance(q, data_algebra.data_ops.ViewRepresentation):
156+
q = q.to_sql(db_model=self)
157+
else:
158+
q = str(q)
159+
assert isinstance(q, str)
160+
conn.query(q).result()
161+
162+
def read_query(self, conn, q):
163+
"""
164+
165+
:param conn: database connection
166+
:param q: sql query
167+
:return: query results as table
168+
"""
169+
assert _have_bigquery
170+
assert isinstance(conn, google.cloud.bigquery.client.Client)
171+
if isinstance(q, data_algebra.data_ops.ViewRepresentation):
172+
q = q.to_sql(db_model=self)
173+
else:
174+
q = str(q)
175+
assert isinstance(q, str)
176+
r = self.local_data_model.pd.DataFrame(conn.query(q).result().to_dataframe())
177+
self.local_data_model.clean_copy(r)
178+
return r.copy() # fresh copy
179+
180+
def insert_table(
181+
self, conn, d, table_name, *, qualifiers=None, allow_overwrite=False
182+
):
183+
prepped_table_name = table_name
184+
if self.table_prefix is not None:
185+
prepped_table_name = self.table_prefix + "." + table_name
186+
if allow_overwrite:
187+
self.drop_table(conn, table_name)
188+
else:
189+
table_exists = True
190+
# noinspection PyBroadException
191+
try:
192+
self.read_query(
193+
conn,
194+
"SELECT * FROM " + self.quote_table_name(table_name) + " LIMIT 1",
195+
)
196+
except Exception:
197+
table_exists = False
198+
if table_exists:
199+
raise ValueError("table " + prepped_table_name + " already exists")
200+
job = conn.load_table_from_dataframe(d, prepped_table_name)
201+
job.result()
202+
203+
def db_handle(self, conn, *, db_engine=None):
204+
return BigQuery_DBHandle(db_model=self, conn=conn)
205+
206+
207+
class BigQuery_DBHandle(data_algebra.db_model.DBHandle):
208+
def __init__(self, *, db_model=BigQueryModel(), conn):
209+
assert isinstance(db_model, BigQueryModel)
210+
data_algebra.db_model.DBHandle.__init__(self, db_model=db_model, conn=conn)
211+
212+
def describe_bq_table(
213+
self, *, table_catalog, table_schema, table_name, row_limit=7
214+
) -> data_algebra.data_ops.TableDescription:
215+
full_name = f"{table_catalog}.{table_schema}.{table_name}"
216+
head = self.db_model.read_query(
217+
conn=self.conn,
218+
q="SELECT * FROM "
219+
+ self.db_model.quote_identifier(
220+
full_name
221+
) # don't quote table name: adds more qualifiers
222+
+ " LIMIT "
223+
+ str(row_limit),
224+
)
225+
cat_name = f"{table_catalog}.{table_schema}.INFORMATION_SCHEMA.COLUMNS"
226+
sql_meta = self.db_model.read_query(
227+
self.conn,
228+
f"SELECT * FROM {self.db_model.quote_identifier(cat_name)} "
229+
+ f"WHERE table_name={self.db_model.quote_string(table_name)}",
230+
)
231+
qualifiers = {
232+
"table_catalog": table_catalog,
233+
"table_schema": table_schema,
234+
"table_name": table_name,
235+
"full_name": full_name,
236+
}
237+
td = data_algebra.data_ops.describe_table(
238+
head,
239+
table_name=full_name,
240+
row_limit=row_limit,
241+
qualifiers=qualifiers,
242+
sql_meta=sql_meta,
243+
)
244+
return td
245+
246+
def query_to_csv(self, q, *, res_name) -> None:
247+
"""Write query to csv"""
248+
if isinstance(q, data_algebra.data_ops.ViewRepresentation):
249+
q = q.to_sql(self.db_model)
250+
else:
251+
q = str(q)
252+
if res_name.endswith(".gz"):
253+
op = lambda: gzip.open(res_name, "w", encoding="utf-8")
254+
else:
255+
op = lambda: open(res_name, "w", encoding="utf-8")
256+
with op() as res:
257+
res_iter = self.conn.query(q).result().to_dataframe_iterable()
258+
is_first = True
259+
for block in res_iter:
260+
block.to_csv(res, index=False, header=is_first)
261+
is_first = False
262+
263+
264+
def example_handle():
265+
"""
266+
Return an example db handle for testing. Returns None if helper packages not present.
267+
Note: binds in a data_catalog and data schema prefix. So this handle is specific
268+
to one database.
269+
270+
"""
271+
# TODO: parameterize this
272+
assert _have_bigquery
273+
credential_file = "/Users/johnmount/big_query/big_query_jm.json"
274+
# assert os.path.isfile(credential_file)
275+
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file
276+
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] # trigger key error if not present
277+
# noinspection PyBroadException
278+
try:
279+
data_catalog = "data-algebra-test"
280+
data_schema = "test_1"
281+
db_handle = BigQueryModel(
282+
table_prefix=f"{data_catalog}.{data_schema}"
283+
).db_handle(google.cloud.bigquery.Client())
284+
db_handle.db_model.prepare_connection(db_handle.conn)
285+
return db_handle
286+
except Exception:
287+
return None

build/lib/data_algebra/MySQL.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""
2+
Partial adapter of data algebra for MySQL. Not all data algebra operations are supported on this database at this time.
3+
"""
4+
5+
6+
import data_algebra.data_ops
7+
import data_algebra.sql_model
8+
import data_algebra.db_model
9+
10+
11+
have_sqlalchemy = False
12+
try:
13+
# noinspection PyUnresolvedReferences
14+
import sqlalchemy
15+
16+
have_sqlalchemy = True
17+
except ImportError:
18+
have_sqlalchemy = False
19+
20+
21+
def _MySQL_is_bad_expr(dbmodel, expression):
22+
subexpr = dbmodel.expr_to_sql(expression.args[0], want_inline_parens=True)
23+
return (
24+
"("
25+
+ subexpr
26+
+ " IS NULL " # TODO get infinity checks here
27+
+ " OR ("
28+
+ subexpr
29+
+ " != 0 AND "
30+
+ subexpr
31+
+ " = -"
32+
+ subexpr
33+
+ "))"
34+
)
35+
36+
37+
def _MySQL_concat_expr(dbmodel, expression):
38+
return (
39+
"CONCAT(" # TODO: cast each to char on way in
40+
+ ", ".join(
41+
[
42+
dbmodel.expr_to_sql(ai, want_inline_parens=False)
43+
for ai in expression.args
44+
]
45+
)
46+
+ ")"
47+
)
48+
49+
50+
# map from op-name to special SQL formatting code
51+
MySQL_formatters = {
52+
"___": lambda dbmodel, expression: str(expression.to_python()),
53+
"is_bad": _MySQL_is_bad_expr,
54+
"concat": _MySQL_concat_expr,
55+
}
56+
57+
58+
class MySQLModel(data_algebra.db_model.DBModel):
59+
"""A model of how SQL should be generated for MySQL.
60+
Assuming we are using a sqlalchemy engine as our connection.
61+
"""
62+
63+
def __init__(self):
64+
op_replacements = data_algebra.sql_model.db_default_op_replacements.copy()
65+
op_replacements["std"] = "STDDEV_SAMP"
66+
op_replacements["var"] = "VAR_SAMP"
67+
data_algebra.db_model.DBModel.__init__(
68+
self,
69+
string_type="CHAR",
70+
identifier_quote="`",
71+
string_quote="'",
72+
op_replacements=op_replacements,
73+
sql_formatters=MySQL_formatters,
74+
)
75+
76+
def quote_identifier(self, identifier):
77+
assert isinstance(identifier, str)
78+
if self.identifier_quote in identifier:
79+
# TODO: escape quotes
80+
raise ValueError(
81+
"did not expect " + self.identifier_quote + " in identifier"
82+
)
83+
return self.identifier_quote + identifier + self.identifier_quote
84+
85+
86+
def example_handle():
87+
"""
88+
Return an example db handle for testing. Returns None if helper packages not present.
89+
90+
"""
91+
# TODO: parameterize this
92+
assert have_sqlalchemy
93+
db_engine = sqlalchemy.engine.create_engine(
94+
"mysql+pymysql://jmount@localhost/jmount"
95+
)
96+
db_handle = MySQLModel().db_handle(conn=db_engine, db_engine=db_engine)
97+
db_handle.db_model.prepare_connection(db_handle.conn)
98+
return db_handle

0 commit comments

Comments
 (0)