diff --git a/.flake8 b/.flake8 index 8166c398..11ac08cc 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] -max-line-length = 120 +max-line-length = 130 ignore = E203, W503 diff --git a/generator/explores/metric_definitions_explore.py b/generator/explores/metric_definitions_explore.py index fb66312c..6e6d13c1 100644 --- a/generator/explores/metric_definitions_explore.py +++ b/generator/explores/metric_definitions_explore.py @@ -44,9 +44,7 @@ def _to_lookml( explore_lookml: Dict[str, Any] = { "name": self.name, - "always_filter": { - "filters": [{"submission_date": "7 days"}, {"sampling": "1"}] - }, + "always_filter": {"filters": [{"sampling": "1"}]}, # The base view is the only view that exposes the date and client_id fields. # All other views only expose the metric definitions. "fields": exposed_fields, diff --git a/generator/views/metric_definitions_view.py b/generator/views/metric_definitions_view.py index f9ed8b6b..dfef5c58 100644 --- a/generator/views/metric_definitions_view.py +++ b/generator/views/metric_definitions_view.py @@ -16,6 +16,15 @@ class MetricDefinitionsView(View): type: str = "metric_definitions_view" + # Time unit divisors for converting days to different granularities + TIME_UNITS = [ + ("date", 1), # Daily: no conversion needed + ("week", 7), # Weekly: divide by 7 + ("month", 30), # Monthly: approximate 30 days per month + ("quarter", 90), # Quarterly: approximate 90 days per quarter + ("year", 365), # Yearly: approximate 365 days per year + ] + def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]): """Get an instance of an MetricDefinitionsView.""" super().__init__(namespace, name, MetricDefinitionsView.type, tables) @@ -201,15 +210,33 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: [ f""" {data_source.name}.{data_source.submission_date_column or "submission_date"} + {{% if _filters['analysis_period'] != "" %}} BETWEEN + DATE_SUB( + COALESCE( + SAFE_CAST( + {{% date_start analysis_period %}} AS DATE + ), CURRENT_DATE()), + INTERVAL {{% parameter lookback_days %}} DAY + ) AND COALESCE( SAFE_CAST( - {{% date_start submission_date %}} AS DATE - ), CURRENT_DATE()) AND + {{% date_end analysis_period %}} AS DATE + ), CURRENT_DATE()) + {{% else %}} + BETWEEN + DATE_SUB( + COALESCE( + SAFE_CAST( + {{% date_start submission_date %}} AS DATE + ), CURRENT_DATE()), + INTERVAL {{% parameter lookback_days %}} DAY + ) AND COALESCE( SAFE_CAST( {{% date_end submission_date %}} AS DATE ), CURRENT_DATE()) + {{% endif %}} """ for data_source in [data_source_definition] + joined_data_sources if data_source.submission_date_column != "NULL" @@ -302,6 +329,7 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: ) view_defn["sets"] = self._get_sets() view_defn["parameters"] = self._get_parameters(view_defn["dimensions"]) + view_defn["filters"] = self._get_filters() return {"views": [view_defn]} @@ -350,8 +378,9 @@ def get_dimension_groups(self) -> List[Dict[str, Any]]: { "name": "submission", "type": "time", + "datatype": "date", "group_label": "Base Fields", - "sql": "CAST(${TABLE}.analysis_basis AS TIMESTAMP)", + "sql": "${TABLE}.analysis_basis", "label": "Submission", "timeframes": [ "raw", @@ -412,6 +441,35 @@ def _get_parameters(self, dimensions: List[dict]): "default_value": "100", "hidden": hide_sampling, }, + { + "name": "lookback_days", + "label": "Lookback (Days)", + "type": "unquoted", + "description": "Number of days added before the filtered date range. " + + "Useful for period-over-period comparisons.", + "default_value": "0", + }, + { + "name": "date_groupby_position", + "label": "Date Group By Position", + "type": "unquoted", + "description": "Position of the date field in the group by clause. " + + "Required when submission_week, submission_month, submission_quarter, submission_year " + + "is selected as BigQuery can't correctly resolve the GROUP BY otherwise", + "default_value": "", + }, + ] + + def _get_filters(self): + return [ + { + "name": "analysis_period", + "type": "date", + "label": "Analysis Period (with Lookback)", + "description": "Use this filter to define the main analysis period. " + + "The results will include the selected date range plus any additional " + + "days specified by the 'Lookback days' setting.", + } ] def get_measures( @@ -562,21 +620,185 @@ def get_measures( } ) elif statistic_slug == "rolling_average": - aggregation = statistic_conf.get("aggregation", "sum") - if "window_sizes" in statistic_conf: - for window_size in statistic_conf["window_sizes"]: - measures.append( - { - "name": f"{dimension['name']}_{window_size}_day_{statistic_slug}", - "type": "number", - "label": f"{dimension_label} {window_size} Day Rolling Average", - "sql": f""" - AVG({aggregation}(${{TABLE}}.{dimension["name"]} * {sampling})) OVER ( - ROWS {window_size} PRECEDING - )""", - "group_label": "Statistics", - "description": f"{window_size} day rolling average of {dimension_label}", - } + # rolling averages are computed over existing statistics (e.g. sum, ratio) + aggregations = statistic_conf.get("aggregations", ["sum"]) + for aggregation in aggregations: + # find measures that match the current dimension and aggregation type + matching_measures = [ + m + for m in measures + if m["name"].startswith( + f"{dimension['name']}_{aggregation}" ) + ] + if "window_sizes" in statistic_conf: + for window_size in statistic_conf["window_sizes"]: + for matching_measure in matching_measures: + # these statistics require some time dimension to be selected + measures.append( + { + "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day", + "type": "number", + "label": f"{matching_measure['label']} {window_size} Day Rolling Average", + "sql": f""" + {{% if {self.name}.submission_date._is_selected or + {self.name}.submission_week._is_selected or + {self.name}.submission_month._is_selected or + {self.name}.submission_quarter._is_selected or + {self.name}.submission_year._is_selected %}} + AVG(${{{matching_measure['name']}}}) OVER ( + {{% if date_groupby_position._parameter_value != "" %}} + ORDER BY {{% parameter date_groupby_position %}} + {{% elsif {self.name}.submission_date._is_selected %}} + ORDER BY ${{TABLE}}.analysis_basis + {{% else %}} + ERROR("date_groupby_position needs to be set when using submission_week, + submission_month, submission_quarter, or submission_year") + {{% endif %}} + ROWS BETWEEN {window_size} PRECEDING AND CURRENT ROW + {{% else %}} + ERROR('Please select a "submission_*" field to compute the rolling average') + {{% endif %}} + )""", + "group_label": "Statistics", + "description": f"{window_size} day rolling average of {dimension_label}", + } + ) + + # period-over-period measures compare current values with historical values + if "period_over_period" in statistic_conf: + # find all statistics that have period-over-period configured + matching_measures = [ + m + for m in measures + if m["name"].startswith( + f"{dimension['name']}_{statistic_slug}" + ) + and "_period_over_period_" not in m["name"] + ] + + # create period-over-period measures for each configured time period + for period in statistic_conf["period_over_period"].get( + "periods", [] + ): + for matching_measure in matching_measures: + original_sql = matching_measure["sql"] + + # rolling averages need special handling to adjust window sizes + # based on the selected time granularity + if statistic_slug == "rolling_average": + sql = self._create_rolling_average_period_sql( + original_sql, period + ) + else: + # standard measures use LAG function with time-adjusted periods + sql = self._create_lag_period_sql( + matching_measure, period + ) + + # generate different types of period-over-period comparisons + for kind in statistic_conf["period_over_period"].get( + "kinds", ["previous"] + ): + if kind == "difference": + comparison_sql = f"({original_sql}) - ({sql})" + elif kind == "relative_change": + comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1" + else: + comparison_sql = sql + + measures.append( + { + "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}", + "type": "number", + "label": f"{matching_measure['label']} " + + f"{period} Day Period Over Period {kind.capitalize()}", + "description": f"Period over period {kind.capitalize()} of " + + f"{matching_measure['label']} over {period} days", + "group_label": "Statistics", + "sql": comparison_sql, + } + ) return measures + + def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str: + """ + Create period-over-period SQL for rolling average measures. + + Rolling averages require adjusting the window size based on the selected time granularity. + """ + rows_match = re.search( + r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW", + original_sql, + ) + + if not rows_match: + return original_sql + + original_window_size = int(rows_match.group(1)) + time_conditions = [] + + for unit, divisor in self.TIME_UNITS: + # calculate adjusted window size for this time granularity + adjusted_window = ( + (original_window_size + period) // divisor + if unit != "date" + else original_window_size + period + ) + + condition = ( + f"{{% {'if' if unit == 'date' else 'elsif'} " + + f"{self.name}.submission_{unit}._is_selected %}}" + ) + + # modify the ROWS clause to extend the window by the period + modified_sql = re.sub( + r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW", + f"ROWS BETWEEN {adjusted_window} PRECEDING AND " + + f"{adjusted_window - original_window_size} PRECEDING", + original_sql, + ) + time_conditions.append(f"{condition}\n{modified_sql}") + + return ( + "\n".join(time_conditions) + + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" + ) + + def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str: + """ + Create period-over-period SQL using LAG function for standard measures. + + LAG function looks back N periods to get historical values. The period is adjusted + based on the selected time granularity (daily, weekly, monthly, etc.). + """ + time_conditions = [] + + for unit, divisor in self.TIME_UNITS: + # calculate adjusted period for this time granularity + adjusted_period = period // divisor if unit != "date" else period + + order_by = ( + f"${{submission_{unit}}}" if unit != "date" else "${submission_date}" + ) + + condition = ( + f"{{% {'if' if unit == 'date' else 'elsif'} " + + f"{self.name}.submission_{unit}._is_selected %}}" + ) + + lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER ( + {{% if date_groupby_position._parameter_value != "" %}} + ORDER BY {{% parameter date_groupby_position %}} + {{% else %}} + ORDER BY {order_by} + {{% endif %}} + )""" + time_conditions.append(f"{condition}\n{lag_sql}") + + return ( + "\n".join(time_conditions) + + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) " + + "OVER (ORDER BY ${submission_date})\n{% endif %}" + )