-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcli.py
More file actions
272 lines (229 loc) · 8.7 KB
/
cli.py
File metadata and controls
272 lines (229 loc) · 8.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# encoding: utf-8
from typing import Any
import logging
import os
import json
import click
import sqlalchemy as sa
from ckan.model import parse_db_config
from ckan.common import config
import ckan.logic as logic
import ckanext.datastore as datastore_module
from ckanext.datastore.backend.postgres import (
identifier,
literal_string,
get_read_engine,
get_write_engine,
_get_field_info,
)
from ckanext.datastore.blueprint import DUMP_FORMATS, dump_to
log = logging.getLogger(__name__)
# Root click group: all datastore CLI commands below attach to this group.
# NOTE: the docstring is surfaced by click as the group's --help text.
@click.group(short_help=u"Perform commands to set up the datastore.")
def datastore():
    """Perform commands to set up the datastore.
    """
    pass
@datastore.command(
    u'set-permissions',
    short_help=u'Generate SQL for permission configuration.'
)
def set_permissions():
    u'''Emit an SQL script that will set the permissions for the datastore
    users as configured in your configuration file.'''
    # Resolve the three connection URLs from the active configuration;
    # _parse_db_config aborts the command if any of them is malformed.
    write_cfg = _parse_db_config(u'ckan.datastore.write_url')
    read_cfg = _parse_db_config(u'ckan.datastore.read_url')
    main_cfg = _parse_db_config(u'sqlalchemy.url')

    # Best-effort sanity check: both datastore URLs must name the same
    # database. Hosts/ports could still differ, but catch the obvious case.
    if read_cfg[u'db_name'] != write_cfg[u'db_name']:
        click.secho(
            u'The datastore write_url and read_url must refer to the same '
            u'database!',
            fg=u'red',
            bold=True
        )
        raise click.Abort()

    click.echo(permissions_sql(
        maindb=main_cfg[u'db_name'],
        datastoredb=write_cfg[u'db_name'],
        mainuser=main_cfg[u'db_user'],
        writeuser=write_cfg[u'db_user'],
        readuser=read_cfg[u'db_user']
    ))
def permissions_sql(maindb: str, datastoredb: str, mainuser: str,
                    writeuser: str, readuser: str) -> str:
    '''Return the datastore permissions SQL script with names filled in.

    Loads the ``set_permissions.sql`` template shipped alongside this
    extension and substitutes the (quoted) database and role names.

    :param maindb: name of the main CKAN database
    :param datastoredb: name of the datastore database
    :param mainuser: CKAN's database user
    :param writeuser: datastore write user
    :param readuser: datastore read-only user
    '''
    template_filename = os.path.join(
        os.path.dirname(datastore_module.__file__), u'set_permissions.sql'
    )
    # The template file is shipped as UTF-8; be explicit rather than
    # depending on the locale's default encoding.
    with open(template_filename, encoding='utf-8') as fp:
        template = fp.read()
    # identifier() quotes each name so it is safe to splice into SQL.
    return template.format(
        maindb=identifier(maindb),
        datastoredb=identifier(datastoredb),
        mainuser=identifier(mainuser),
        writeuser=identifier(writeuser),
        readuser=identifier(readuser)
    )
@datastore.command()
@click.argument(u'resource-id', nargs=1)
@click.argument(
    u'output-file',
    type=click.File(u'wb'),
    default=click.get_binary_stream(u'stdout')
)
@click.option(u'--format', default=u'csv', type=click.Choice(DUMP_FORMATS))
@click.option(u'--offset', type=click.IntRange(0, None), default=0)
@click.option(u'--limit', type=click.IntRange(0))
@click.option(u'--bom', is_flag=True)
@click.pass_context
def dump(ctx: Any, resource_id: str, output_file: Any, format: str,
         offset: int, limit: int, bom: bool):
    u'''Dump a datastore resource.
    '''
    # Run as the site user so the dump is not blocked by authorization.
    site_user = logic.get_action('get_site_user')(
        {'ignore_auth': True}, {})
    flask_app = ctx.meta['flask_app']
    # dump_to needs an active request context; stream each chunk straight
    # to the output file instead of buffering the whole dump.
    with flask_app.test_request_context():
        write = output_file.write
        for chunk in dump_to(resource_id,
                             fmt=format,
                             offset=offset,
                             limit=limit,
                             options={u'bom': bom},
                             sort=u'_id',
                             search_params={},
                             user=site_user['name']):
            write(chunk)
def _parse_db_config(config_key: str = u'sqlalchemy.url'):
    '''Parse the DB connection details for *config_key*, aborting on failure.'''
    parsed = parse_db_config(config_key)
    if parsed:
        return parsed
    # Unparseable URL: report it in red and stop the CLI command.
    click.secho(
        u'Could not extract db details from url: %r' % config[config_key],
        fg=u'red',
        bold=True
    )
    raise click.Abort()
@datastore.command(
    'purge',
    short_help='purge orphaned or deleted resources from the datastore.'
)
# (canada fork only): more options and state=deleted handling
# TODO: upstream contrib!!
@click.option('-l', '--list', is_flag=True,
              type=click.BOOL,
              help='Only output the list of orphaned or deleted Resource IDs.')
@click.option('-y', '--yes', is_flag=True,
              type=click.BOOL, help='Purge without asking for confirmation.')
def purge(list: bool = False, yes: bool = False):
    '''Purge orphaned or deleted resources from the datastore using the datastore_delete
    action, which drops tables when called without filters.'''
    # NOTE: the ``list`` parameter shadows the builtin; the name is fixed
    # by the ``--list`` option so click can bind it.
    site_user = logic.get_action('get_site_user')({'ignore_auth': True}, {})
    # '_table_metadata' enumerates every datastore table and view alias.
    result = logic.get_action('datastore_search')(
        {'user': site_user['name']},
        {'resource_id': '_table_metadata'}
    )
    resource_id_list = []
    for record in result['records']:
        try:
            # ignore 'alias' records (views) as they are automatically
            # deleted when the parent resource table is dropped
            if record['alias_of']:
                continue
            # (canada fork only): more options and state=deleted handling
            # TODO: upstream contrib!!
            res = logic.get_action('resource_show')(
                {'user': site_user['name']},
                {'id': record['name']}
            )
            pkg = logic.get_action('package_show')(
                {'user': site_user['name']},
                {'id': res['package_id']}
            )
        except logic.NotFound:
            # No matching resource/package: the table is orphaned.
            resource_id_list.append(record['name'])
            if not list:
                click.echo("Resource '%s' orphaned - queued for drop" %
                           record['name'])
            continue
        except KeyError:
            continue
        # A resource deleted directly, or via its whole package, should have
        # its table dropped. Use elif so a resource whose package is ALSO
        # deleted is queued only once — the previous code appended it twice,
        # making the second datastore_delete call fail on a missing table.
        if res['state'] == 'deleted':
            resource_id_list.append(record['name'])
            if not list:
                click.echo("Resource '%s' deleted - queued for drop" %
                           record['name'])
        elif pkg['state'] == 'deleted':
            resource_id_list.append(record['name'])
            if not list:
                click.echo("Package '%s' deleted - queued for drop" %
                           pkg['id'])
    # --list mode: print IDs only, no purging.
    if list:
        click.echo('\n'.join(resource_id_list))
        return
    orphaned_table_count = len(resource_id_list)
    click.echo('%d orphaned tables found.' % orphaned_table_count)
    if not orphaned_table_count:
        return
    # Ask before destructive work unless --yes was given.
    if not yes:
        click.confirm('Proceed with purge?', abort=True)
    # Drop the orphaned datastore tables. When datastore_delete is called
    # without filters, it does a drop table cascade
    drop_count = 0
    for resource_id in resource_id_list:
        logic.get_action('datastore_delete')(
            {'user': site_user['name']},
            {'resource_id': resource_id, 'force': True}
        )
        click.echo("Table '%s' dropped" % resource_id)
        drop_count += 1
    click.echo('Dropped %s tables' % drop_count)
@datastore.command(
    'migrate',
    short_help='migrate datastore field info for plugin_data support'
)
def migrate():
    '''Move field info to _info so that plugins may add private information
    to each field for their own purposes.'''
    site_user = logic.get_action('get_site_user')({'ignore_auth': True}, {})
    # '_table_metadata' lists every datastore table and view alias.
    result = logic.get_action('datastore_search')(
        {'user': site_user['name']},
        {'resource_id': '_table_metadata'}
    )
    migrated = 0
    already_done = 0
    without_info = 0
    reader = get_read_engine()
    for record in result['records']:
        # View aliases carry no field info of their own; skip them.
        if record['alias_of']:
            continue
        raw_fields = _get_field_info(reader, record['name'], raw=True)
        # Already migrated if any field value contains an '_info' key.
        if any('_info' in value for value in raw_fields.values()):
            already_done += 1
            continue
        statements = []
        # Wrap each field's info as {'_info': ...} and store it back as a
        # column comment, all within one transaction per table.
        with get_write_engine().begin() as conn:
            for field_id, field_value in raw_fields.items():
                payload = literal_string(json.dumps(
                    {'_info': field_value},
                    ensure_ascii=False, separators=(',', ':')))
                statements.append(u'COMMENT ON COLUMN {0}.{1} is {2}'.format(
                    identifier(record['name']),
                    identifier(field_id),
                    payload))
            if statements:
                conn.execute(sa.text(';'.join(statements)))
                migrated += 1
            else:
                without_info += 1
    click.echo('Migrated %d tables (%d already migrated, %d no info)' % (
        migrated, already_done, without_info))
def get_commands():
    '''Return the click commands this extension contributes to the CLI.'''
    return (set_permissions, dump, purge, migrate)