diff --git a/src/dbt_osmosis/core/transforms.py b/src/dbt_osmosis/core/transforms.py index f0b09b20..ee4b6bed 100644 --- a/src/dbt_osmosis/core/transforms.py +++ b/src/dbt_osmosis/core/transforms.py @@ -330,33 +330,50 @@ def inject_missing_columns( from dbt_osmosis.core.introspection import normalize_column_name + incoming_columns = get_columns(context, node) + output_to_upper = _get_setting_for_node( + "output-to-upper", node, fallback=context.settings.output_to_upper + ) + output_to_lower = _get_setting_for_node( + "output-to-lower", node, fallback=context.settings.output_to_lower + ) + case_insensitive = output_to_upper or output_to_lower current_columns = { - normalize_column_name(c.name, context.project.runtime_cfg.credentials.type) + normalize_column_name(c.name, context.project.runtime_cfg.credentials.type).lower() + if case_insensitive + else normalize_column_name(c.name, context.project.runtime_cfg.credentials.type) for c in node.columns.values() } - incoming_columns = get_columns(context, node) + for incoming_name, incoming_meta in incoming_columns.items(): - if incoming_name not in current_columns: + compare_name = incoming_name.lower() if case_insensitive else incoming_name + if compare_name not in current_columns: logger.info( ":heavy_plus_sign: Reconciling missing column => %s in node => %s", incoming_name, node.unique_id, ) - gen_col = {"name": incoming_name, "description": incoming_meta.comment or ""} + final_name = incoming_name + if output_to_upper: + final_name = incoming_name.upper() + elif output_to_lower: + final_name = incoming_name.lower() + + gen_col = {"name": final_name, "description": incoming_meta.comment or ""} if (dtype := incoming_meta.type) and not _get_setting_for_node( "skip-add-data-types", node, fallback=context.settings.skip_add_data_types, ): - if context.settings.output_to_upper: + if output_to_upper: gen_col["data_type"] = dtype.upper() - elif context.settings.output_to_lower: + elif output_to_lower: gen_col["data_type"] = dtype.lower() else: gen_col["data_type"] = dtype - node.columns[incoming_name] = ColumnInfo.from_dict(gen_col) - if hasattr(node.columns[incoming_name], "config"): - delattr(node.columns[incoming_name], "config") + node.columns[final_name] = ColumnInfo.from_dict(gen_col) + if hasattr(node.columns[final_name], "config"): + delattr(node.columns[final_name], "config") @_transform_op("Remove Extra Columns") @@ -365,7 +382,7 @@ def remove_columns_not_in_database( node: ResultNode | None = None, ) -> None: """Remove columns from a dbt node and it's corresponding yaml section that are not present in the database. Changes are implicitly buffered until commit_yamls is called.""" - from dbt_osmosis.core.introspection import get_columns, normalize_column_name + from dbt_osmosis.core.introspection import _get_setting_for_node, get_columns, normalize_column_name from dbt_osmosis.core.node_filters import _iter_candidate_nodes if node is None: @@ -376,8 +393,19 @@ def remove_columns_not_in_database( ): ... return + output_to_upper = _get_setting_for_node( + "output-to-upper", node, fallback=context.settings.output_to_upper + ) + output_to_lower = _get_setting_for_node( + "output-to-lower", node, fallback=context.settings.output_to_lower + ) + case_insensitive = output_to_upper or output_to_lower current_columns = { - normalize_column_name(c.name, context.project.runtime_cfg.credentials.type): key + ( + normalize_column_name(c.name, context.project.runtime_cfg.credentials.type).lower() + if case_insensitive + else normalize_column_name(c.name, context.project.runtime_cfg.credentials.type) + ): key for key, c in node.columns.items() } incoming_columns = get_columns(context, node) @@ -387,7 +415,10 @@ def remove_columns_not_in_database( node.unique_id, ) return - extra_columns = set(current_columns.keys()) - set(incoming_columns.keys()) + incoming_keys = ( + {k.lower() for k in incoming_columns} if case_insensitive else set(incoming_columns.keys()) + ) + extra_columns = set(current_columns.keys()) - incoming_keys for extra_column in extra_columns: logger.info( ":heavy_minus_sign: Removing extra column => %s in node => %s", @@ -527,6 +558,7 @@ def synchronize_data_types( return logger.info(":1234: Synchronizing data types => %s", node.unique_id) incoming_columns = get_columns(context, node) + incoming_columns_lower = {k.lower(): v for k, v in incoming_columns.items()} if _get_setting_for_node("skip-add-data-types", node, fallback=False): return for name, column in node.columns.items(): @@ -549,9 +581,11 @@ def synchronize_data_types( name, fallback=context.settings.output_to_upper, ) - if inc_c := incoming_columns.get( - normalize_column_name(name, context.project.runtime_cfg.credentials.type), - ): + normalized = normalize_column_name(name, context.project.runtime_cfg.credentials.type) + inc_c = incoming_columns.get(normalized) + if inc_c is None and (lowercase or uppercase): + inc_c = incoming_columns_lower.get(normalized.lower()) + if inc_c: is_lower = column.data_type and column.data_type.islower() if inc_c.type: if uppercase: diff --git a/tests/core/test_transforms.py b/tests/core/test_transforms.py index 13f02252..85e27ff7 100644 --- a/tests/core/test_transforms.py +++ b/tests/core/test_transforms.py @@ -170,3 +170,306 @@ def test_sort_columns_alphabetically_without_case_conversion( assert column_names == ["Banana", "ZEBRA", "apple"], ( f"Columns should be sorted by original name (ASCII order), got {column_names}" ) + + +def test_inject_missing_columns_idempotent_with_output_to_upper_on_postgres(fresh_caches): + """Test that inject_missing_columns is idempotent on non-Snowflake DBs with output-to-upper. + + Scenario (PostgreSQL + output-to-upper): + 1st run: DB returns 'zebra' → injected as 'ZEBRA' + 2nd run: current_columns has 'ZEBRA', incoming has 'zebra' + → Should NOT re-add the column (idempotent) + """ + from collections import OrderedDict + + from dbt.contracts.graph.nodes import ColumnInfo + + mock_node = mock.MagicMock() + mock_node.unique_id = "model.test.test_model" + mock_node.resource_type = "model" + # Simulate state after first run: columns stored with uppercase keys + mock_node.columns = OrderedDict({ + "ZEBRA": ColumnInfo.from_dict({"name": "ZEBRA", "description": "existing"}), + }) + + context = mock.MagicMock() + context.settings.skip_add_columns = False + context.settings.skip_add_source_columns = False + context.settings.skip_add_data_types = True + context.settings.output_to_lower = False + context.settings.output_to_upper = True + context.project.runtime_cfg.credentials.type = "postgres" + + mock_col = mock.MagicMock() + mock_col.type = None + mock_col.comment = "" + + # DB returns lowercase (PostgreSQL behavior) + incoming = OrderedDict([("zebra", mock_col)]) + + with ( + mock.patch( + "dbt_osmosis.core.introspection.get_columns", + return_value=incoming, + ), + mock.patch( + "dbt_osmosis.core.introspection._get_setting_for_node", + side_effect=lambda *args, fallback=None, **kw: fallback, + ), + ): + inject_missing_columns(context, mock_node) + + # Should still have exactly one column with original description preserved + assert list(mock_node.columns.keys()) == ["ZEBRA"] + assert mock_node.columns["ZEBRA"].description == "existing" + + +def test_remove_columns_not_in_database_with_output_to_upper_on_postgres(fresh_caches): + """Test that remove_columns_not_in_database doesn't incorrectly remove columns + when output-to-upper is active on a non-Snowflake DB. + + Scenario (PostgreSQL + output-to-upper): + node.columns has 'ZEBRA' (uppercased), DB returns 'zebra' (lowercase). + normalize_column_name('ZEBRA', 'postgres') = 'ZEBRA' ≠ 'zebra' + → Without fix: 'ZEBRA' flagged as extra and removed incorrectly. + → With fix: case-insensitive comparison prevents incorrect removal. + """ + from collections import OrderedDict + + from dbt.contracts.graph.nodes import ColumnInfo + + mock_node = mock.MagicMock() + mock_node.unique_id = "model.test.test_model" + mock_node.columns = OrderedDict({ + "ZEBRA": ColumnInfo.from_dict({"name": "ZEBRA", "description": "a column"}), + "APPLE": ColumnInfo.from_dict({"name": "APPLE", "description": "another column"}), + }) + + context = mock.MagicMock() + context.settings.output_to_lower = False + context.settings.output_to_upper = True + context.project.runtime_cfg.credentials.type = "postgres" + + mock_col_z = mock.MagicMock() + mock_col_z.type = "VARCHAR" + mock_col_z.comment = "" + mock_col_a = mock.MagicMock() + mock_col_a.type = "INTEGER" + mock_col_a.comment = "" + + # DB returns lowercase (PostgreSQL behavior) + incoming = OrderedDict([("zebra", mock_col_z), ("apple", mock_col_a)]) + + with ( + mock.patch( + "dbt_osmosis.core.introspection.get_columns", + return_value=incoming, + ), + mock.patch( + "dbt_osmosis.core.introspection._get_setting_for_node", + side_effect=lambda *args, fallback=None, **kw: fallback, + ), + ): + remove_columns_not_in_database(context, mock_node) + + # Both columns should be preserved (not removed) + assert set(mock_node.columns.keys()) == {"ZEBRA", "APPLE"} + + +def test_remove_columns_not_in_database_removes_truly_extra_columns(fresh_caches): + """Test that truly extra columns are still removed even with case conversion.""" + from collections import OrderedDict + + from dbt.contracts.graph.nodes import ColumnInfo + + mock_node = mock.MagicMock() + mock_node.unique_id = "model.test.test_model" + mock_node.columns = OrderedDict({ + "ZEBRA": ColumnInfo.from_dict({"name": "ZEBRA", "description": ""}), + "STALE": ColumnInfo.from_dict({"name": "STALE", "description": "removed from DB"}), + }) + + context = mock.MagicMock() + context.settings.output_to_lower = False + context.settings.output_to_upper = True + context.project.runtime_cfg.credentials.type = "postgres" + + mock_col = mock.MagicMock() + mock_col.type = "VARCHAR" + mock_col.comment = "" + + # DB only has 'zebra', not 'stale' + incoming = OrderedDict([("zebra", mock_col)]) + + with ( + mock.patch( + "dbt_osmosis.core.introspection.get_columns", + return_value=incoming, + ), + mock.patch( + "dbt_osmosis.core.introspection._get_setting_for_node", + side_effect=lambda *args, fallback=None, **kw: fallback, + ), + ): + remove_columns_not_in_database(context, mock_node) + + # STALE should be removed, ZEBRA should remain + assert list(mock_node.columns.keys()) == ["ZEBRA"] + + +def test_synchronize_data_types_with_output_to_upper_on_postgres(fresh_caches): + """Test that synchronize_data_types matches columns correctly when output-to-upper + is active on a non-Snowflake DB. + + Scenario (PostgreSQL + output-to-upper): + node.columns has 'ZEBRA', DB returns column 'zebra' with type 'varchar'. + normalize_column_name('ZEBRA', 'postgres') = 'ZEBRA', but incoming key is 'zebra'. + → Without fix: lookup fails, data type not synced. + → With fix: case-insensitive fallback finds the match. + """ + from collections import OrderedDict + + from dbt.contracts.graph.nodes import ColumnInfo + + mock_node = mock.MagicMock() + mock_node.unique_id = "model.test.test_model" + col = ColumnInfo.from_dict({"name": "ZEBRA", "description": "", "data_type": ""}) + mock_node.columns = OrderedDict({"ZEBRA": col}) + + context = mock.MagicMock() + context.settings.skip_add_data_types = False + context.settings.output_to_lower = False + context.settings.output_to_upper = True + context.project.runtime_cfg.credentials.type = "postgres" + + mock_col = mock.MagicMock() + mock_col.type = "varchar" + mock_col.comment = "" + + incoming = OrderedDict([("zebra", mock_col)]) + + with ( + mock.patch( + "dbt_osmosis.core.introspection.get_columns", + return_value=incoming, + ), + mock.patch( + "dbt_osmosis.core.introspection._get_setting_for_node", + side_effect=lambda *args, fallback=None, **kw: fallback, + ), + ): + synchronize_data_types(context, mock_node) + + # Data type should be synced and uppercased (output-to-upper) + assert mock_node.columns["ZEBRA"].data_type == "VARCHAR" + + +def test_inject_missing_columns_applies_output_to_lower(fresh_caches): + """Test that inject_missing_columns converts new column keys to lowercase + when output-to-lower is enabled. + + This fixes the issue where Snowflake returns uppercase column names, + which are injected as uppercase keys. Subsequent alphabetical sorting + uses lowercase comparison, but the keys remain uppercase, causing + incorrect sort order on the first run. + """ + from collections import OrderedDict + + mock_node = mock.MagicMock() + mock_node.unique_id = "model.test.test_model" + mock_node.resource_type = "model" + mock_node.columns = OrderedDict() + + context = mock.MagicMock() + context.settings.skip_add_columns = False + context.settings.skip_add_source_columns = False + context.settings.skip_add_data_types = False + context.settings.output_to_lower = True + context.settings.output_to_upper = False + context.project.runtime_cfg.credentials.type = "snowflake" + + # Simulate database returning uppercase column names (Snowflake behavior) + mock_col_a = mock.MagicMock() + mock_col_a.type = "VARCHAR" + mock_col_a.comment = "" + mock_col_b = mock.MagicMock() + mock_col_b.type = "INTEGER" + mock_col_b.comment = "" + mock_col_c = mock.MagicMock() + mock_col_c.type = "BOOLEAN" + mock_col_c.comment = "" + + incoming = OrderedDict([("ZEBRA", mock_col_a), ("APPLE", mock_col_b), ("BANANA", mock_col_c)]) + + # Patch at dbt_osmosis.core.introspection (source module) because + # inject_missing_columns uses local imports (from ... import get_columns), + # which re-resolve the module attribute on each call. + with ( + mock.patch( + "dbt_osmosis.core.introspection.get_columns", + return_value=incoming, + ), + mock.patch( + "dbt_osmosis.core.introspection._get_setting_for_node", + side_effect=lambda *args, fallback=None, **kw: fallback, + ), + ): + inject_missing_columns(context, mock_node) + + # Keys should be lowercase + column_keys = list(mock_node.columns.keys()) + assert all(k == k.lower() for k in column_keys), ( + f"All column keys should be lowercase, got {column_keys}" + ) + assert set(column_keys) == {"zebra", "apple", "banana"} + + +def test_inject_missing_columns_with_lower_then_sort_alphabetically(fresh_caches): + """End-to-end test: inject with output-to-lower followed by alphabetical sort + should produce correctly ordered lowercase keys on the first run. + """ + from collections import OrderedDict + + mock_node = mock.MagicMock() + mock_node.unique_id = "model.test.test_model" + mock_node.resource_type = "model" + mock_node.columns = OrderedDict() + + context = mock.MagicMock() + context.settings.skip_add_columns = False + context.settings.skip_add_source_columns = False + context.settings.skip_add_data_types = True + context.settings.output_to_lower = True + context.settings.output_to_upper = False + context.project.runtime_cfg.credentials.type = "snowflake" + + mock_col = mock.MagicMock() + mock_col.type = None + mock_col.comment = "" + + incoming = OrderedDict([ + ("ZEBRA", mock_col), + ("APPLE", mock_col), + ("BANANA", mock_col), + ]) + + # Patch at source module; see comment in test_inject_missing_columns_applies_output_to_lower + with ( + mock.patch( + "dbt_osmosis.core.introspection.get_columns", + return_value=incoming, + ), + mock.patch( + "dbt_osmosis.core.introspection._get_setting_for_node", + side_effect=lambda *args, fallback=None, **kw: fallback, + ), + ): + inject_missing_columns(context, mock_node) + + # Now sort alphabetically + sort_columns_alphabetically(context, mock_node) + + column_keys = list(mock_node.columns.keys()) + assert column_keys == ["apple", "banana", "zebra"], ( + f"Columns should be alphabetically sorted lowercase on first run, got {column_keys}" + )