|
| 1 | +# Copyright 2025 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +# This DBT Python model processes EPA historical air quality data from BigQuery |
| 16 | +# using BigFrames. The primary goal is to merge several hourly summary |
| 17 | +# tables into a single, unified DataFrame for later prediction. It includes the |
| 18 | +# following steps: |
| 19 | +# 1. Reading and Cleaning: It reads individual hourly summary tables from |
| 20 | +# BigQuery for various atmospheric parameters (like CO, O3, temperature, |
| 21 | +# and wind speed). Each table is cleaned by sorting, removing duplicates, |
| 22 | +# and renaming columns for clarity. |
| 23 | +# 2. Combining Data: It then merges these cleaned tables into a single, |
| 24 | +# comprehensive DataFrame. An inner join is used to ensure the final output |
| 25 | +# only includes records with complete data across all parameters. |
| 26 | +# 3. Final Output: The unified DataFrame is returned as the model's output, |
| 27 | +# creating a corresponding BigQuery table for future use. |
| 28 | +# |
| 29 | +# See more details from the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes |
| 30 | + |
| 31 | + |
| 32 | +import bigframes.pandas as bpd |
| 33 | + |
| 34 | +def model(dbt, session): |
| 35 | + # Optional: override settings from dbt_project.yml. |
| 36 | + # When both are set, dbt.config takes precedence over dbt_project.yml. |
| 37 | + dbt.config(submission_method="bigframes", timeout=6000) |
| 38 | + |
| 39 | + # Define the dataset and the columns of interest representing various parameters |
| 40 | + # in the atmosphere. |
| 41 | + dataset = "bigquery-public-data.epa_historical_air_quality" |
| 42 | + index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"] |
| 43 | + param_column = "parameter_name" |
| 44 | + value_column = "sample_measurement" |
| 45 | + |
| 46 | + # Initialize a list for collecting dataframes from individual parameters. |
| 47 | + params_dfs = [] |
| 48 | + |
| 49 | + # Collect dataframes from tables which contain data for single parameter. |
| 50 | + table_param_dict = { |
| 51 | + "co_hourly_summary" : "co", |
| 52 | + "no2_hourly_summary" : "no2", |
| 53 | + "o3_hourly_summary" : "o3", |
| 54 | + "pressure_hourly_summary" : "pressure", |
| 55 | + "so2_hourly_summary" : "so2", |
| 56 | + "temperature_hourly_summary" : "temperature", |
| 57 | + } |
| 58 | + |
| 59 | + for table, param in table_param_dict.items(): |
| 60 | + param_df = bpd.read_gbq( |
| 61 | + f"{dataset}.{table}", |
| 62 | + columns=index_columns + [value_column] |
| 63 | + ) |
| 64 | + param_df = param_df\ |
| 65 | + .sort_values(index_columns)\ |
| 66 | + .drop_duplicates(index_columns)\ |
| 67 | + .set_index(index_columns)\ |
| 68 | + .rename(columns={value_column : param}) |
| 69 | + params_dfs.append(param_df) |
| 70 | + |
| 71 | + # Collect dataframes from the table containing wind speed. |
| 72 | + # Optionally: collect dataframes from other tables containing |
| 73 | + # wind direction, NO, NOx, and NOy data as needed. |
| 74 | + wind_table = f"{dataset}.wind_hourly_summary" |
| 75 | + bpd.read_gbq(wind_table, columns=[param_column]).value_counts() |
| 76 | + |
| 77 | + wind_speed_df = bpd.read_gbq( |
| 78 | + wind_table, |
| 79 | + columns=index_columns + [value_column], |
| 80 | + filters=[(param_column, "==", "Wind Speed - Resultant")] |
| 81 | + ) |
| 82 | + wind_speed_df = wind_speed_df\ |
| 83 | + .sort_values(index_columns)\ |
| 84 | + .drop_duplicates(index_columns)\ |
| 85 | + .set_index(index_columns)\ |
| 86 | + .rename(columns={value_column: "wind_speed"}) |
| 87 | + params_dfs.append(wind_speed_df) |
| 88 | + |
| 89 | + # Combine data for all the selected parameters. |
| 90 | + df = bpd.concat(params_dfs, axis=1, join="inner") |
| 91 | + df = df.reset_index() |
| 92 | + |
| 93 | + return df |
0 commit comments