|
| 1 | +import inspect |
| 2 | +from typing import List |
| 3 | + |
| 4 | +from sqlglot import parse_one |
| 5 | +from sqlglot.optimizer.eliminate_ctes import eliminate_ctes |
| 6 | +from sqlglot.optimizer.eliminate_joins import eliminate_joins |
| 7 | +from sqlglot.optimizer.eliminate_subqueries import eliminate_subqueries |
| 8 | +from sqlglot.optimizer.normalize import normalize |
| 9 | +from sqlglot.optimizer.pushdown_projections import pushdown_projections |
| 10 | +from sqlglot.optimizer.qualify import qualify |
| 11 | +from sqlglot.optimizer.unnest_subqueries import unnest_subqueries |
| 12 | + |
| 13 | +from datapilot.core.insights.sql.base.insight import SqlInsight |
| 14 | +from datapilot.core.insights.utils import get_severity |
| 15 | +from datapilot.core.platforms.dbt.insights.schema import DBTInsightResult, DBTModelInsightResponse |
| 16 | + |
# sqlglot optimizer rules that SqlCheck.generate applies to each model's query.
# They are applied sequentially, each one receiving the output of the previous rule.
RULES = (
    pushdown_projections,
    normalize,
    unnest_subqueries,
    eliminate_subqueries,
    eliminate_joins,
    eliminate_ctes,
)
| 25 | + |
class SqlCheck(SqlInsight):
    """
    This class identifies DBT models with SQL optimization issues.

    Each model's compiled query is parsed, qualified and then passed through every
    rule in ``RULES`` in order. Whenever a rule changes the rendered SQL, an insight
    is reported with the rewritten query as the recommendation.
    """

    NAME = "sql optimization issues"
    ALIAS = "check_sql_optimization"
    DESCRIPTION = "Checks if the model has SQL optimization issues. "
    REASON_TO_FLAG = "The query can be optimized."
    FAILURE_MESSAGE = "The query for model `{model_unique_id}` has optimization opportunities:\n{rule_name}. "
    RECOMMENDATION = "Please adapt the query of the model `{model_unique_id}` as in following example:\n{optimized_sql}"

    @staticmethod
    def _supported_kwargs(func, available: dict) -> dict:
        """
        Selects the entries of ``available`` that ``func`` accepts as keyword arguments.

        :param func: The callable whose signature is inspected.
        :param available: Candidate keyword arguments.
        :return: The subset of ``available`` whose keys name keyword-capable parameters of ``func``.
        """
        # The first parameter is the expression, which is always passed positionally.
        parameters = list(inspect.signature(func).parameters.values())[1:]
        keyword_kinds = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
        accepted = {parameter.name for parameter in parameters if parameter.kind in keyword_kinds}
        return {name: value for name, value in available.items() if name in accepted}

    def _build_failure_result(self, model_unique_id: str, rule_name: str, optimized_sql: str) -> DBTInsightResult:
        """
        Constructs a failure result for a given model with sql optimization issues.

        :param model_unique_id: The unique id of the dbt model.
        :param rule_name: The rule that generated this failure result.
        :param optimized_sql: The optimized sql.
        :return: An instance of DBTInsightResult containing failure details.
        """
        failure_message = self.FAILURE_MESSAGE.format(model_unique_id=model_unique_id, rule_name=rule_name)
        recommendation = self.RECOMMENDATION.format(model_unique_id=model_unique_id, optimized_sql=optimized_sql)
        return DBTInsightResult(
            type=self.TYPE,
            name=self.NAME,
            message=failure_message,
            recommendation=recommendation,
            reason_to_flag=self.REASON_TO_FLAG,
            metadata={"model_unique_id": model_unique_id, "rule_name": rule_name},
        )

    def generate(self, *args, **kwargs) -> List[DBTModelInsightResponse]:
        """
        Generates insights for each DBT model in the project, focusing on sql optimization issues.

        A model can yield several insights: one for every rule in ``RULES`` that changes
        its query. Models without compiled code are skipped, and a failure while
        processing one model is logged without aborting the remaining models.

        :param kwargs: Optimizer options that override the defaults below. Each option is
            forwarded only to the functions (``qualify`` and the rules) that accept it.
        :return: A list of DBTModelInsightResponse objects with insights for each model.
        """
        self.logger.debug("Generating sql insights for DBT models")
        insights = []

        dialect = self.adapter_type
        possible_kwargs = {
            "db": None,
            "catalog": None,
            "dialect": dialect,
            "isolate_tables": True,  # needed for other optimizations to perform well
            "quote_identifiers": False,
            **kwargs,
        }
        # The accepted keyword arguments depend only on the function signatures, so
        # resolve them once instead of once per model and rule.
        qualify_kwargs = self._supported_kwargs(qualify, possible_kwargs)
        rules_with_kwargs = [(rule, self._supported_kwargs(rule, possible_kwargs)) for rule in RULES]

        for node_id, node in self.nodes.items():
            try:
                compiled_query = node.compiled_code
                if not compiled_query:
                    continue

                parsed_query = parse_one(compiled_query, dialect=dialect)
                changed = qualify(parsed_query, **qualify_kwargs)
                for rule, rule_kwargs in rules_with_kwargs:
                    # Render before applying the rule, because rules may mutate the
                    # expression in place. Rendering in the model's dialect keeps the
                    # recommendation valid for the target warehouse.
                    sql_before = changed.sql(dialect=dialect)
                    changed = rule(changed, **rule_kwargs)
                    sql_after = changed.sql(dialect=dialect)
                    if sql_after == sql_before:
                        continue

                    insights.append(
                        DBTModelInsightResponse(
                            unique_id=node_id,
                            package_name=node.package_name,
                            path=node.original_file_path,
                            original_file_path=node.original_file_path,
                            insight=self._build_failure_result(node_id, rule.__name__, sql_after),
                            severity=get_severity(self.config, self.ALIAS, self.DEFAULT_SEVERITY),
                        )
                    )
            except Exception as e:
                # Best effort: a model that cannot be parsed or optimized must not
                # prevent the remaining models from being checked.
                self.logger.error(f"Failed to check sql optimizations for model `{node_id}`: {e}")
        return insights
0 commit comments