diff --git a/.changes/unreleased/Features-20250819-173500.yaml b/.changes/unreleased/Features-20250819-173500.yaml new file mode 100644 index 00000000000..896a0717e36 --- /dev/null +++ b/.changes/unreleased/Features-20250819-173500.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add unique suffix support for test failure tables to prevent conflicts in parallel execution +time: 2025-08-19T17:35:00.00000-00:00 +custom: + Author: andtrott + Issue: "11938" \ No newline at end of file diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index 903fbcb53cd..02fe669613a 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -175,6 +175,8 @@ class TestConfig(NodeAndTestConfig): severity: Annotated[Severity, Pattern(SEVERITY_PATTERN)] = Severity("ERROR") store_failures: Optional[bool] = None store_failures_as: Optional[str] = None + store_failures_unique: Optional[bool] = None + store_failures_suffix: Optional[str] = None where: Optional[str] = None limit: Optional[int] = None fail_calc: str = "count(*)" diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 81ab849c8d1..79b25906dcc 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -3,6 +3,7 @@ import os import pickle from collections import defaultdict, deque +from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Set, Tuple import networkx as nx # type: ignore @@ -490,6 +491,34 @@ def add_ephemeral_prefix(self, name: str): relation_cls = adapter.Relation return relation_cls.add_ephemeral_prefix(name) + def _get_test_table_suffix(self, node: GenericTestNode) -> Optional[str]: + """Generate a unique suffix for test failure tables based on configuration.""" + if not node.config.store_failures_unique: + return None + + suffix_strategy = node.config.store_failures_suffix or 'invocation_id' + + if suffix_strategy == 'invocation_id': + # Use first 8 chars of invocation_id for reasonable table name length + invocation_id = get_invocation_id() + return invocation_id[:8] if invocation_id else None + + elif suffix_strategy == 'timestamp': + # Full timestamp: YYYYMMDD_HHMMSS + return datetime.now().strftime('%Y%m%d_%H%M%S') + + elif suffix_strategy == 'date': + # Date only: YYYYMMDD + return datetime.now().strftime('%Y%m%d') + + elif suffix_strategy == 'hour': + # Date and hour: YYYYMMDD_HH - useful for hourly DAGs + return datetime.now().strftime('%Y%m%d_%H') + + else: + # Treat as literal string - could be expanded to support templates + return suffix_strategy + def _recursively_prepend_ctes( self, model: ManifestSQLNode, @@ -623,6 +652,14 @@ def _compile_code( and node.relation_name is None and node.is_relational ): + # Apply unique suffix if configured for tests with store_failures + # This must be done BEFORE creating the relation_name + if isinstance(node, GenericTestNode) and node.config.store_failures: + suffix = self._get_test_table_suffix(node) + if suffix: + # Modify the alias which is used by materializations + node.alias = f"{node.alias}_{suffix}" + adapter = get_adapter(self.config) relation_cls = adapter.Relation relation_name = str(relation_cls.create_from(self.config, node)) diff --git a/tests/unit/test_compilation.py b/tests/unit/test_compilation.py index 0d5d4b2ea3c..7315bce53da 100644 --- a/tests/unit/test_compilation.py +++ b/tests/unit/test_compilation.py @@ -193,3 +193,100 @@ def test__find_cycles__no_cycles(self, linker: Linker) -> None: linker.dependency(l, r) assert linker.find_cycles() is None + + +class TestCompilerSuffixGeneration: + """Test the suffix generation for test failure tables.""" + + def test_suffix_generation_disabled(self): + """Test that no suffix is generated when store_failures_unique is False.""" + from dbt.compilation import Compiler + from dbt.contracts.graph.nodes import GenericTestNode + from dbt.artifacts.resources.v1.config import TestConfig + + mock_config = mock.MagicMock() + compiler = Compiler(mock_config) + + test_node = GenericTestNode( + unique_id="test.myproject.test_name", + name="test_name", + database="test_db", + schema="test_schema", + alias="test_name", + resource_type="test", + package_name="myproject", + path="test.sql", + original_file_path="test.sql", + config=TestConfig( + store_failures=True, + store_failures_unique=False + ), + fqn=["myproject", "test_name"], + checksum={"name": "sha256", "checksum": "abc123"} + ) + + suffix = compiler._get_test_table_suffix(test_node) + assert suffix is None + + def test_suffix_generation_invocation_id(self): + """Test invocation_id suffix strategy.""" + from dbt.compilation import Compiler + from dbt.contracts.graph.nodes import GenericTestNode + from dbt.artifacts.resources.v1.config import TestConfig + + mock_config = mock.MagicMock() + compiler = Compiler(mock_config) + + test_node = GenericTestNode( + unique_id="test.myproject.test_name", + name="test_name", + database="test_db", + schema="test_schema", + alias="test_name", + resource_type="test", + package_name="myproject", + path="test.sql", + original_file_path="test.sql", + config=TestConfig( + store_failures=True, + store_failures_unique=True, + store_failures_suffix='invocation_id' + ), + fqn=["myproject", "test_name"], + checksum={"name": "sha256", "checksum": "abc123"} + ) + + with mock.patch('dbt.compilation.get_invocation_id', return_value='abcd1234-5678-90ef'): + suffix = compiler._get_test_table_suffix(test_node) + assert suffix == 'abcd1234' + + def test_suffix_generation_custom(self): + """Test custom suffix strategy.""" + from dbt.compilation import Compiler + from dbt.contracts.graph.nodes import GenericTestNode + from dbt.artifacts.resources.v1.config import TestConfig + + mock_config = mock.MagicMock() + compiler = Compiler(mock_config) + + test_node = GenericTestNode( + unique_id="test.myproject.test_name", + name="test_name", + database="test_db", + schema="test_schema", + alias="test_name", + resource_type="test", + package_name="myproject", + path="test.sql", + original_file_path="test.sql", + config=TestConfig( + store_failures=True, + store_failures_unique=True, + store_failures_suffix='my_custom_suffix' + ), + fqn=["myproject", "test_name"], + checksum={"name": "sha256", "checksum": "abc123"} + ) + + suffix = compiler._get_test_table_suffix(test_node) + assert suffix == 'my_custom_suffix'