-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbigquery.py
More file actions
101 lines (73 loc) · 3.08 KB
/
bigquery.py
File metadata and controls
101 lines (73 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
try:
from google.cloud import bigquery
from google.colab import auth
except:
pass
def _parse_table_string(table):
project_id, schema_id, table_id = table.split('.')
return project_id, schema_id, table_id
def bq_auth():
    """Run the Colab user-authentication flow and hand back its result.

    Delegates to ``authenticate_user()`` on the ``auth`` object that the
    module imports from ``google.colab``.
    """
    outcome = auth.authenticate_user()
    return outcome
class BigQueryClient:
    """Convenience wrapper around ``bigquery.Client`` for ad-hoc table work.

    Offers helpers to run SQL, audit and preview tables, list their columns,
    and grant access to them.
    """

    def __init__(self, project='etsy-bigquery-adhoc-prod', params=None):
        """Create the underlying client and attempt user authentication.

        Args:
            project: Project ID passed to ``bigquery.Client``.
            params: Optional dict of parameters kept on the instance. A new
                empty dict is created when omitted; a dict supplied by the
                caller is stored by reference (not copied).
        """
        self.project = project
        # A ``dict()`` default is evaluated once and shared by every instance,
        # so set_params() on one client would leak into all the others.
        self.params = {} if params is None else params
        # Stays None when the client could not be created (see below).
        self.client = None

        # The module-level google imports swallow their own failure, which
        # leaves ``bigquery`` / ``auth`` undefined. Using them then raises
        # NameError rather than ModuleNotFoundError, so both are treated as
        # "library unavailable" and reported as a warning.
        try:
            self.client = bigquery.Client(project=project)
        except (NameError, ModuleNotFoundError) as error:
            print('Warning:', error)

        try:
            bq_auth()
        except (NameError, ModuleNotFoundError) as error:
            print('Warning:', error)

    def query(self, sql):
        """Run ``sql`` on the client and return ``.to_dataframe()`` of the job."""
        return self.client.query(sql).to_dataframe()

    def audit_table(self, table, pk=None):
        """Print basic health statistics for ``table``.

        Always prints the column count and the row count. When ``pk`` is
        given, also prints how many rows have a null ``pk`` and whether
        ``pk`` is unique across the table.

        Args:
            table: Fully-qualified ``project.dataset.table`` name.
            pk: Optional column name (or SQL expression) to check as the
                primary key.
        """
        project_id, schema_id, table_id = _parse_table_string(table)
        columns_view = f"{project_id}.{schema_id}.INFORMATION_SCHEMA.COLUMNS"

        # count(*) is used instead of sum(1): sum over zero rows is NULL, so
        # an empty table would otherwise report no value instead of 0.
        column_count = self.query(
            f"""select format("%'d", count(*)) from {columns_view} where table_name = '{table_id}'"""
        ).squeeze()
        print('Column count:', column_count)

        row_count = self.query(
            f"""select format("%'d", count(*)) from {table}"""
        ).squeeze()
        print('Row count:', row_count)

        if pk:
            null_pk_rows = self.query(
                f"select coalesce(sum(1),0) from {table} where {pk} is null"
            ).squeeze()
            print('Rows with null PK:', str(null_pk_rows).upper())

            pk_is_unique = self.query(
                f"select count(*) = count(distinct {pk}) from {table}"
            ).squeeze()
            print('PK is unique:', str(pk_is_unique).upper())

    def preview_table(self, table, nrows=5, order_by='rand()', ascending=True):
        """Return ``nrows`` rows of ``table`` ordered by ``order_by``.

        Args:
            table: Table name interpolated into the ``from`` clause.
            nrows: Value used for the ``limit`` clause.
            order_by: Expression for the ``order by`` clause; the default
                ``rand()`` gives a random sample.
            ascending: Sort ascending when true, descending otherwise.
        """
        direction = 'asc' if ascending else 'desc'
        return self.query(
            f'select * from {table} order by {order_by} {direction} limit {nrows}'
        )

    def view_table_columns(self, table):
        """Return name, nullability and data type of each column of ``table``.

        Rows come from the dataset's ``INFORMATION_SCHEMA.COLUMNS`` view,
        ordered by ``ordinal_position``.
        """
        project_id, schema_id, table_id = _parse_table_string(table)
        return self.query(
            f"""
            select column_name, is_nullable, data_type
            from {'.'.join([project_id, schema_id])}.INFORMATION_SCHEMA.COLUMNS
            where table_name = '{table_id}'
            order by ordinal_position
            """
        )

    @staticmethod
    def _as_list(names):
        """Normalize ``None`` / a single string / an iterable into a list."""
        if not names:
            return []
        if isinstance(names, str):
            # A bare string would otherwise be iterated character by character.
            return [names]
        return list(names)

    def grant_table(self, table, users=None, groups=None, role='viewer'):
        """Grant a ``roles/bigquery.data<Role>`` role on ``table``.

        Args:
            table: Table name; existing backticks are tolerated.
            users: A user identifier or an iterable of them.
            groups: A group identifier or an iterable of them.
            role: Role suffix; it is passed through ``str.capitalize()`` and
                appended to ``roles/bigquery.data`` (``'viewer'`` becomes
                ``roles/bigquery.dataViewer``).

        Returns:
            Whatever ``query`` returns for the ``grant`` statement.

        Raises:
            ValueError: If neither ``users`` nor ``groups`` names a grantee.
        """
        grantees = [f"'user:{name}'" for name in self._as_list(users)]
        grantees += [f"'group:{name}'" for name in self._as_list(groups)]
        if not grantees:
            # Without grantees the statement would end in a dangling "to".
            raise ValueError('grant_table requires at least one user or group')

        # GRANT appears to require backticks around the project ID; strip any
        # pre-existing backticks from `table` to avoid double-backticking.
        unquoted_table = table.replace('`', '')
        return self.query(
            f"grant `roles/bigquery.data{role.capitalize()}` "
            f"on table `{unquoted_table}` to {','.join(grantees)}"
        )

    def set_params(self, params: dict, verbose=False):
        """Merge ``params`` into ``self.params``; print the result if ``verbose``."""
        self.params.update(params)
        if verbose:
            print(self.params)