
Commit 6a9f097

Prefixes (#514)
1 parent 5409f69 commit 6a9f097

File tree

5 files changed: +155 / -42 lines


cid/common.py

Lines changed: 139 additions & 36 deletions
@@ -190,25 +190,55 @@ def __loadPlugins(self) -> dict:
         logger.info('Finished loading plugins')
         return plugins
 
+    def resources_with_global_parameters(self, resources):
+        """ render resources with global parameters """
+        params = self.get_template_parameters(self.resources.get('parameters', {}))
+        def _recursively_process_strings(item, str_func):
+            """ recursively update elements of a dict """
+            if isinstance(item, str):
+                return str_func(item)
+            elif isinstance(item, dict):
+                res = {}
+                for key, value in item.items():
+                    res[_recursively_process_strings(key, str_func)] = _recursively_process_strings(value, str_func)
+                return res
+            elif isinstance(item, list):
+                return [_recursively_process_strings(value, str_func) for value in item]
+            return item
+        def _str_func(text):
+            return Template(text).safe_substitute(params)
+        return _recursively_process_strings(resources, _str_func)
+
+
     def getPlugin(self, plugin) -> dict:
         return self.plugins.get(plugin)
 
 
     def get_definition(self, type: str, name: str=None, id: str=None) -> dict:
         """ return resource definition that matches parameters """
+        res = None
         if type not in ['dashboard', 'dataset', 'view']:
-            print(f'Error: {type} is not a valid type')
             raise ValueError(f'{type} is not a valid definition type')
         if type in ['dataset', 'view'] and name:
-            return self.resources.get(f'{type}s').get(name)
+            res = self.resources.get(f'{type}s').get(name)
         elif type in ['dashboard']:
             for definition in self.resources.get(f'{type}s').values():
                 if name is not None and definition.get('name') != name:
                     continue
                 if id is not None and definition.get('dashboardId') != id:
                     continue
-                return definition
-        return None
+                res = definition
+                break
+
+        # template
+        if isinstance(res, dict):
+            name = name or res.get('name')
+            params = self.get_template_parameters(res.get('parameters', {}), param_prefix=f'{type}-{name}-')
+            # FIXME: can be recursive?
+            for key, value in res.items():
+                if isinstance(value, str):
+                    res[key] = Template(value).safe_substitute(params)
+        return res
 
 
     @command
@@ -258,6 +288,38 @@ def load_resources(self):
         except Exception as exc:
             raise CidCritical(f'Failed to load resources from {source}: {type(exc)} {exc}')
         self.resources = always_merger.merge(self.resources, resources)
+        self.resources = self.resources_with_global_parameters(self.resources)
+
+
+    def get_template_parameters(self, parameters: dict, param_prefix: str='', others: dict={}):
+        """ Get template parameters. """
+        params = get_parameters()
+        for key, value in parameters.items():
+            logger.debug(f'reading template parameter: {key} / {value}')
+            prefix = '' if value.get('global') else param_prefix
+            if isinstance(value, str):
+                params[key] = value
+            elif isinstance(value, dict) and value.get('type') == 'cur.tag_and_cost_category_fields':
+                params[key] = get_parameter(
+                    param_name=prefix + key,
+                    message=f"Required parameter: {key} ({value.get('description')})",
+                    choices=self.cur.tag_and_cost_category_fields + ["'none'"],
+                )
+            elif isinstance(value, dict):
+                params[key] = value.get('value')
+                while params[key] == None:
+                    if value.get('silentDefault') != None and get_parameters().get(key) == None:
+                        params[key] = value.get('silentDefault')
+                    else:
+                        params[key] = get_parameter(
+                            param_name=prefix + key,
+                            message=f"Required parameter: {key} ({value.get('description')})",
+                            default=value.get('default'),
+                            template_variables=dict(account_id=self.base.account_id),
+                        )
+            else:
+                raise CidCritical(f'Unknown parameter type for "{key}". Must be a string or a dict with value or with default key')
+        return always_merger.merge(params, others)
 
 
     @command
@@ -271,6 +333,11 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
 
         self.qs.ensure_subscription()
 
+        # In case if we cannot discover datasets, we need to discover dashboards
+        # TODO: check if datasets returns explicit permission denied and only then discover dashboards as a workaround
+        self.qs.discover_dashboards()
+
+
         if dashboard_id is None:
             dashboard_id = get_parameter(
                 param_name='dashboard-id',
@@ -298,7 +365,9 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
         else:
             raise ValueError(f'Cannot find dashboard with id={dashboard_id} in resources file.')
 
-        required_datasets_names = dashboard_definition.get('dependsOn', dict()).get('datasets', list())
+        definition_dependency_datasets = dashboard_definition.get('dependsOn', {}).get('datasets', [])
+        required_datasets_names = [dsname for dsname in definition_dependency_datasets]
+        ds_map = definition_dependency_datasets if isinstance(definition_dependency_datasets, dict) else {}
 
         dashboard_datasets = dashboard.datasets if dashboard else {}
 
@@ -323,10 +392,12 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
             compatible = self.check_dashboard_version_compatibility(dashboard_id)
         elif dashboard_definition.get('data'):
             data = dashboard_definition.get('data')
+            params = self.get_template_parameters(dashboard_definition.get('parameters', dict()))
             if isinstance(data, dict):
+                #TODO: need to apply template to data structure as well
                 data = yaml.safe_dump(data)
             if isinstance(data, str):
-                data = Template(data).safe_substitute(get_parameters())
+                data = Template(data).safe_substitute(params)
             dashboard_definition['definition'] = yaml.safe_load(data)
         elif dashboard_definition.get('file'):
             raise NotImplementedError('File option is not implemented')
@@ -369,7 +440,8 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
                 if dashboard_definition.get('templateId'):
                     # For templates we can additionaly verify dataset fields
                     dataset_fields = {col.get('Name'): col.get('Type') for col in ds.columns}
-                    required_fileds = {col.get('Name'): col.get('DataType') for col in source_template.datasets.get(dataset_name)}
+                    src_fields = source_template.datasets.get(ds_map.get(dataset_name, dataset_name) )
+                    required_fileds = {col.get('Name'): col.get('DataType') for col in src_fields}
                     unmatched = {}
                     for k, v in required_fileds.items():
                         if k not in dataset_fields or dataset_fields[k] != v:
@@ -386,7 +458,7 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
             if not matching_datasets:
                 reco = ''
                 logger.warning(f'Dataset {dataset_name} is not found')
-                if exec_env()['shell'] == 'lambda':
+                if utils.exec_env()['shell'] == 'lambda':
                     # We are in lambda
                     reco = 'You can try deleting existing dataset and re-run.'
                 else:
@@ -404,6 +476,9 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
                 print(f'Using dataset {dataset_name}: {ds.id}')
                 dashboard_definition['datasets'][dataset_name] = ds.arn
 
+        # Update datasets to the mapping name if needed
+        # Dashboard definition must contain names that are specific to template.
+        dashboard_definition['datasets'] = {ds_map.get(name, name): arn for name, arn in dashboard_definition['datasets'].items() }
         logger.debug(f"datasets: {dashboard_definition['datasets']}")
         #FIXME: this code looks absolete
         kwargs = dict()
@@ -563,6 +638,7 @@ def delete_dataset(self, name: str, id: str=None):
         # try to get the database name from the dataset (might need this for later)
         schema = next(iter(dataset.schemas), None) # FIXME: manage choice if multiple data sources
         if schema:
+            logger.debug(f'Picking the first of dataset databases: {dataset.schemas}')
             self.athena.DatabaseName = schema
 
         if get_parameter(
@@ -943,15 +1019,18 @@ def create_datasets(self, _datasets: list, known_datasets: dict={}, recursive: b
             _ds_id = get_parameters().get(f'{dataset_name.replace("_", "-")}-dataset-id')
             if _ds_id:
                 self.qs.describe_dataset(_ds_id)
-
+
         found_datasets = utils.intersection(required_datasets, [v.name for v in self.qs.datasets.values()])
         missing_datasets = utils.difference(required_datasets, found_datasets)
 
         # Update existing datasets
         if update:
             for dataset_name in found_datasets[:]:
                 if dataset_name in known_datasets.keys():
-                    dataset_id = self.qs.get_datasets(id=known_datasets.get(dataset_name))[0].id
+                    _found_dsc = self.qs.get_datasets(id=known_datasets.get(dataset_name))
+                    if len(_found_dsc) != 1:
+                        logger.warning(f'Found more than one dataset in known datasets with name {dataset_name} {len(_found_dsc)}. Taking the first one.')
+                    dataset_id = _found_dsc[0].id
                 else:
                     datasets = self.qs.get_datasets(name=dataset_name)
                     if not datasets:
@@ -1025,6 +1104,8 @@ def create_datasets(self, _datasets: list, known_datasets: dict={}, recursive: b
             print(f'Creating dataset: {dataset_name}')
             try:
                 dataset_definition = self.get_definition("dataset", name=dataset_name)
+                if not dataset_definition:
+                    raise Exception(f'Failed to find dataset {dataset_name}. Check if Datasets section in your resources file has that.')
             except Exception as e:
                 logger.critical('dashboard definition is broken, unable to proceed.')
                 logger.critical(f'dataset definition not found: {dataset_name}')
@@ -1154,12 +1235,15 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non
             datasources = self.qs.get_datasets(id=dataset_id)[0].datasources
         else: # try to find dataset and get athena database
             found_datasets = self.qs.get_datasets(name=dataset_name)
+            logger.debug(f'Related to dataset {dataset_name}: {[ds.id for ds in found_datasets]}')
             if found_datasets:
                 schemas = list(set(sum([d.schemas for d in found_datasets], [])))
                 datasources = list(set(sum([d.datasources for d in found_datasets], [])))
+                logger.debug(f'Found following schemas={schemas}, related to dataset with name {dataset_name}')
                 logger.info(f'Found {len(datasources)} Athena DataSources related to the DataSet {dataset_name}')
 
-                if len(schemas) == 1:
+                if not get_parameters().get('athena-database') and len(schemas) == 1 and schemas[0]:
+                    logger.debug(f'Picking the database={schemas[0]}')
                     self.athena.DatabaseName = schemas[0]
                 # else user will be suggested to choose database anyway
 
@@ -1237,6 +1321,15 @@ def create_or_update_dataset(self, dataset_definition: dict, dataset_id: str=Non
             'cur_table_name': self.cur.tableName if cur_required else None
         }
 
+        logger.debug(columns_tpl)
+
+        columns_tpl = self.get_template_parameters(
+            dataset_definition.get('parameters', dict()),
+            f'dataset-{dataset_id}-',
+            columns_tpl,
+        )
+        logger.debug(columns_tpl)
+
         compiled_dataset = json.loads(template.safe_substitute(columns_tpl))
         if dataset_id:
             compiled_dataset.update({'DataSetId': dataset_id})
@@ -1307,11 +1400,14 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo
             return
 
         # Create a view
-        logger.info(f'Getting view definition')
+        logger.info(f'Getting view definition {view_name}')
         view_definition = self.get_definition("view", name=view_name)
         if not view_definition and view_name in self.athena._metadata.keys():
             logger.info(f"Definition is unavailable but view exists: {view_name}, skipping")
             return
+        if not view_definition:
+            logger.info(f"Definition is unavailable {view_name}")
+            return
         logger.debug(f'View definition: {view_definition}')
 
         if recursive:
@@ -1335,7 +1431,10 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo
                 print(f'Updating table {view_name}')
                 self.glue.create_or_update_table(view_name, view_query)
             else:
-                if 'CREATE OR REPLACE' in view_query.upper():
+                if 'CREATE EXTERNAL TABLE' in view_query.upper():
+                    logger.warning('Cannot recreate table {view_name}')
+
+                elif 'CREATE OR REPLACE' in view_query.upper():
                     update_view = False
                     while get_parameters().get('on-drift', 'show').lower() != 'override' and isatty():
                         cid_print(f'Analysing view {view_name}')
@@ -1374,18 +1473,28 @@ def create_or_update_view(self, view_name: str, recursive: bool=True, update: bo
                         self.athena.execute_query(view_query)
                     else:
                         print(f'View "{view_name}" is not compatible with update. Skipping.')
-                assert self.athena.wait_for_view(view_name), f"Failed to update a view {view_name}"
-                logger.info(f'View "{view_name}" updated')
+                if 'CREATE OR REPLACE VIEW' in view_query.upper() or 'CREATE VIEW' in view_query.upper():
+                    logger.debug('Start waiting')
+                    assert self.athena.wait_for_view(view_name), f"Failed to update a view {view_name}"
+                    logger.info(f'View "{view_name}" updated')
             else:
                 return
         else: # No found -> creation
             logger.info(f'Creating view: "{view_name}"')
             if view_definition.get('type') == 'Glue_Table':
                 self.glue.create_or_update_table(view_name, view_query)
+                logger.info(f'Table "{view_name}" created')
+            elif 'CREATE EXTERNAL TABLE' in view_query.upper():
+                print(f'Creating table: "{view_name}"')
+                try:
+                    self.athena.execute_query(view_query)
+                except CidCritical as exc:
+                    logger.exception(exc)
+                    pass
             else:
                 self.athena.execute_query(view_query)
-            assert self.athena.wait_for_view(view_name), f"Failed to create a view {view_name}"
-            logger.info(f'View "{view_name}" created')
+                assert self.athena.wait_for_view(view_name), f"Failed to create a view {view_name}"
+                logger.info(f'View "{view_name}" created')
 
 
     def get_view_query(self, view_name: str) -> str:
@@ -1406,7 +1515,11 @@ def get_view_query(self, view_name: str) -> str:
             raise Exception(f'\nCannot find view {view_name}')
 
         # Load TPL file
-        template = Template(self.get_data_from_definition('view', view_definition))
+        data = self.get_data_from_definition('view', view_definition)
+        if isinstance(data, dict):
+            template = Template(json.dumps(data))
+        else:
+            template = Template(data)
 
         # Prepare template parameters
         columns_tpl = {
@@ -1415,24 +1528,14 @@ def get_view_query(self, view_name: str) -> str:
             'athena_database_name': self.athena.DatabaseName,
         }
 
-        for k, v in view_definition.get('parameters', dict()).items():
-            if isinstance(v, str):
-                param = {k:v}
-            elif isinstance(v, dict):
-                value = v.get('value')
-                while not value:
-                    value = get_parameter(
-                        param_name=f'view-{view_name}-{k}',
-                        message=f"Required parameter: {k} ({v.get('description')})",
-                        default=v.get('default'),
-                        template_variables=dict(account_id=self.base.account_id),
-                    )
-                param = {k:value}
-            else:
-                raise CidCritical(f'Unknown parameter type for "{k}". Must be a string or a dict with value or with default key')
-            # Add parameter
-            columns_tpl.update(param)
-        # Compile template
+        columns_tpl = self.get_template_parameters(
+            view_definition.get('parameters', dict()),
+            f'view-{view_name}-',
+            columns_tpl,
+        )
+        logger.debug(f"Rendering template for {view_name}")
+        logger.debug(str(columns_tpl))
+        columns_tpl = {key: str(value) for key, value in columns_tpl.items()}
         compiled_query = template.safe_substitute(columns_tpl)
 
         return compiled_query
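
For context on the commit title: parameter prompts are now namespaced with a resource prefix such as view-<view_name>- or dataset-<dataset_id>-, while parameters flagged 'global' keep their bare name, and the resolved values are substituted into templates with string.Template. Below is a minimal standalone sketch of that pattern; the resolver is a simplified stand-in for get_parameter()/get_parameters() (no CLI prompting), and the parameter names and S3 path are illustrative, not from the repo.

# Minimal sketch of the prefixed-parameter pattern, assuming a simplified resolver.
from string import Template

def resolve_template_parameters(parameters: dict, param_prefix: str = '') -> dict:
    params = {}
    for key, value in parameters.items():
        if isinstance(value, str):
            params[key] = value
        elif isinstance(value, dict):
            # 'global' parameters skip the prefix; others would be prompted as prefix + key.
            prefix = '' if value.get('global') else param_prefix
            params[key] = value.get('value') or value.get('default') or f'<{prefix}{key}>'
        else:
            raise ValueError(f'Unknown parameter type for "{key}"')
    return params

view_parameters = {'s3path': {'default': 's3://my-bucket/cur/'}}  # illustrative
columns_tpl = resolve_template_parameters(view_parameters, param_prefix='view-my_view-')
print(Template("LOCATION '${s3path}'").safe_substitute(columns_tpl))  # LOCATION 's3://my-bucket/cur/'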

cid/export.py

Lines changed: 3 additions & 2 deletions
@@ -221,8 +221,9 @@ def export_analysis(qs, athena):
             logger.info(f'Please replace manually location bucket with a parameter: s3://{location}')
             default = get_parameter(
                 f'{key}-s3path',
-                'please provide default value',
-                default=location,
+                'Please provide default value. (You can use {account_id} variable if needed)',
+                default=re.sub(r'(\d{12})', '{account_id}', location),
+                template_variables={'account_id': '{account_id}'},
             )
             view_data['parameters'] = {
                 f's3path': {
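
A small illustration of the default-value rewrite above: any 12-digit run (an AWS account id) in the exported S3 location is replaced with the {account_id} placeholder before being offered as the default. The bucket path below is made up.

import re

location = 'cid-123456789012-data-exports/cur/'  # hypothetical exported location
default = re.sub(r'(\d{12})', '{account_id}', location)
print(default)  # cid-{account_id}-data-exports/cur/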

cid/helpers/athena.py

Lines changed: 2 additions & 2 deletions
@@ -172,7 +172,7 @@ def get_database(self, DatabaseName: str=None) -> bool:
             if 'AccessDeniedException' in str(exc):
                 raise
             else:
-                logger.debug(e, exc_info=True)
+                logger.debug(exc, exc_info=True)
             return False
 
     def list_table_metadata(self, DatabaseName: str=None, max_items: int=None) -> dict:
@@ -259,7 +259,7 @@ def execute_query(self, sql_query, sleep_duration=1, database: str=None, catalog
             if (current_status == "SUCCEEDED"):
                 return query_id
             else:
-                failure_reason = response['QueryExecution']['Status']['StateChangeReason']
+                failure_reason = response.get('QueryExecution', {}).get('Status', {}).get('StateChangeReason',repr(response))
                 logger.info(f'Athena query failed: {failure_reason}')
                 logger.debug(f'Full query: {sql_query}')
                 if fail:
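
The second hunk swaps a direct key lookup for chained .get() calls, so a response without StateChangeReason no longer raises KeyError and the repr of the whole response becomes the fallback message. A standalone illustration with a made-up response dict:

response = {'QueryExecution': {'Status': {'State': 'FAILED'}}}  # hypothetical Athena response
failure_reason = response.get('QueryExecution', {}).get('Status', {}).get('StateChangeReason', repr(response))
print(failure_reason)  # {'QueryExecution': {'Status': {'State': 'FAILED'}}}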
