Support BigQuery STRUCT schema change #1385
base: main
Changes from 6 commits: f08cf69, 6bb74ba, 68fec0b, 8d86b5c, 676cdcb, 956b7e2, 10981f6
```diff
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Breakdown schema synchronization macros to enable more detailed operations like nested structures schema modifications
time: 2025-10-23T11:50:28.048588+02:00
custom:
  Author: Kayrnt
  Issue: "599"
```
```diff
@@ -0,0 +1,6 @@
kind: Features
body: Support BigQuery STRUCT schema change
time: 2025-10-12T13:44:19.542878+02:00
custom:
  Author: Kayrnt
  Issue: "599"
```
```diff
@@ -1,3 +1,4 @@
import copy
from dataclasses import dataclass
from datetime import datetime
from multiprocessing.context import SpawnContext
@@ -8,11 +9,13 @@
    FrozenSet,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    TYPE_CHECKING,
    Type,
    Set,
    Union,
)
```
```diff
@@ -651,19 +654,275 @@ def update_table_description(

    @available.parse_none
    def alter_table_add_columns(self, relation, columns):
        logger.debug('Adding columns ({}) to table {}".'.format(columns, relation))
        logger.debug('Adding columns ({}) to table "{}".'.format(columns, relation))
        self.alter_table_add_remove_columns(relation, columns, None)

    @available.parse_none
    def alter_table_add_remove_columns(self, relation, add_columns, remove_columns):
        conn = self.connections.get_thread_connection()
        client = conn.handle

        table_ref = self.get_table_ref_from_relation(relation)
        table = client.get_table(table_ref)

        new_columns = [col.column_to_bq_schema() for col in columns]
        new_schema = table.schema + new_columns
        schema_as_dicts = [field.to_api_repr() for field in table.schema]

        # BigQuery only supports dropping top-level columns via ALTER TABLE.
        # Track names so nested removals can still be logged for visibility.
        drop_candidates: List[BigQueryColumn] = []
        nested_removals: List[str] = []

        if remove_columns:
            for column in remove_columns:
                if "." in column.name:
                    nested_removals.append(column.name)
                else:
                    drop_candidates.append(column)

        if nested_removals:
            logger.warning(
                "BigQuery limitation: Cannot remove nested fields via schema update. "
                "Attempted to remove: {}. Consider using 'append_new_columns' mode "
                "or recreating the table with full_refresh.".format(nested_removals)
            )

        new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema)
        client.update_table(new_table, ["schema"])
        if drop_candidates:
            relation_name = relation.render()
            for column in drop_candidates:
                drop_sql = f"ALTER TABLE {relation_name} DROP COLUMN {self.quote(column.name)}"

                logger.debug(
                    'Dropping column `{}` from table "{}".'.format(column.name, relation_name)
                )
                client.query(drop_sql).result()

            # Refresh schema after drops so additions operate on the latest definition
            table = client.get_table(table_ref)
            schema_as_dicts = [field.to_api_repr() for field in table.schema]

        apply_schema_patch = False

        if add_columns:
            additions = self._build_nested_additions(add_columns)
            schema_as_dicts = self._merge_nested_fields(schema_as_dicts, additions)
            apply_schema_patch = True

        if apply_schema_patch:
            new_schema = [SchemaField.from_api_repr(field) for field in schema_as_dicts]
            new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema)
            client.update_table(new_table, ["schema"])
```
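For readers less familiar with the google-cloud-bigquery objects involved: `schema_as_dicts` above holds the plain-dict form of each `SchemaField`, and the helpers that follow manipulate that shape rather than `SchemaField` instances. A minimal sketch of the round trip between the two representations, using an invented `payload` STRUCT:

```python
from google.cloud.bigquery import SchemaField

# Hypothetical STRUCT column "payload" with one nested leaf, purely for illustration.
payload = SchemaField(
    "payload",
    "RECORD",
    mode="NULLABLE",
    fields=(SchemaField("amount", "NUMERIC", mode="NULLABLE"),),
)

# to_api_repr() yields the dict shape that schema_as_dicts holds, roughly:
#   {"name": "payload", "type": "RECORD", "mode": "NULLABLE",
#    "fields": [{"name": "amount", "type": "NUMERIC", "mode": "NULLABLE"}]}
as_dict = payload.to_api_repr()
print(as_dict["name"], [f["name"] for f in as_dict["fields"]])

# from_api_repr() is the inverse used when the patched schema is written back
# via client.update_table(..., ["schema"]).
restored = SchemaField.from_api_repr(as_dict)
print(restored.field_type, restored.fields[0].name)
```

The diff continues with the helper methods that operate on these dicts.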
```diff
    def _build_nested_additions(
        self, add_columns: Sequence[BigQueryColumn]
    ) -> Dict[str, Dict[str, Any]]:
        additions: Dict[str, Dict[str, Any]] = {}

        for column in add_columns:
            schema_field = column.column_to_bq_schema().to_api_repr()
            additions[column.name] = schema_field

        return additions

    def _merge_nested_fields(
        self,
        existing_fields: Sequence[Dict[str, Any]],
        additions: Mapping[str, Dict[str, Any]],
        prefix: str = "",
    ) -> List[Dict[str, Any]]:
        """Merge new fields into existing STRUCT fields, appending at each nesting level.

        Note: Primarily used for field removal. For adding fields, sync_struct_columns
        uses the source schema directly to preserve field order.
        """
        merged_fields: List[Dict[str, Any]] = []
        addition_lookup = dict(additions)

        for field in existing_fields:
            field_name = field["name"]
            qualified_name = f"{prefix}.{field_name}" if prefix else field_name

            direct_addition = addition_lookup.pop(qualified_name, None)
            if direct_addition is not None:
                merged_fields.append(copy.deepcopy(direct_addition))
                continue

            nested_additions = {
                key: value
                for key, value in list(addition_lookup.items())
                if key.startswith(f"{qualified_name}.")
            }

            if nested_additions and field.get("type") == "RECORD":
                for key in nested_additions:
                    addition_lookup.pop(key, None)

                stripped_additions = {
                    key.split(".", 1)[1]: value for key, value in nested_additions.items()
                }

                merged_children = self._merge_nested_fields(
                    field.get("fields", []) or [],
                    stripped_additions,
                    prefix="",
                )

                merged_field = copy.deepcopy(field)
                merged_field["fields"] = merged_children
                merged_fields.append(merged_field)
            else:
                merged_fields.append(copy.deepcopy(field))

        for path, addition in addition_lookup.items():
            if "." not in path:
                merged_fields.append(copy.deepcopy(addition))

        return merged_fields
```
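A rough, hand-built illustration of what `_merge_nested_fields` does with a dotted-path addition (the column and field names are invented, and the addition dict is written out by hand rather than produced by `_build_nested_additions`): the new leaf is appended inside the matching RECORD, and every other field is copied through unchanged.

```python
# Existing target schema in api-repr form (illustrative names only).
existing_fields = [
    {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
    {
        "name": "payload",
        "type": "RECORD",
        "mode": "NULLABLE",
        "fields": [{"name": "amount", "type": "NUMERIC", "mode": "NULLABLE"}],
    },
]

# Addition keyed by dotted path, e.g. for a hypothetical column "payload.currency".
additions = {
    "payload.currency": {"name": "currency", "type": "STRING", "mode": "NULLABLE"},
}

# adapter._merge_nested_fields(existing_fields, additions) would then return:
#   [
#       {"name": "id", ...},                       # untouched top-level field
#       {"name": "payload", "type": "RECORD", ...,
#        "fields": [{"name": "amount", ...},
#                   {"name": "currency", ...}]},   # new leaf appended at the nested level
#   ]
```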
```diff
    def _collect_field_dicts(
        self, fields: Sequence[Dict[str, Any]], prefix: str = ""
    ) -> Dict[str, Dict[str, Any]]:
        collected: Dict[str, Dict[str, Any]] = {}
        for field in fields:
            name = field["name"]
            path = f"{prefix}.{name}" if prefix else name
            collected[path] = field
            if field.get("type") == "RECORD":
                collected.update(self._collect_field_dicts(field.get("fields", []) or [], path))
        return collected

    def _find_missing_fields(
        self,
        source_fields: Sequence[Dict[str, Any]],
        target_fields: Sequence[Dict[str, Any]],
    ) -> Dict[str, Dict[str, Any]]:
        source_map = self._collect_field_dicts(source_fields)
        target_map = self._collect_field_dicts(target_fields)
        return {
            path: copy.deepcopy(field)
            for path, field in source_map.items()
            if path not in target_map
        }
```
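These two lookup helpers boil down to flattening every field into a dotted path and diffing the two path sets. A self-contained sketch, using a simplified stand-in for `_collect_field_dicts` and invented field names, of how `payload.currency` ends up as the only missing path:

```python
from typing import Any, Dict, List, Sequence


def collect_paths(fields: Sequence[Dict[str, Any]], prefix: str = "") -> List[str]:
    """Simplified stand-in for _collect_field_dicts that returns only the dotted paths."""
    paths: List[str] = []
    for field in fields:
        path = f"{prefix}.{field['name']}" if prefix else field["name"]
        paths.append(path)
        if field.get("type") == "RECORD":
            paths.extend(collect_paths(field.get("fields", []) or [], path))
    return paths


source_fields = [
    {
        "name": "payload",
        "type": "RECORD",
        "fields": [
            {"name": "amount", "type": "NUMERIC"},
            {"name": "currency", "type": "STRING"},
        ],
    },
]
target_fields = [
    {
        "name": "payload",
        "type": "RECORD",
        "fields": [{"name": "amount", "type": "NUMERIC"}],
    },
]

print(collect_paths(source_fields))  # ['payload', 'payload.amount', 'payload.currency']
print(collect_paths(target_fields))  # ['payload', 'payload.amount']

# _find_missing_fields keeps the source entries whose path is absent from the target,
# so here it would return only the definition for 'payload.currency'.
missing = set(collect_paths(source_fields)) - set(collect_paths(target_fields))
print(missing)  # {'payload.currency'}
```

The diff then continues with the new `sync_struct_columns` entry point.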
```diff
    @available.parse(lambda *a, **k: {})
    def sync_struct_columns(
        self,
        on_schema_change: str,
        source_relation: BigQueryRelation,
        target_relation: BigQueryRelation,
        schema_changes_dict: Dict[str, Any],
    ) -> Dict[str, Any]:
        if on_schema_change not in ("append_new_columns", "sync_all_columns"):
            return schema_changes_dict

        logger.debug(
            "BigQuery STRUCT sync invoked: mode=%s target=%s",
            on_schema_change,
            target_relation.render(),
        )

        conn = self.connections.get_thread_connection()
        client = conn.handle

        source_table = client.get_table(self.get_table_ref_from_relation(source_relation))
        target_table = client.get_table(self.get_table_ref_from_relation(target_relation))

        source_schema = [field.to_api_repr() for field in source_table.schema]
        target_schema = [field.to_api_repr() for field in target_table.schema]
        # Identify nested fields that exist in the source schema but not the target.
        missing_fields = self._find_missing_fields(source_schema, target_schema)
        nested_additions = {
            path: field_def for path, field_def in missing_fields.items() if "." in path
        }

        # Also include struct columns flagged by diff_column_data_types so we cover
        # cases where only the STRUCT signature changed.
        struct_type_changes: Set[str] = set()
        for change in schema_changes_dict.get("new_target_types", []):
            column_name = change.get("column_name")
            if not column_name:
                continue
            new_type = change.get("new_type", "")
            if "STRUCT<" in new_type.upper() or "RECORD" in new_type.upper():
                struct_type_changes.add(column_name.split(".", 1)[0])

        struct_columns_to_update: Set[str] = {
            path.split(".", 1)[0] for path in nested_additions.keys()
        }
        struct_columns_to_update.update(struct_type_changes)

        logger.debug(
            "BigQuery STRUCT sync details: target=%s nested_additions=%s struct_columns=%s",
            target_relation.render(),
            sorted(nested_additions.keys()),
            sorted(struct_columns_to_update),
        )

        if not struct_columns_to_update:
            return schema_changes_dict

        updated_schema: List[Dict[str, Any]] = []
        handled_columns: Set[str] = set()

        for field in target_schema:
            field_name = field["name"]
            if field_name in struct_columns_to_update:
                source_field = next((f for f in source_schema if f["name"] == field_name), None)
                if source_field:
                    updated_schema.append(copy.deepcopy(source_field))
                    handled_columns.add(field_name)
                else:
                    logger.debug(
                        "BigQuery STRUCT sync: unable to locate source definition for %s on %s",
                        field_name,
                        target_relation.render(),
                    )
                    updated_schema.append(copy.deepcopy(field))
            else:
                updated_schema.append(copy.deepcopy(field))

        if not handled_columns:
            return schema_changes_dict

        try:
            new_schema = [SchemaField.from_api_repr(field) for field in updated_schema]
            target_table.schema = new_schema
            client.update_table(target_table, ["schema"])
            logger.debug(
                "BigQuery STRUCT sync applied for %s columns=%s",
                target_relation.render(),
                sorted(handled_columns),
            )

            if schema_changes_dict.get("source_not_in_target"):
                schema_changes_dict["source_not_in_target"] = [
                    column
                    for column in schema_changes_dict["source_not_in_target"]
                    if column.name.split(".", 1)[0] not in handled_columns
                ]

            schema_changes_dict["target_columns"] = [
                BigQueryColumn.create_from_field(field) for field in new_schema
            ]
        except google.api_core.exceptions.BadRequest as exc:
            logger.warning("Failed to update STRUCT column schema: %s", exc)
            return schema_changes_dict

        # Remove handled STRUCT type changes so downstream logic does not retry
        if schema_changes_dict.get("new_target_types"):
            schema_changes_dict["new_target_types"] = [
                change
                for change in schema_changes_dict["new_target_types"]
                if not change.get("column_name")
                or change.get("column_name").split(".", 1)[0] not in handled_columns
            ]

        if (
            not schema_changes_dict.get("source_not_in_target")
            and not schema_changes_dict.get("target_not_in_source")
            and not schema_changes_dict.get("new_target_types")
        ):
            schema_changes_dict["schema_changed"] = False

        return schema_changes_dict

    @available.parse_none
    def load_dataframe(
```
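To show how `sync_struct_columns` narrows the schema-change dict it receives (the dict presumably comes from dbt's incremental schema-change detection; the column names and types below are invented), here is a small sketch of the STRUCT detection and of the `new_target_types` filtering that runs once a STRUCT column has been handled:

```python
# Invented example of the dict that sync_struct_columns receives.
schema_changes_dict = {
    "schema_changed": True,
    "source_not_in_target": [],
    "target_not_in_source": [],
    "new_target_types": [
        {"column_name": "payload", "new_type": "STRUCT<amount NUMERIC, currency STRING>"},
        {"column_name": "id", "new_type": "INT64"},
    ],
}

# Same test as in sync_struct_columns: a top-level column counts as a STRUCT change
# when its new type mentions STRUCT<...> or RECORD.
struct_type_changes = {
    change["column_name"].split(".", 1)[0]
    for change in schema_changes_dict["new_target_types"]
    if change.get("column_name")
    and (
        "STRUCT<" in change.get("new_type", "").upper()
        or "RECORD" in change.get("new_type", "").upper()
    )
}
print(struct_type_changes)  # {'payload'}

# After the target schema has been patched from the source relation, handled STRUCT
# entries are dropped from new_target_types so the regular per-column path does not
# retry them; only the non-STRUCT change for "id" remains.
handled_columns = {"payload"}
schema_changes_dict["new_target_types"] = [
    change
    for change in schema_changes_dict["new_target_types"]
    if change["column_name"].split(".", 1)[0] not in handled_columns
]
print(schema_changes_dict["new_target_types"])  # [{'column_name': 'id', 'new_type': 'INT64'}]
```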
FYI, though it is not directly visible here: the two variables are used in code that comes after the `if` block, so it was not consistent because they would not be initialized.