diff --git a/dbt-adapters/.changes/unreleased/Under the Hood-20251023-115028.yaml b/dbt-adapters/.changes/unreleased/Under the Hood-20251023-115028.yaml new file mode 100644 index 000000000..5f8f07bc4 --- /dev/null +++ b/dbt-adapters/.changes/unreleased/Under the Hood-20251023-115028.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Break down schema synchronization macros to enable more granular operations, such as modifying nested structure schemas +time: 2025-10-23T11:50:28.048588+02:00 +custom: + Author: Kayrnt + Issue: "599" diff --git a/dbt-adapters/src/dbt/include/global_project/macros/materializations/models/incremental/on_schema_change.sql b/dbt-adapters/src/dbt/include/global_project/macros/materializations/models/incremental/on_schema_change.sql index 76fe372f4..9b6a6a6a4 100644 --- a/dbt-adapters/src/dbt/include/global_project/macros/materializations/models/incremental/on_schema_change.sql +++ b/dbt-adapters/src/dbt/include/global_project/macros/materializations/models/incremental/on_schema_change.sql @@ -17,6 +17,11 @@ {% macro check_for_schema_changes(source_relation, target_relation) %} + {{ return(adapter.dispatch('check_for_schema_changes', 'dbt')(source_relation, target_relation)) }} +{% endmacro %} + + +{% macro default__check_for_schema_changes(source_relation, target_relation) %} {% set schema_changed = False %} @@ -60,8 +65,15 @@ {% macro sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %} + {{ return(adapter.dispatch('sync_column_schemas', 'dbt')(on_schema_change, target_relation, schema_changes_dict)) }} +{% endmacro %} + + +{% macro default__sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %} {%- set add_to_target_arr = schema_changes_dict['source_not_in_target'] -%} + {%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%} + {%- set new_target_types = schema_changes_dict['new_target_types'] -%} {%- if on_schema_change == 'append_new_columns'-%} {%- if add_to_target_arr | length > 0 -%} @@ -69,8 +81,6 @@ {%- endif -%} {% elif on_schema_change == 'sync_all_columns' %} - {%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%} - {%- set new_target_types = schema_changes_dict['new_target_types'] -%} {% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 %} {%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%} @@ -100,6 +110,11 @@ {% macro process_schema_changes(on_schema_change, source_relation, target_relation) %} + {{ return(adapter.dispatch('process_schema_changes', 'dbt')(on_schema_change, source_relation, target_relation)) }} +{% endmacro %} + + +{% macro default__process_schema_changes(on_schema_change, source_relation, target_relation) %} {% if on_schema_change == 'ignore' %} diff --git a/dbt-bigquery/.changes/unreleased/Features-20251012-134419.yaml b/dbt-bigquery/.changes/unreleased/Features-20251012-134419.yaml new file mode 100644 index 000000000..7fa6054c4 --- /dev/null +++ b/dbt-bigquery/.changes/unreleased/Features-20251012-134419.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Support BigQuery STRUCT schema changes +time: 2025-10-12T13:44:19.542878+02:00 +custom: + Author: Kayrnt + Issue: "599" diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/impl.py b/dbt-bigquery/src/dbt/adapters/bigquery/impl.py index 8efc62ab5..adbe844fd 100644 --- a/dbt-bigquery/src/dbt/adapters/bigquery/impl.py +++ b/dbt-bigquery/src/dbt/adapters/bigquery/impl.py @@ -1,3 +1,4 @@ +import copy from dataclasses
import dataclass from datetime import datetime from multiprocessing.context import SpawnContext @@ -8,11 +9,13 @@ FrozenSet, Iterable, List, + Mapping, Optional, + Sequence, + Set, Tuple, TYPE_CHECKING, Type, - Set, Union, ) @@ -651,19 +654,277 @@ def update_table_description( @available.parse_none def alter_table_add_columns(self, relation, columns): - logger.debug('Adding columns ({}) to table {}".'.format(columns, relation)) + logger.debug('Adding columns ({}) to table "{}".'.format(columns, relation)) + self.alter_table_add_remove_columns(relation, columns, None) + @available.parse_none + def alter_table_add_remove_columns(self, relation, add_columns, remove_columns): conn = self.connections.get_thread_connection() client = conn.handle table_ref = self.get_table_ref_from_relation(relation) table = client.get_table(table_ref) - new_columns = [col.column_to_bq_schema() for col in columns] - new_schema = table.schema + new_columns + schema_as_dicts = [field.to_api_repr() for field in table.schema] + + # BigQuery only supports dropping top-level columns via ALTER TABLE. + # Track names so nested removals can still be logged for visibility. + drop_candidates: List[BigQueryColumn] = [] + nested_removals: List[str] = [] + + if remove_columns: + for column in remove_columns: + if "." in column.name: + nested_removals.append(column.name) + else: + drop_candidates.append(column) + + if nested_removals: + logger.warning( + "BigQuery limitation: Cannot remove nested fields via schema update. " + "Attempted to remove: {}. Consider using 'append_new_columns' mode " + "or recreating the table with full_refresh.".format(nested_removals) + ) - new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema) - client.update_table(new_table, ["schema"]) + if drop_candidates: + relation_name = relation.render() + drop_clauses = [f"DROP COLUMN {self.quote(column.name)}" for column in drop_candidates] + drop_sql = f"ALTER TABLE {relation_name} {', '.join(drop_clauses)}" + + column_names = [column.name for column in drop_candidates] + logger.debug( + 'Dropping columns `{}` from table "{}".'.format(column_names, relation_name) + ) + self.execute(drop_sql, fetch=False) + + # Refresh schema after drops so additions operate on the latest definition + table = client.get_table(table_ref) + schema_as_dicts = [field.to_api_repr() for field in table.schema] + + apply_schema_patch = False + + if add_columns: + additions = self._build_nested_additions(add_columns) + schema_as_dicts = self._merge_nested_fields(schema_as_dicts, additions) + apply_schema_patch = True + + if apply_schema_patch: + new_schema = [SchemaField.from_api_repr(field) for field in schema_as_dicts] + new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema) + client.update_table(new_table, ["schema"]) + + def _build_nested_additions( + self, add_columns: Sequence[BigQueryColumn] + ) -> Dict[str, Dict[str, Any]]: + additions: Dict[str, Dict[str, Any]] = {} + + for column in add_columns: + schema_field = column.column_to_bq_schema().to_api_repr() + additions[column.name] = schema_field + + return additions + + def _merge_nested_fields( + self, + existing_fields: Sequence[Dict[str, Any]], + additions: Mapping[str, Dict[str, Any]], + prefix: str = "", + ) -> List[Dict[str, Any]]: + """Merge new fields into existing STRUCT fields, appending at each nesting level. + + Note: Primarily used for field removal. For adding fields, sync_struct_columns + uses the source schema directly to preserve field order. 
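        Illustrative sketch (hypothetical field names; assumes an initialized
        BigQueryAdapter instance ``adapter``): merging a dotted addition into an
        existing RECORD appends the new field at the end of that nesting level::

            existing = [{"name": "payload", "type": "RECORD",
                         "fields": [{"name": "nested_field", "type": "STRING"}]}]
            additions = {"payload.extra_field": {"name": "extra_field", "type": "STRING"}}
            merged = adapter._merge_nested_fields(existing, additions)
            # merged[0]["fields"] ->
            #   [{"name": "nested_field", "type": "STRING"},
            #    {"name": "extra_field", "type": "STRING"}]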
+ """ + merged_fields: List[Dict[str, Any]] = [] + addition_lookup = dict(additions) + + for field in existing_fields: + field_name = field["name"] + qualified_name = f"{prefix}.{field_name}" if prefix else field_name + + direct_addition = addition_lookup.pop(qualified_name, None) + if direct_addition is not None: + merged_fields.append(copy.deepcopy(direct_addition)) + continue + + nested_additions = { + key: value + for key, value in list(addition_lookup.items()) + if key.startswith(f"{qualified_name}.") + } + + if nested_additions and field.get("type") == "RECORD": + for key in nested_additions: + addition_lookup.pop(key, None) + + stripped_additions = { + key.split(".", 1)[1]: value for key, value in nested_additions.items() + } + + merged_children = self._merge_nested_fields( + field.get("fields", []) or [], + stripped_additions, + prefix="", + ) + + merged_field = copy.deepcopy(field) + merged_field["fields"] = merged_children + merged_fields.append(merged_field) + else: + merged_fields.append(copy.deepcopy(field)) + + for path, addition in addition_lookup.items(): + if "." not in path: + merged_fields.append(copy.deepcopy(addition)) + + return merged_fields + + def _collect_field_dicts( + self, fields: Sequence[Dict[str, Any]], prefix: str = "" + ) -> Dict[str, Dict[str, Any]]: + collected: Dict[str, Dict[str, Any]] = {} + for field in fields: + name = field["name"] + path = f"{prefix}.{name}" if prefix else name + collected[path] = field + if field.get("type") == "RECORD": + collected.update(self._collect_field_dicts(field.get("fields", []) or [], path)) + return collected + + def _find_missing_fields( + self, + source_fields: Sequence[Dict[str, Any]], + target_fields: Sequence[Dict[str, Any]], + ) -> Dict[str, Dict[str, Any]]: + source_map = self._collect_field_dicts(source_fields) + target_map = self._collect_field_dicts(target_fields) + return { + path: copy.deepcopy(field) + for path, field in source_map.items() + if path not in target_map + } + + @available.parse(lambda *a, **k: {}) + def sync_struct_columns( + self, + on_schema_change: str, + source_relation: BigQueryRelation, + target_relation: BigQueryRelation, + schema_changes_dict: Dict[str, Any], + ) -> Dict[str, Any]: + if on_schema_change not in ("append_new_columns", "sync_all_columns"): + return schema_changes_dict + + logger.debug( + "BigQuery STRUCT sync invoked: mode=%s target=%s", + on_schema_change, + target_relation.render(), + ) + + conn = self.connections.get_thread_connection() + client = conn.handle + + source_table = client.get_table(self.get_table_ref_from_relation(source_relation)) + target_table = client.get_table(self.get_table_ref_from_relation(target_relation)) + + source_schema = [field.to_api_repr() for field in source_table.schema] + target_schema = [field.to_api_repr() for field in target_table.schema] + # Identify nested fields that exist in the source schema but not the target. + missing_fields = self._find_missing_fields(source_schema, target_schema) + nested_additions = { + path: field_def for path, field_def in missing_fields.items() if "." in path + } + + # Also include struct columns flagged by diff_column_data_types so we cover + # cases where only the STRUCT signature changed. 
+ struct_type_changes: Set[str] = set() + for change in schema_changes_dict.get("new_target_types", []): + column_name = change.get("column_name") + if not column_name: + continue + new_type = change.get("new_type", "") + if "STRUCT<" in new_type.upper() or "RECORD" in new_type.upper(): + struct_type_changes.add(column_name.split(".", 1)[0]) + + struct_columns_to_update: Set[str] = { + path.split(".", 1)[0] for path in nested_additions.keys() + } + struct_columns_to_update.update(struct_type_changes) + + logger.debug( + "BigQuery STRUCT sync details: target=%s nested_additions=%s struct_columns=%s", + target_relation.render(), + sorted(nested_additions.keys()), + sorted(struct_columns_to_update), + ) + + if not struct_columns_to_update: + return schema_changes_dict + + updated_schema: List[Dict[str, Any]] = [] + handled_columns: Set[str] = set() + + for field in target_schema: + field_name = field["name"] + if field_name in struct_columns_to_update: + source_field = next((f for f in source_schema if f["name"] == field_name), None) + if source_field: + updated_schema.append(copy.deepcopy(source_field)) + handled_columns.add(field_name) + else: + logger.debug( + "BigQuery STRUCT sync: unable to locate source definition for %s on %s", + field_name, + target_relation.render(), + ) + updated_schema.append(copy.deepcopy(field)) + else: + updated_schema.append(copy.deepcopy(field)) + + if not handled_columns: + return schema_changes_dict + + try: + new_schema = [SchemaField.from_api_repr(field) for field in updated_schema] + target_table.schema = new_schema + client.update_table(target_table, ["schema"]) + logger.debug( + "BigQuery STRUCT sync applied for %s columns=%s", + target_relation.render(), + sorted(handled_columns), + ) + + if schema_changes_dict.get("source_not_in_target"): + schema_changes_dict["source_not_in_target"] = [ + column + for column in schema_changes_dict["source_not_in_target"] + if column.name.split(".", 1)[0] not in handled_columns + ] + + schema_changes_dict["target_columns"] = [ + BigQueryColumn.create_from_field(field) for field in new_schema + ] + except google.api_core.exceptions.BadRequest as exc: + logger.warning("Failed to update STRUCT column schema: %s", exc) + return schema_changes_dict + + # Remove handled STRUCT type changes so downstream logic does not retry + if schema_changes_dict.get("new_target_types"): + schema_changes_dict["new_target_types"] = [ + change + for change in schema_changes_dict["new_target_types"] + if not change.get("column_name") + or change.get("column_name").split(".", 1)[0] not in handled_columns + ] + + if ( + not schema_changes_dict.get("source_not_in_target") + and not schema_changes_dict.get("target_not_in_source") + and not schema_changes_dict.get("new_target_types") + ): + schema_changes_dict["schema_changed"] = False + + return schema_changes_dict @available.parse_none def load_dataframe( diff --git a/dbt-bigquery/src/dbt/include/bigquery/macros/adapters.sql b/dbt-bigquery/src/dbt/include/bigquery/macros/adapters.sql index 6fd441228..1f43e858b 100644 --- a/dbt-bigquery/src/dbt/include/bigquery/macros/adapters.sql +++ b/dbt-bigquery/src/dbt/include/bigquery/macros/adapters.sql @@ -154,6 +154,11 @@ {% endmacro %} +{% macro bigquery__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} + {% do adapter.alter_table_add_remove_columns(relation, add_columns, remove_columns) %} +{% endmacro %} + + {% macro bigquery__alter_column_type(relation, column_name, new_column_type) -%} {#-- Changing a column's data type 
using a query requires you to scan the entire table. The query charges can be significant if the table is very large. diff --git a/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/incremental.sql index 25a83b0c6..3be46c0be 100644 --- a/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/incremental.sql @@ -141,7 +141,7 @@ {%- endcall -%} {% set tmp_relation_exists = true %} {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} - {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} + {% set dest_columns = adapter.dispatch('process_schema_changes', 'dbt')(on_schema_change, tmp_relation, existing_relation) %} {% endif %} {% if not dest_columns %} diff --git a/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/models/incremental/on_schema_change.sql b/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/models/incremental/on_schema_change.sql new file mode 100644 index 000000000..fdc73e17f --- /dev/null +++ b/dbt-bigquery/src/dbt/include/bigquery/macros/materializations/models/incremental/on_schema_change.sql @@ -0,0 +1,136 @@ +{# BigQuery-specific schema change handling that augments the core macro with +# STRUCT column synchronization logic. #} +{% macro bigquery__check_for_schema_changes(source_relation, target_relation) %} + + {% set schema_changed = False %} + + {%- set source_columns = adapter.get_columns_in_relation(source_relation) -%} + {%- set target_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set source_not_in_target = diff_columns(source_columns, target_columns) -%} + {%- set target_not_in_source = diff_columns(target_columns, source_columns) -%} + + {% set new_target_types = diff_column_data_types(source_columns, target_columns) %} + + {% if source_not_in_target != [] %} + {% set schema_changed = True %} + {% elif target_not_in_source != [] or new_target_types != [] %} + {% set schema_changed = True %} + {% elif new_target_types != [] %} + {% set schema_changed = True %} + {% endif %} + + {% set changes_dict = { + 'schema_changed': schema_changed, + 'source_not_in_target': source_not_in_target, + 'target_not_in_source': target_not_in_source, + 'source_columns': source_columns, + 'target_columns': target_columns, + 'new_target_types': new_target_types + } %} + + {% do changes_dict.update({'source_relation': source_relation}) %} + + {{ return(changes_dict) }} + +{% endmacro %} + + +{% macro bigquery__process_schema_changes(on_schema_change, source_relation, target_relation) %} + + {% if on_schema_change == 'ignore' %} + + {{ return({}) }} + + {% else %} + + {% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %} + {% do schema_changes_dict.update({'source_relation': source_relation}) %} + + {% if schema_changes_dict['schema_changed'] %} + + {% if on_schema_change == 'fail' %} + + {% set fail_msg %} + The source and target schemas on this incremental model are out of sync! + They can be reconciled in several ways: + - set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation. + - Re-run the incremental model with `full_refresh: True` to update the target schema. + - update the schema manually and re-run the process. 
+ + Additional troubleshooting context: + Source columns not in target: {{ schema_changes_dict['source_not_in_target'] }} + Target columns not in source: {{ schema_changes_dict['target_not_in_source'] }} + New column types: {{ schema_changes_dict['new_target_types'] }} + {% endset %} + + {% do exceptions.raise_compiler_error(fail_msg) %} + + {% else %} + + {% do bigquery__sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %} + + {% endif %} + + {% endif %} + + {{ return(schema_changes_dict['source_columns']) }} + + {% endif %} + +{% endmacro %} + +{% macro bigquery__sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %} + + {% set struct_sync_dict = schema_changes_dict %} + {% set source_relation = schema_changes_dict.get('source_relation') %} + + {% if source_relation is not none %} + {% set struct_sync_result = adapter.sync_struct_columns( + on_schema_change, + source_relation, + target_relation, + schema_changes_dict, + ) %} + {% if struct_sync_result is not none %} + {% set struct_sync_dict = struct_sync_result %} + {% endif %} + {% endif %} + + {%- set add_to_target_arr = struct_sync_dict['source_not_in_target'] -%} + + {%- if on_schema_change == 'append_new_columns' -%} + {%- if add_to_target_arr | length > 0 -%} + {%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, none) -%} + {%- endif -%} + + {% elif on_schema_change == 'sync_all_columns' %} + {%- set remove_from_target_arr = struct_sync_dict['target_not_in_source'] -%} + {%- set new_target_types = struct_sync_dict['new_target_types'] -%} + + {% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 %} + {%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%} + {% endif %} + + {% if new_target_types != [] %} + {% for ntt in new_target_types %} + {% set column_name = ntt['column_name'] %} + {% set new_type = ntt['new_type'] %} + {% do alter_column_type(target_relation, column_name, new_type) %} + {% endfor %} + {% endif %} + + {% endif %} + + {% set schema_change_message %} + In {{ target_relation }}: + Schema change approach: {{ on_schema_change }} + Columns added: {{ add_to_target_arr }} + Columns removed: {{ struct_sync_dict['target_not_in_source'] }} + Data types changed: {{ struct_sync_dict['new_target_types'] }} + {% endset %} + + {% do log(schema_change_message) %} + + {% do struct_sync_dict.pop('source_relation', none) %} + +{% endmacro %} diff --git a/dbt-bigquery/tests/conftest.py b/dbt-bigquery/tests/conftest.py index 332572cd5..ee505d776 100644 --- a/dbt-bigquery/tests/conftest.py +++ b/dbt-bigquery/tests/conftest.py @@ -42,13 +42,15 @@ def service_account_target(): if _is_base64(credentials_json_str): credentials_json_str = _base64_to_string(credentials_json_str) credentials = json.loads(credentials_json_str) - project_id = credentials.get("project_id") + project_id = os.getenv("BIGQUERY_TEST_PROJECT") or credentials.get("project_id") + execution_project = os.getenv("BIGQUERY_TEST_EXECUTION_PROJECT") or project_id return { "type": "bigquery", "method": "service-account-json", "threads": 4, "job_retries": 2, "project": project_id, + "execution_project": execution_project, "keyfile_json": credentials, # following 3 for python model "compute_region": os.getenv("COMPUTE_REGION") or os.getenv("DATAPROC_REGION"), diff --git a/dbt-bigquery/tests/functional/adapter/incremental/test_incremental_on_schema_change.py 
b/dbt-bigquery/tests/functional/adapter/incremental/test_incremental_on_schema_change.py index 7239b313b..dbfa14212 100644 --- a/dbt-bigquery/tests/functional/adapter/incremental/test_incremental_on_schema_change.py +++ b/dbt-bigquery/tests/functional/adapter/incremental/test_incremental_on_schema_change.py @@ -147,6 +147,85 @@ class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange): order by id """ +_MODELS__STRUCT_BASE = """ +{{ + config(materialized='table') +}} + +with source_data as ( + select 1 as id, struct('foo' as nested_field) as payload union all + select 2 as id, struct('bar' as nested_field) as payload +) + +select * from source_data +""" + +_MODELS__INCREMENTAL_STRUCT_APPEND = """ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='append_new_columns' + ) +}} + +with source_data as ( + select 1 as id, struct('foo' as nested_field, cast(null as string) as extra_field) as payload union all + select 2 as id, struct('bar' as nested_field, 'baz' as extra_field) as payload union all + select 3 as id, struct('baz' as nested_field, 'qux' as extra_field) as payload +) + +{% if is_incremental() %} + select id, struct(payload.nested_field as nested_field, payload.extra_field as extra_field) as payload from source_data +{% else %} + select id, struct(payload.nested_field as nested_field) as payload from source_data where id <= 2 +{% endif %} +""" + +_MODELS__INCREMENTAL_STRUCT_APPEND_EXPECTED = """ +{{ + config(materialized='table') +}} + +select + id, + struct(payload.nested_field as nested_field, + payload.extra_field as extra_field) as payload +from {{ ref('incremental_struct_append') }} +""" + +_MODELS__INCREMENTAL_STRUCT_SYNC = """ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='sync_all_columns' + ) +}} + +with source_data as ( + select 1 as id, struct('foo' as nested_field, 'baz' as extra_field) as payload union all + select 2 as id, struct('bar' as nested_field, 'qux' as extra_field) as payload +) + +{% if is_incremental() %} + select id, struct(payload.nested_field as nested_field) as payload from source_data +{% else %} + select * from source_data +{% endif %} +""" + +_MODELS__INCREMENTAL_STRUCT_SYNC_EXPECTED = """ +{{ + config(materialized='table') +}} + +select + id, + struct(payload.nested_field as nested_field) as payload +from {{ ref('incremental_struct_sync') }} +""" + class TestIncrementalOnSchemaChangeBigQuerySpecific(BaseIncrementalOnSchemaChangeSetup): @@ -288,3 +367,290 @@ def test_incremental_append_new_columns_with_special_characters(self, project): "incremental_append_new_special_chars_target", ], ) + + +_MODELS__DEEPLY_NESTED_STRUCT_BASE = """ +{{ + config(materialized='table') +}} + +with source_data as ( + select 1 as id, + struct( + 'level1' as l1_field, + struct( + 'level2' as l2_field, + struct( + 'level3' as l3_field + ) as level3 + ) as level2 + ) as payload + union all + select 2 as id, + struct( + 'level1_b' as l1_field, + struct( + 'level2_b' as l2_field, + struct( + 'level3_b' as l3_field + ) as level3 + ) as level2 + ) as payload +) + +select * from source_data +""" + +_MODELS__INCREMENTAL_DEEPLY_NESTED_STRUCT_APPEND = """ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='append_new_columns' + ) +}} + +with source_data as ( + select 1 as id, + struct( + 'level1' as l1_field, + 'new_l1' as l1_new_field, + struct( + 'level2' as l2_field, + 'new_l2' as l2_new_field, + struct( + 'level3' as l3_field, + 'new_l3' as l3_new_field + ) as level3 + ) as 
level2 + ) as payload + union all + select 2 as id, + struct( + 'level1_b' as l1_field, + 'new_l1_b' as l1_new_field, + struct( + 'level2_b' as l2_field, + 'new_l2_b' as l2_new_field, + struct( + 'level3_b' as l3_field, + 'new_l3_b' as l3_new_field + ) as level3 + ) as level2 + ) as payload + union all + select 3 as id, + struct( + 'level1_c' as l1_field, + 'new_l1_c' as l1_new_field, + struct( + 'level2_c' as l2_field, + 'new_l2_c' as l2_new_field, + struct( + 'level3_c' as l3_field, + 'new_l3_c' as l3_new_field + ) as level3 + ) as level2 + ) as payload +) + +{% if is_incremental() %} + -- Explicitly construct STRUCT with fields in BigQuery's "append at end" order + select id, + struct( + payload.l1_field as l1_field, + struct( + payload.level2.l2_field as l2_field, + struct( + payload.level2.level3.l3_field as l3_field, + payload.level2.level3.l3_new_field as l3_new_field + ) as level3, + payload.level2.l2_new_field as l2_new_field + ) as level2, + payload.l1_new_field as l1_new_field + ) as payload + from source_data +{% else %} + select id, + struct( + payload.l1_field as l1_field, + struct( + payload.level2.l2_field as l2_field, + struct( + payload.level2.level3.l3_field as l3_field + ) as level3 + ) as level2 + ) as payload + from source_data where id <= 2 +{% endif %} +""" + +_MODELS__INCREMENTAL_DEEPLY_NESTED_STRUCT_APPEND_EXPECTED = """ +{{ + config(materialized='table') +}} + +with source_data as ( + select 1 as id, + struct( + 'level1' as l1_field, + struct( + 'level2' as l2_field, + struct( + 'level3' as l3_field, + cast(null as string) as l3_new_field + ) as level3, + cast(null as string) as l2_new_field + ) as level2, + cast(null as string) as l1_new_field + ) as payload + union all + select 2 as id, + struct( + 'level1_b' as l1_field, + struct( + 'level2_b' as l2_field, + struct( + 'level3_b' as l3_field, + cast(null as string) as l3_new_field + ) as level3, + cast(null as string) as l2_new_field + ) as level2, + cast(null as string) as l1_new_field + ) as payload + union all + select 3 as id, + struct( + 'level1_c' as l1_field, + struct( + 'level2_c' as l2_field, + struct( + 'level3_c' as l3_field, + 'new_l3_c' as l3_new_field + ) as level3, + 'new_l2_c' as l2_new_field + ) as level2, + 'new_l1_c' as l1_new_field + ) as payload +) + +select * from source_data +""" + + +class TestIncrementalStructOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): + + @pytest.fixture(scope="class") + def models(self): + return { + "struct_base.sql": _MODELS__STRUCT_BASE, + "incremental_struct_append.sql": _MODELS__INCREMENTAL_STRUCT_APPEND, + "incremental_struct_append_expected.sql": _MODELS__INCREMENTAL_STRUCT_APPEND_EXPECTED, + "incremental_struct_sync.sql": _MODELS__INCREMENTAL_STRUCT_SYNC, + "incremental_struct_sync_expected.sql": _MODELS__INCREMENTAL_STRUCT_SYNC_EXPECTED, + } + + def test_incremental_append_struct_fields(self, project): + run_dbt( + [ + "run", + "--models", + "struct_base incremental_struct_append", + ] + ) + # Second run should update the schema and succeed + run_dbt( + [ + "run", + "--models", + "struct_base incremental_struct_append", + ] + ) + # If the model runs successfully, the schema update worked. + # The expected model verifies the data is correct + run_dbt( + [ + "run", + "--models", + "incremental_struct_append_expected", + ] + ) + + @pytest.mark.skip( + reason="BigQuery does not support removing fields from STRUCT columns via schema update" + ) + def test_incremental_sync_struct_fields(self, project): + # Note: This test demonstrates a BigQuery limitation. 
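            # (The adapter's alter_table_add_remove_columns logs a warning for such
            # nested removals instead of failing the run.)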
+ # BigQuery allows ADDING fields to STRUCT columns but not REMOVING them. + # To remove fields, you would need to drop and recreate the column (losing data) + # or recreate the entire table. + run_dbt( + [ + "run", + "--models", + "struct_base incremental_struct_sync", + ] + ) + run_dbt( + [ + "run", + "--models", + "struct_base incremental_struct_sync", + ] + ) + from dbt.tests.util import check_relations_equal + + check_relations_equal( + project.adapter, + ["incremental_struct_sync", "incremental_struct_sync_expected"], + ) + + +class TestIncrementalDeeplyNestedStructOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): + """Test that BigQuery supports schema updates for deeply nested STRUCT columns. + + BigQuery supports arbitrary levels of nesting (soft limit ~100 levels). + This test verifies that the recursive implementation in _merge_nested_fields + correctly handles adding fields at multiple nesting levels. + """ + + @pytest.fixture(scope="class") + def models(self): + return { + "deeply_nested_struct_base.sql": _MODELS__DEEPLY_NESTED_STRUCT_BASE, + "incremental_deeply_nested_struct_append.sql": _MODELS__INCREMENTAL_DEEPLY_NESTED_STRUCT_APPEND, + "incremental_deeply_nested_struct_append_expected.sql": _MODELS__INCREMENTAL_DEEPLY_NESTED_STRUCT_APPEND_EXPECTED, + } + + def test_incremental_append_deeply_nested_struct_fields(self, project): + """Test adding fields at multiple nesting levels simultaneously.""" + # First run - creates initial table with 3-level nested STRUCT + results = run_dbt( + [ + "run", + "--models", + "deeply_nested_struct_base incremental_deeply_nested_struct_append", + ] + ) + assert len(results) == 2 + + # Second run - should add new fields at all 3 nesting levels + results = run_dbt( + [ + "run", + "--models", + "deeply_nested_struct_base incremental_deeply_nested_struct_append", + ] + ) + assert len(results) == 2 + + # Verify row count - should have 3 rows (2 from first run, 1 new from second) + relation = project.adapter.Relation.create( + database=project.database, + schema=project.test_schema, + identifier="incremental_deeply_nested_struct_append", + ) + + result = project.run_sql(f"SELECT COUNT(*) as cnt FROM {relation}", fetch="one") + + assert result[0] == 3, f"Expected 3 rows, got {result[0]}"
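As a quick manual check of the append behavior, a sketch along the following lines (hypothetical project and dataset names; assumes an authenticated google-cloud-bigquery client) can confirm that the new fields were appended at each nesting level rather than reordered:

    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.get_table("my-project.my_dataset.incremental_deeply_nested_struct_append")

    def walk(fields, prefix=""):
        # Print every field path so the nested layout is easy to eyeball.
        for field in fields:
            path = f"{prefix}.{field.name}" if prefix else field.name
            print(path, field.field_type)
            if field.field_type == "RECORD":
                walk(field.fields, path)

    walk(table.schema)
    # With the models above, the walk should end with payload.level2.level3.l3_new_field,
    # payload.level2.l2_new_field and payload.l1_new_field, each appended after the
    # pre-existing fields at its level.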