Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[flake8]
max-line-length = 130
max-line-length = 120
ignore = E203, W503
4 changes: 3 additions & 1 deletion generator/explores/metric_definitions_explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ def _to_lookml(

explore_lookml: Dict[str, Any] = {
"name": self.name,
"always_filter": {"filters": [{"sampling": "1"}]},
"always_filter": {
"filters": [{"submission_date": "7 days"}, {"sampling": "1"}]
},
# The base view is the only view that exposes the date and client_id fields.
# All other views only expose the metric definitions.
"fields": exposed_fields,
Expand Down
258 changes: 18 additions & 240 deletions generator/views/metric_definitions_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ class MetricDefinitionsView(View):

type: str = "metric_definitions_view"

# Time unit divisors for converting days to different granularities
TIME_UNITS = [
("date", 1), # Daily: no conversion needed
("week", 7), # Weekly: divide by 7
("month", 30), # Monthly: approximate 30 days per month
("quarter", 90), # Quarterly: approximate 90 days per quarter
("year", 365), # Yearly: approximate 365 days per year
]

def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]):
"""Get an instance of an MetricDefinitionsView."""
super().__init__(namespace, name, MetricDefinitionsView.type, tables)
Expand Down Expand Up @@ -210,33 +201,15 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
[
f"""
{data_source.name}.{data_source.submission_date_column or "submission_date"}
{{% if _filters['analysis_period'] != "" %}}
BETWEEN
DATE_SUB(
COALESCE(
SAFE_CAST(
{{% date_start analysis_period %}} AS DATE
), CURRENT_DATE()),
INTERVAL {{% parameter lookback_days %}} DAY
) AND
COALESCE(
SAFE_CAST(
{{% date_end analysis_period %}} AS DATE
), CURRENT_DATE())
{{% else %}}
BETWEEN
DATE_SUB(
COALESCE(
SAFE_CAST(
{{% date_start submission_date %}} AS DATE
), CURRENT_DATE()),
INTERVAL {{% parameter lookback_days %}} DAY
) AND
{{% date_start submission_date %}} AS DATE
), CURRENT_DATE()) AND
COALESCE(
SAFE_CAST(
{{% date_end submission_date %}} AS DATE
), CURRENT_DATE())
{{% endif %}}
"""
for data_source in [data_source_definition] + joined_data_sources
if data_source.submission_date_column != "NULL"
Expand Down Expand Up @@ -329,7 +302,6 @@ def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
)
view_defn["sets"] = self._get_sets()
view_defn["parameters"] = self._get_parameters(view_defn["dimensions"])
view_defn["filters"] = self._get_filters()

return {"views": [view_defn]}

Expand Down Expand Up @@ -378,9 +350,8 @@ def get_dimension_groups(self) -> List[Dict[str, Any]]:
{
"name": "submission",
"type": "time",
"datatype": "date",
"group_label": "Base Fields",
"sql": "${TABLE}.analysis_basis",
"sql": "CAST(${TABLE}.analysis_basis AS TIMESTAMP)",
"label": "Submission",
"timeframes": [
"raw",
Expand Down Expand Up @@ -441,35 +412,6 @@ def _get_parameters(self, dimensions: List[dict]):
"default_value": "100",
"hidden": hide_sampling,
},
{
"name": "lookback_days",
"label": "Lookback (Days)",
"type": "unquoted",
"description": "Number of days added before the filtered date range. "
+ "Useful for period-over-period comparisons.",
"default_value": "0",
},
{
"name": "date_groupby_position",
"label": "Date Group By Position",
"type": "unquoted",
"description": "Position of the date field in the group by clause. "
+ "Required when submission_week, submission_month, submission_quarter, submission_year "
+ "is selected as BigQuery can't correctly resolve the GROUP BY otherwise",
"default_value": "",
},
]

def _get_filters(self):
return [
{
"name": "analysis_period",
"type": "date",
"label": "Analysis Period (with Lookback)",
"description": "Use this filter to define the main analysis period. "
+ "The results will include the selected date range plus any additional "
+ "days specified by the 'Lookback days' setting.",
}
]

def get_measures(
Expand Down Expand Up @@ -620,185 +562,21 @@ def get_measures(
}
)
elif statistic_slug == "rolling_average":
# rolling averages are computed over existing statistics (e.g. sum, ratio)
aggregations = statistic_conf.get("aggregations", ["sum"])
for aggregation in aggregations:
# find measures that match the current dimension and aggregation type
matching_measures = [
m
for m in measures
if m["name"].startswith(
f"{dimension['name']}_{aggregation}"
aggregation = statistic_conf.get("aggregation", "sum")
if "window_sizes" in statistic_conf:
for window_size in statistic_conf["window_sizes"]:
measures.append(
{
"name": f"{dimension['name']}_{window_size}_day_{statistic_slug}",
"type": "number",
"label": f"{dimension_label} {window_size} Day Rolling Average",
"sql": f"""
AVG({aggregation}(${{TABLE}}.{dimension["name"]} * {sampling})) OVER (
ROWS {window_size} PRECEDING
)""",
"group_label": "Statistics",
"description": f"{window_size} day rolling average of {dimension_label}",
}
)
]
if "window_sizes" in statistic_conf:
for window_size in statistic_conf["window_sizes"]:
for matching_measure in matching_measures:
# these statistics require some time dimension to be selected
measures.append(
{
"name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day",
"type": "number",
"label": f"{matching_measure['label']} {window_size} Day Rolling Average",
"sql": f"""
{{% if {self.name}.submission_date._is_selected or
{self.name}.submission_week._is_selected or
{self.name}.submission_month._is_selected or
{self.name}.submission_quarter._is_selected or
{self.name}.submission_year._is_selected %}}
AVG(${{{matching_measure['name']}}}) OVER (
{{% if date_groupby_position._parameter_value != "" %}}
ORDER BY {{% parameter date_groupby_position %}}
{{% elsif {self.name}.submission_date._is_selected %}}
ORDER BY ${{TABLE}}.analysis_basis
{{% else %}}
ERROR("date_groupby_position needs to be set when using submission_week,
submission_month, submission_quarter, or submission_year")
{{% endif %}}
ROWS BETWEEN {window_size} PRECEDING AND CURRENT ROW
{{% else %}}
ERROR('Please select a "submission_*" field to compute the rolling average')
{{% endif %}}
)""",
"group_label": "Statistics",
"description": f"{window_size} day rolling average of {dimension_label}",
}
)

# period-over-period measures compare current values with historical values
if "period_over_period" in statistic_conf:
# find all statistics that have period-over-period configured
matching_measures = [
m
for m in measures
if m["name"].startswith(
f"{dimension['name']}_{statistic_slug}"
)
and "_period_over_period_" not in m["name"]
]

# create period-over-period measures for each configured time period
for period in statistic_conf["period_over_period"].get(
"periods", []
):
for matching_measure in matching_measures:
original_sql = matching_measure["sql"]

# rolling averages need special handling to adjust window sizes
# based on the selected time granularity
if statistic_slug == "rolling_average":
sql = self._create_rolling_average_period_sql(
original_sql, period
)
else:
# standard measures use LAG function with time-adjusted periods
sql = self._create_lag_period_sql(
matching_measure, period
)

# generate different types of period-over-period comparisons
for kind in statistic_conf["period_over_period"].get(
"kinds", ["previous"]
):
if kind == "difference":
comparison_sql = f"({original_sql}) - ({sql})"
elif kind == "relative_change":
comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1"
else:
comparison_sql = sql

measures.append(
{
"name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}",
"type": "number",
"label": f"{matching_measure['label']} "
+ f"{period} Day Period Over Period {kind.capitalize()}",
"description": f"Period over period {kind.capitalize()} of "
+ f"{matching_measure['label']} over {period} days",
"group_label": "Statistics",
"sql": comparison_sql,
}
)

return measures

def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str:
"""
Create period-over-period SQL for rolling average measures.

Rolling averages require adjusting the window size based on the selected time granularity.
"""
rows_match = re.search(
r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW",
original_sql,
)

if not rows_match:
return original_sql

original_window_size = int(rows_match.group(1))
time_conditions = []

for unit, divisor in self.TIME_UNITS:
# calculate adjusted window size for this time granularity
adjusted_window = (
(original_window_size + period) // divisor
if unit != "date"
else original_window_size + period
)

condition = (
f"{{% {'if' if unit == 'date' else 'elsif'} "
+ f"{self.name}.submission_{unit}._is_selected %}}"
)

# modify the ROWS clause to extend the window by the period
modified_sql = re.sub(
r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW",
f"ROWS BETWEEN {adjusted_window} PRECEDING AND "
+ f"{adjusted_window - original_window_size} PRECEDING",
original_sql,
)
time_conditions.append(f"{condition}\n{modified_sql}")

return (
"\n".join(time_conditions)
+ f"\n{{% else %}}\n{original_sql}\n{{% endif %}}"
)

def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str:
"""
Create period-over-period SQL using LAG function for standard measures.

LAG function looks back N periods to get historical values. The period is adjusted
based on the selected time granularity (daily, weekly, monthly, etc.).
"""
time_conditions = []

for unit, divisor in self.TIME_UNITS:
# calculate adjusted period for this time granularity
adjusted_period = period // divisor if unit != "date" else period

order_by = (
f"${{submission_{unit}}}" if unit != "date" else "${submission_date}"
)

condition = (
f"{{% {'if' if unit == 'date' else 'elsif'} "
+ f"{self.name}.submission_{unit}._is_selected %}}"
)

lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER (
{{% if date_groupby_position._parameter_value != "" %}}
ORDER BY {{% parameter date_groupby_position %}}
{{% else %}}
ORDER BY {order_by}
{{% endif %}}
)"""
time_conditions.append(f"{condition}\n{lag_sql}")

return (
"\n".join(time_conditions)
+ f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) "
+ "OVER (ORDER BY ${submission_date})\n{% endif %}"
)