diff --git a/RELEASE.md b/RELEASE.md index 84dd79c..7d296f0 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,8 @@ # Release instructions -This application is hosted on [Heroku](https://www.heroku.com), but it can also be executed in production mode locally. +This application runs on [GitHub Actions](https://github.com/features/actions) and can also be executed in production mode locally. + +**Note**: This application was previously hosted on Heroku. See [GitHub Actions Setup Guide](GITHUB_ACTIONS_SETUP.md) for current deployment instructions. ## Design overview It is designed to do the following: diff --git a/hubcap/records.py b/hubcap/records.py index 9937ec4..3ad4718 100644 --- a/hubcap/records.py +++ b/hubcap/records.py @@ -16,6 +16,124 @@ from hubcap import version +def check_fusion_schema_compatibility(repo_path: Path) -> bool: + """ + Check if a dbt package is fusion schema compatible by running 'dbtf parse'. + + Args: + repo_path: Path to the dbt package repository + + Returns: + True if fusion compatible (dbtf parse exits with code 0), False otherwise + """ + # Add a test profiles.yml to the current directory + profiles_path = repo_path / Path("profiles.yml") + try: + with open(profiles_path, "a") as f: + f.write( + "\n" + "test_schema_compat:\n" + " target: dev\n" + " outputs:\n" + " dev:\n" + " type: postgres\n" + " host: localhost\n" + " port: 5432\n" + " user: postgres\n" + " password: postgres\n" + " dbname: postgres\n" + " schema: public\n" + ) + + # Ensure the `_DBT_FUSION_STRICT_MODE` is set (this will ensure fusion errors on schema violations) + os.environ["_DBT_FUSION_STRICT_MODE"] = "1" + + # Run dbtf parse command (try dbtf first, fall back to dbt) + try: + # Try dbtf first (without shell=True to get proper FileNotFoundError) + result = subprocess.run( + [ + "dbtf", + "parse", + "--profile", + "test_schema_compat", + "--project-dir", + str(repo_path), + ], + capture_output=True, + timeout=60, + ) + # If dbtf command exists but returns error mentioning it's not found, fall back to dbt + if ( + result.returncode != 0 + and result.stderr + and b"not found" in result.stderr + ): + raise FileNotFoundError("dbtf command not found") + except FileNotFoundError: + # Fall back to dbt command, but validate that this is dbt-fusion + version_result = subprocess.run( + ["dbt", "--version"], capture_output=True, timeout=60 + ) + if b"dbt-fusion" not in version_result.stdout: + raise FileNotFoundError( + "dbt-fusion command not found - regular dbt-core detected instead" + ) + + # Run dbt parse since we have dbt-fusion + result = subprocess.run( + [ + "dbt", + "parse", + "--profile", + "test_schema_compat", + "--project-dir", + str(repo_path), + ], + capture_output=True, + timeout=60, + ) + + # Return True if exit code is 0 (success) + is_compatible = result.returncode == 0 + + if is_compatible: + logging.info(f"Package at {repo_path} is fusion schema compatible") + else: + logging.info(f"Package at {repo_path} is not fusion schema compatible") + + # Remove the test profile + os.remove(profiles_path) + + return is_compatible + + except subprocess.TimeoutExpired: + logging.warning(f"dbtf parse timed out for package at {repo_path}") + try: + os.remove(profiles_path) + except Exception: + pass + return False + except FileNotFoundError: + logging.warning( + f"dbtf command not found - skipping fusion compatibility check for {repo_path}" + ) + try: + os.remove(profiles_path) + except Exception: + pass + return False + except Exception as e: + logging.warning( + f"Error checking fusion 
compatibility for {repo_path}: {str(e)}" + ) + try: + os.remove(profiles_path) + except Exception: + pass + return False + + class PullRequestStrategy(ABC): @abstractmethod def pull_request_title(self, org: str, repo: str) -> str: @@ -89,6 +207,8 @@ def __init__( self.package_name = package_name self.existing_tags = existing_tags self.new_tags = new_tags + # Track fusion compatibility for each tag + self.fusion_compatibility = {} def run(self, main_dir, pr_strategy): os.chdir(main_dir) @@ -101,16 +221,6 @@ def run(self, main_dir, pr_strategy): index_filepath = ( Path(os.path.dirname(self.hub_version_index_path)) / "index.json" ) - new_index_entry = self.make_index( - self.github_username, - self.github_repo_name, - self.package_name, - self.fetch_index_file_contents(index_filepath), - set(self.new_tags) | set(self.existing_tags), - ) - with open(index_filepath, "w") as f: - logging.info(f"writing index.json to {index_filepath}") - f.write(str(json.dumps(new_index_entry, indent=4))) # create a version spec for each tag for tag in self.new_tags: @@ -119,9 +229,21 @@ def run(self, main_dir, pr_strategy): git_helper.run_cmd(f"git checkout tags/{tag}") packages = package.parse_pkgs(Path(os.getcwd())) require_dbt_version = package.parse_require_dbt_version(Path(os.getcwd())) + os.chdir(main_dir) - # return to hub and build spec + # check fusion compatibility + is_fusion_compatible = check_fusion_schema_compatibility( + self.local_path_to_repo + ) + self.fusion_compatibility[tag] = is_fusion_compatible + + # Reset and clean the repo to ensure clean state after fusion check + os.chdir(self.local_path_to_repo) + git_helper.run_cmd("git reset --hard HEAD") + git_helper.run_cmd("git clean -fd") os.chdir(main_dir) + + # return to hub and build spec package_spec = self.make_spec( self.github_username, self.github_repo_name, @@ -129,6 +251,7 @@ def run(self, main_dir, pr_strategy): packages, require_dbt_version, tag, + is_fusion_compatible, ) version_path = self.hub_version_index_path / Path(f"{tag}.json") @@ -141,7 +264,25 @@ def run(self, main_dir, pr_strategy): git_helper.run_cmd("git add -A") subprocess.run(args=["git", "commit", "-am", f"{msg}"], capture_output=True) - # if succesful return branchname + new_index_entry = self.make_index( + self.github_username, + self.github_repo_name, + self.package_name, + self.fetch_index_file_contents(index_filepath), + set(self.new_tags) | set(self.existing_tags), + self.fusion_compatibility, + ) + with open(index_filepath, "w") as f: + logging.info(f"writing index.json to {index_filepath}") + f.write(str(json.dumps(new_index_entry, indent=4))) + + # Commit the updated index.json file + msg = f"hubcap: Update index.json for {self.github_username}/{self.github_repo_name}" + logging.info(msg) + git_helper.run_cmd("git add -A") + subprocess.run(args=["git", "commit", "-am", f"{msg}"], capture_output=True) + + # if successful return branchname return branch_name, self.github_username, self.github_repo_name def cut_version_branch(self, pr_strategy): @@ -159,7 +300,9 @@ def cut_version_branch(self, pr_strategy): return branch_name - def make_index(self, org_name, repo, package_name, existing, tags): + def make_index( + self, org_name, repo, package_name, existing, tags, fusion_compatibility + ): description = "dbt models for {}".format(repo) assets = {"logo": "logos/placeholder.svg"} @@ -176,6 +319,9 @@ def make_index(self, org_name, repo, package_name, existing, tags): "namespace": org_name, "description": description, "latest": latest_version.replace("=", ""), # 
LOL + "latest-fusion-schema-compat": fusion_compatibility.get( + latest_version, False + ), "assets": assets, } @@ -210,7 +356,14 @@ def get_sha1(self, url): return digest def make_spec( - self, org, repo, package_name, packages, require_dbt_version, version + self, + org, + repo, + package_name, + packages, + require_dbt_version, + version, + fusion_schema_compat=False, ): """The hub needs these specs for packages to be discoverable by deps and on the web""" tarball_url = "https://codeload.github.com/{}/{}/tar.gz/{}".format( @@ -235,4 +388,5 @@ def make_spec( ), }, "downloads": {"tarball": tarball_url, "format": "tgz", "sha1": sha1}, + "fusion-schema-compat": fusion_schema_compat, } diff --git a/tests/test_fusion_compatibility.py b/tests/test_fusion_compatibility.py new file mode 100644 index 0000000..dec2e0c --- /dev/null +++ b/tests/test_fusion_compatibility.py @@ -0,0 +1,308 @@ +"""Unit tests for the check_fusion_schema_compatibility function""" + +import os +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch, MagicMock +import subprocess + +# Import the actual function from the hubcap.records module +from hubcap.records import check_fusion_schema_compatibility + + +class TestCheckFusionSchemaCompatibility(unittest.TestCase): + """Test cases for check_fusion_schema_compatibility function""" + + def setUp(self): + """Set up test fixtures""" + # Create a temporary directory for each test + self.temp_dir = tempfile.mkdtemp() + self.repo_path = Path(self.temp_dir) + + # Create a minimal dbt_project.yml file + dbt_project_content = """ +name: 'test_package' +version: '1.0.0' +profile: 'test_schema_compat' + +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_packages" +""" + with open(self.repo_path / "dbt_project.yml", "w") as f: + f.write(dbt_project_content) + + # Store original environment state + self.original_env = os.environ.get("_DBT_FUSION_STRICT_MODE") + + def tearDown(self): + """Clean up test fixtures""" + # Restore original environment + if self.original_env is not None: + os.environ["_DBT_FUSION_STRICT_MODE"] = self.original_env + elif "_DBT_FUSION_STRICT_MODE" in os.environ: + del os.environ["_DBT_FUSION_STRICT_MODE"] + + # Clean up temp directory + import shutil + + shutil.rmtree(self.temp_dir, ignore_errors=True) + + @patch("hubcap.records.subprocess.run") + def test_fusion_compatible_success(self, mock_subprocess): + """Test successful fusion compatibility check""" + # Mock successful dbtf parse command + mock_result = MagicMock() + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Assertions + self.assertTrue(result) + + # Verify subprocess was called with correct arguments + mock_subprocess.assert_called_once_with( + [ + "dbtf", + "parse", + "--profile", + "test_schema_compat", + "--project-dir", + str(self.repo_path), + ], + capture_output=True, + timeout=60, + ) + + # Verify environment variable was set + self.assertEqual(os.environ.get("_DBT_FUSION_STRICT_MODE"), "1") + + # Verify profiles.yml was cleaned up + profiles_path = self.repo_path / "profiles.yml" + self.assertFalse(profiles_path.exists()) + + @patch("hubcap.records.subprocess.run") + def test_fusion_incompatible_failure(self, mock_subprocess): + """Test failed fusion compatibility check""" + 
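+        # A non-zero exit code from `dbtf parse` under _DBT_FUSION_STRICT_MODE
+        # is the signal for a schema violation, so the function is expected to
+        # report the package as not fusion compatible.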
# Mock failed dbtf parse command + mock_result = MagicMock() + mock_result.returncode = 1 + mock_subprocess.return_value = mock_result + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Assertions + self.assertFalse(result) + + # Verify subprocess was called + mock_subprocess.assert_called_once() + + # Verify profiles.yml was cleaned up + profiles_path = self.repo_path / "profiles.yml" + self.assertFalse(profiles_path.exists()) + + @patch("hubcap.records.subprocess.run") + def test_profiles_yml_creation_and_content(self, mock_subprocess): + """Test that profiles.yml is created with correct content""" + # Mock successful command + mock_result = MagicMock() + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + # Capture the profiles.yml content during execution + profiles_content = None + + def capture_profiles_content(*args, **kwargs): + nonlocal profiles_content + profiles_path = self.repo_path / "profiles.yml" + if profiles_path.exists(): + with open(profiles_path, "r") as f: + profiles_content = f.read() + return mock_result + + mock_subprocess.side_effect = capture_profiles_content + + # Run the function + check_fusion_schema_compatibility(self.repo_path) + + # Verify profiles.yml content + self.assertIsNotNone(profiles_content) + self.assertIn("test_schema_compat:", profiles_content) + self.assertIn("type: postgres", profiles_content) + self.assertIn("host: localhost", profiles_content) + self.assertIn("port: 5432", profiles_content) + self.assertIn("user: postgres", profiles_content) + self.assertIn("password: postgres", profiles_content) + self.assertIn("dbname: postgres", profiles_content) + self.assertIn("schema: public", profiles_content) + + @patch("hubcap.records.subprocess.run") + def test_timeout_handling(self, mock_subprocess): + """Test timeout scenario""" + # Mock timeout exception + mock_subprocess.side_effect = subprocess.TimeoutExpired( + cmd=["dbtf", "parse"], timeout=60 + ) + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Assertions + self.assertFalse(result) + + # Verify profiles.yml was cleaned up even after timeout + profiles_path = self.repo_path / "profiles.yml" + self.assertFalse(profiles_path.exists()) + + @patch("hubcap.records.subprocess.run") + def test_file_not_found_handling(self, mock_subprocess): + """Test FileNotFoundError scenario (dbtf command not available)""" + # Mock FileNotFoundError + mock_subprocess.side_effect = FileNotFoundError("dbtf command not found") + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Assertions + self.assertFalse(result) + + # Verify profiles.yml was cleaned up + profiles_path = self.repo_path / "profiles.yml" + self.assertFalse(profiles_path.exists()) + + @patch("hubcap.records.subprocess.run") + def test_general_exception_handling(self, mock_subprocess): + """Test general exception handling""" + # Mock a general exception + mock_subprocess.side_effect = Exception("Unexpected error") + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Assertions + self.assertFalse(result) + + # Verify profiles.yml was cleaned up + profiles_path = self.repo_path / "profiles.yml" + self.assertFalse(profiles_path.exists()) + + @patch("hubcap.records.subprocess.run") + def test_existing_profiles_yml_handling(self, mock_subprocess): + """Test behavior when profiles.yml already exists""" + # Create an existing profiles.yml file + existing_content = 
"existing_profile:\n target: dev\n" + profiles_path = self.repo_path / "profiles.yml" + with open(profiles_path, "w") as f: + f.write(existing_content) + + # Mock successful command + mock_result = MagicMock() + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Assertions + self.assertTrue(result) + + # Verify the original file is gone (it gets removed) + self.assertFalse(profiles_path.exists()) + + @patch("hubcap.records.logging.info") + @patch("hubcap.records.subprocess.run") + def test_logging_success(self, mock_subprocess, mock_logging): + """Test that success is logged correctly""" + # Mock successful command + mock_result = MagicMock() + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + # Run the function + check_fusion_schema_compatibility(self.repo_path) + + # Verify logging was called with success message + mock_logging.assert_called_with( + f"Package at {self.repo_path} is fusion schema compatible" + ) + + @patch("hubcap.records.logging.info") + @patch("hubcap.records.subprocess.run") + def test_logging_failure(self, mock_subprocess, mock_logging): + """Test that failure is logged correctly""" + # Mock failed command + mock_result = MagicMock() + mock_result.returncode = 1 + mock_subprocess.return_value = mock_result + + # Run the function + check_fusion_schema_compatibility(self.repo_path) + + # Verify logging was called with failure message + mock_logging.assert_called_with( + f"Package at {self.repo_path} is not fusion schema compatible" + ) + + @patch("hubcap.records.logging.warning") + @patch("hubcap.records.subprocess.run") + def test_logging_timeout(self, mock_subprocess, mock_logging): + """Test that timeout is logged correctly""" + # Mock timeout + mock_subprocess.side_effect = subprocess.TimeoutExpired( + cmd=["dbtf", "parse"], timeout=60 + ) + + # Run the function + check_fusion_schema_compatibility(self.repo_path) + + # Verify logging was called with timeout message + mock_logging.assert_called_with( + f"dbtf parse timed out for package at {self.repo_path}" + ) + + def test_environment_variable_set(self): + """Test that _DBT_FUSION_STRICT_MODE environment variable is properly set""" + with patch("hubcap.records.subprocess.run") as mock_subprocess: + mock_result = MagicMock() + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + # Ensure env var is not set initially + if "_DBT_FUSION_STRICT_MODE" in os.environ: + del os.environ["_DBT_FUSION_STRICT_MODE"] + + # Run the function + check_fusion_schema_compatibility(self.repo_path) + + # Verify environment variable was set to "1" + self.assertEqual(os.environ.get("_DBT_FUSION_STRICT_MODE"), "1") + + def test_profiles_yml_cleanup_on_file_creation_failure(self): + """Test that cleanup works even if file creation fails""" + # Make the directory read-only to cause file creation to fail + self.repo_path.chmod(0o444) + + try: + result = check_fusion_schema_compatibility(self.repo_path) + # Should return False due to the exception + self.assertFalse(result) + finally: + # Restore permissions for cleanup + self.repo_path.chmod(0o755) + + +if __name__ == "__main__": + # Run with verbose output + unittest.main(verbosity=2) diff --git a/tests/test_fusion_integration.py b/tests/test_fusion_integration.py new file mode 100644 index 0000000..1ed0e59 --- /dev/null +++ b/tests/test_fusion_integration.py @@ -0,0 +1,290 @@ +"""Integration tests for the 
check_fusion_schema_compatibility function""" + +import os +import tempfile +import unittest +import shutil +from pathlib import Path + +# Import the actual function from the hubcap.records module +from hubcap.records import check_fusion_schema_compatibility + + +class TestFusionSchemaCompatibilityIntegration(unittest.TestCase): + """Integration test cases that actually run dbtf parse""" + + def setUp(self): + """Set up test fixtures""" + # Create a temporary directory for each test + self.temp_dir = tempfile.mkdtemp() + self.repo_path = Path(self.temp_dir) + + # Store original environment state + self.original_env = os.environ.get("_DBT_FUSION_STRICT_MODE") + + def tearDown(self): + """Clean up test fixtures""" + # Restore original environment + if self.original_env is not None: + os.environ["_DBT_FUSION_STRICT_MODE"] = self.original_env + elif "_DBT_FUSION_STRICT_MODE" in os.environ: + del os.environ["_DBT_FUSION_STRICT_MODE"] + + # Clean up temp directory + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_basic_dbt_project(self, project_name="test_package"): + """Create a basic dbt project structure""" + # Create dbt_project.yml + dbt_project_content = f""" +name: '{project_name}' +version: '1.0.0' + +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_packages" + +models: + {project_name}: + +schema: public + +materialized: table + +vars: + # Variables for testing + test_var: "test_value" +""" + with open(self.repo_path / "dbt_project.yml", "w") as f: + f.write(dbt_project_content) + + # Create directories + for dir_name in ["models", "tests", "macros", "seeds", "snapshots", "analyses"]: + (self.repo_path / dir_name).mkdir(exist_ok=True) + + def _create_simple_model(self, model_name="test_model"): + """Create a simple dbt model""" + models_dir = self.repo_path / "models" + models_dir.mkdir(exist_ok=True) + + model_content = """ +{{ config(materialized='table') }} + +select + 1 as id, + 'test' as name, + current_timestamp as created_at +""" + with open(models_dir / f"{model_name}.sql", "w") as f: + f.write(model_content) + + def _create_fusion_compatible_model(self): + """Create a model that should be fusion schema compatible""" + models_dir = self.repo_path / "models" + models_dir.mkdir(exist_ok=True) + + # Simple select statement that should be fusion compatible + model_content = """ +{{ config(materialized='table') }} + +select + cast(1 as integer) as id, + cast('test' as varchar(50)) as name, + cast(current_timestamp as timestamp) as created_at +""" + with open(models_dir / "fusion_compatible_model.sql", "w") as f: + f.write(model_content) + + def _create_potentially_incompatible_model(self): + """Create a model that might have fusion compatibility issues""" + models_dir = self.repo_path / "models" + models_dir.mkdir(exist_ok=True) + + # Model with complex transformations that might cause fusion issues + model_content = """ +{{ config(materialized='table') }} + +select + id, + name, + case + when length(name) > 10 then 'long' + else 'short' + end as name_category, + row_number() over (partition by name order by id) as row_num +from ( + select 1 as id, 'test' as name + union all + select 2 as id, 'another_test_name' as name +) base_data +""" + with open(models_dir / "complex_model.sql", "w") as f: + f.write(model_content) + + def test_dbtf_command_available(self): + """Test if dbtf command or dbt-fusion 
is available in the environment""" + import subprocess + + try: + # Try dbtf first + result = subprocess.run( + ["dbtf", "--version"], capture_output=True, timeout=10 + ) + if result.returncode == 0: + print(f"dbtf version: {result.stdout.decode().strip()}") + return True + except FileNotFoundError: + pass + + try: + # Fall back to dbt command, but it must be dbt-fusion + result = subprocess.run( + ["dbt", "--version"], capture_output=True, timeout=10 + ) + if result.returncode == 0: + output = result.stdout.decode().strip() + print(f"dbt version: {output}") + if "dbt-fusion" in output: + return True + else: + self.fail( + "dbt-fusion is required for fusion compatibility integration tests, but found regular dbt-core instead" + ) + else: + self.fail("dbt command returned non-zero exit code") + except FileNotFoundError: + self.fail( + "Neither dbtf nor dbt command found in PATH - dbt-fusion is required for integration tests" + ) + except subprocess.TimeoutExpired: + self.fail("dbt command timed out") + + def test_fusion_compatibility_simple_project(self): + """Test fusion compatibility with a simple dbt project""" + # Check if dbtf is available + self.test_dbtf_command_available() + + # Create a basic project + self._create_basic_dbt_project("simple_test") + self._create_fusion_compatible_model() + + # Test fusion compatibility + result = check_fusion_schema_compatibility(self.repo_path) + + # Since we have dbt-fusion available and this is a simple valid project, it should be compatible + self.assertTrue(result, "Simple dbt project should be fusion schema compatible") + print(f"Simple project fusion compatibility: {result}") + + def test_fusion_compatibility_complex_project(self): + """Test fusion compatibility with a more complex dbt project""" + # Check if dbtf is available + self.test_dbtf_command_available() + + # Create a project with potentially complex models + self._create_basic_dbt_project("complex_test") + self._create_fusion_compatible_model() + self._create_potentially_incompatible_model() + + # Test fusion compatibility + result = check_fusion_schema_compatibility(self.repo_path) + + # Since we have dbt-fusion available and this is a valid project, it should be compatible + self.assertTrue( + result, "Complex dbt project should be fusion schema compatible" + ) + print(f"Complex project fusion compatibility: {result}") + + def test_fusion_compatibility_empty_project(self): + """Test fusion compatibility with an empty dbt project""" + # Check if dbtf is available + self.test_dbtf_command_available() + + # Create a basic project with no models + self._create_basic_dbt_project("empty_test") + + # Test fusion compatibility - empty project should be compatible + result = check_fusion_schema_compatibility(self.repo_path) + + self.assertTrue(result, "Empty dbt project should be fusion schema compatible") + print(f"Empty project fusion compatibility: {result}") + + def test_profiles_yml_creation_and_cleanup(self): + """Test that profiles.yml is created and cleaned up properly""" + # Check if dbtf is available + self.test_dbtf_command_available() + + self._create_basic_dbt_project("cleanup_test") + self._create_simple_model() + + profiles_path = self.repo_path / "profiles.yml" + + # Ensure profiles.yml doesn't exist before + self.assertFalse(profiles_path.exists()) + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Ensure profiles.yml is cleaned up after + self.assertFalse(profiles_path.exists()) + # Since this is a valid project with dbt-fusion 
available, it should be compatible + self.assertTrue(result, "Valid dbt project should be fusion schema compatible") + + def test_environment_variable_setting(self): + """Test that _DBT_FUSION_STRICT_MODE is set during execution""" + # Check if dbtf is available + self.test_dbtf_command_available() + + self._create_basic_dbt_project("env_test") + self._create_simple_model() + + # Clear environment variable if set + if "_DBT_FUSION_STRICT_MODE" in os.environ: + del os.environ["_DBT_FUSION_STRICT_MODE"] + + # Run the function + result = check_fusion_schema_compatibility(self.repo_path) + + # Since this is a valid project with dbt-fusion available, it should be compatible + self.assertTrue(result, "Valid dbt project should be fusion schema compatible") + # The function should have set the environment variable during execution + self.assertEqual(os.environ.get("_DBT_FUSION_STRICT_MODE"), "1") + + def test_invalid_dbt_project(self): + """Test behavior with an invalid dbt project""" + # Check if dbtf is available + self.test_dbtf_command_available() + + # Create an invalid dbt_project.yml + invalid_content = "invalid: yaml: content:" + with open(self.repo_path / "dbt_project.yml", "w") as f: + f.write(invalid_content) + + # This should return False due to the invalid project + result = check_fusion_schema_compatibility(self.repo_path) + + # Should return False for invalid projects + self.assertFalse(result) + + def test_missing_dbt_project_yml(self): + """Test behavior when dbt_project.yml is missing""" + # Check if dbtf is available + self.test_dbtf_command_available() + + # Don't create dbt_project.yml - just use empty directory + + # This should return False due to missing dbt_project.yml + result = check_fusion_schema_compatibility(self.repo_path) + + # Should return False for missing project file + self.assertFalse(result) + + +if __name__ == "__main__": + # Run with verbose output + unittest.main(verbosity=2) diff --git a/tests/test_update_task_fusion.py b/tests/test_update_task_fusion.py new file mode 100644 index 0000000..6d22c5b --- /dev/null +++ b/tests/test_update_task_fusion.py @@ -0,0 +1,500 @@ +"""Tests for UpdateTask fusion compatibility JSON generation""" + +import json +import os +import tempfile +import unittest +import shutil +from pathlib import Path +from unittest.mock import patch, MagicMock + +# Import the classes we need to test +from hubcap.records import UpdateTask + + +class TestUpdateTaskFusionCompatibility(unittest.TestCase): + """Test UpdateTask's generation of index.json and version.json with fusion compatibility""" + + def setUp(self): + """Set up test fixtures""" + # Create temporary directories + self.temp_dir = tempfile.mkdtemp() + self.hub_dir = Path(self.temp_dir) / "hub.getdbt.com" + self.package_dir = Path(self.temp_dir) / "test_org_test_package" + + # Create hub directory structure + self.hub_dir.mkdir(parents=True) + self.package_dir.mkdir(parents=True) + + # Create a basic dbt project in the package directory + self._create_test_package() + + # Create UpdateTask with test data + self.update_task = UpdateTask( + github_username="test_org", + github_repo_name="test_package", + local_path_to_repo=self.package_dir, + package_name="test_package", + existing_tags=[], + new_tags=["1.0.0", "1.1.0"], + hub_repo="hub.getdbt.com", + ) + + def tearDown(self): + """Clean up test fixtures""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_test_package(self): + """Create a test dbt package structure""" + # Create dbt_project.yml + dbt_project_content 
= """ +name: 'test_package' +version: '1.0.0' + +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_packages" + +models: + test_package: + +schema: public + +materialized: table +""" + with open(self.package_dir / "dbt_project.yml", "w") as f: + f.write(dbt_project_content) + + # Create models directory and a simple model + models_dir = self.package_dir / "models" + models_dir.mkdir(exist_ok=True) + + model_content = """ +{{ config(materialized='table') }} + +select + 1 as id, + 'test' as name, + current_timestamp as created_at +""" + with open(models_dir / "test_model.sql", "w") as f: + f.write(model_content) + + # Create a packages.yml file (sometimes needed) + packages_content = """ +packages: + - package: dbt-labs/dbt_utils + version: ">=1.0.0" +""" + with open(self.package_dir / "packages.yml", "w") as f: + f.write(packages_content) + + @patch("hubcap.records.git_helper.run_cmd") + @patch("hubcap.records.subprocess.run") + @patch("hubcap.records.package.parse_pkgs") + @patch("hubcap.records.package.parse_require_dbt_version") + @patch("hubcap.records.check_fusion_schema_compatibility") + @patch("hubcap.records.UpdateTask.get_sha1") + def test_fusion_compatible_package_json_generation( + self, + mock_sha1, + mock_fusion_check, + mock_parse_dbt_version, + mock_parse_pkgs, + mock_subprocess, + mock_git_cmd, + ): + """Test that fusion-compatible packages generate correct JSON files""" + # Mock returns + mock_sha1.return_value = "abc123def456" + mock_fusion_check.return_value = True # Fusion compatible + mock_parse_dbt_version.return_value = ">=1.0.0" + mock_parse_pkgs.return_value = [ + {"name": "dbt-labs/dbt_utils", "version": ">=1.0.0"} + ] + mock_subprocess.return_value = MagicMock(returncode=0) + + # Mock PR strategy + mock_pr_strategy = MagicMock() + mock_pr_strategy.branch_name.return_value = "test-branch" + + # Run the UpdateTask + os.chdir(self.temp_dir) + branch_name, org_name, package_name = self.update_task.run( + str(self.hub_dir.parent), mock_pr_strategy + ) + + # Verify the branch and basic info + self.assertEqual(branch_name, "test-branch") + self.assertEqual(org_name, "test_org") + self.assertEqual(package_name, "test_package") + + # Check that fusion compatibility was checked for each tag + self.assertEqual(mock_fusion_check.call_count, 2) # Called for both tags + + # Verify version-specific JSON files were created with correct fusion compatibility + version_dir = ( + self.hub_dir + / "data" + / "packages" + / "test_org" + / "test_package" + / "versions" + ) + + # Check 1.0.0.json + version_1_0_0_file = version_dir / "1.0.0.json" + self.assertTrue(version_1_0_0_file.exists()) + + with open(version_1_0_0_file, "r") as f: + version_1_0_0_spec = json.load(f) + + self.assertEqual(version_1_0_0_spec["fusion-schema-compat"], True) + self.assertEqual(version_1_0_0_spec["name"], "test_package") + self.assertEqual(version_1_0_0_spec["version"], "1.0.0") + self.assertIn("downloads", version_1_0_0_spec) + self.assertEqual(version_1_0_0_spec["downloads"]["sha1"], "abc123def456") + + # Check 1.1.0.json + version_1_1_0_file = version_dir / "1.1.0.json" + self.assertTrue(version_1_1_0_file.exists()) + + with open(version_1_1_0_file, "r") as f: + version_1_1_0_spec = json.load(f) + + self.assertEqual(version_1_1_0_spec["fusion-schema-compat"], True) + self.assertEqual(version_1_1_0_spec["name"], "test_package") + 
self.assertEqual(version_1_1_0_spec["version"], "1.1.0") + + # Check index.json + index_file = ( + self.hub_dir + / "data" + / "packages" + / "test_org" + / "test_package" + / "index.json" + ) + self.assertTrue(index_file.exists()) + + with open(index_file, "r") as f: + index_spec = json.load(f) + + # Index should have fusion compatibility of the latest version (1.1.0) + self.assertEqual(index_spec["latest-fusion-schema-compat"], True) + self.assertEqual(index_spec["name"], "test_package") + self.assertEqual(index_spec["namespace"], "test_org") + self.assertEqual(index_spec["latest"], "1.1.0") + + @patch("hubcap.records.git_helper.run_cmd") + @patch("hubcap.records.subprocess.run") + @patch("hubcap.records.package.parse_pkgs") + @patch("hubcap.records.package.parse_require_dbt_version") + @patch("hubcap.records.check_fusion_schema_compatibility") + @patch("hubcap.records.UpdateTask.get_sha1") + def test_fusion_incompatible_package_json_generation( + self, + mock_sha1, + mock_fusion_check, + mock_parse_dbt_version, + mock_parse_pkgs, + mock_subprocess, + mock_git_cmd, + ): + """Test that fusion-incompatible packages generate correct JSON files""" + # Mock returns + mock_sha1.return_value = "xyz789uvw012" + mock_fusion_check.return_value = False # Fusion incompatible + mock_parse_dbt_version.return_value = ">=1.0.0" + mock_parse_pkgs.return_value = [ + {"name": "dbt-labs/dbt_utils", "version": ">=1.0.0"} + ] + mock_subprocess.return_value = MagicMock(returncode=0) + + # Mock PR strategy + mock_pr_strategy = MagicMock() + mock_pr_strategy.branch_name.return_value = "test-branch-incompatible" + + # Run the UpdateTask + os.chdir(self.temp_dir) + branch_name, org_name, package_name = self.update_task.run( + str(self.hub_dir.parent), mock_pr_strategy + ) + + # Verify basic info + self.assertEqual(branch_name, "test-branch-incompatible") + self.assertEqual(org_name, "test_org") + self.assertEqual(package_name, "test_package") + + # Check that fusion compatibility was checked for each tag + self.assertEqual(mock_fusion_check.call_count, 2) # Called for both tags + + # Verify version-specific JSON files were created with correct fusion compatibility + version_dir = ( + self.hub_dir + / "data" + / "packages" + / "test_org" + / "test_package" + / "versions" + ) + + # Check 1.0.0.json + version_1_0_0_file = version_dir / "1.0.0.json" + self.assertTrue(version_1_0_0_file.exists()) + + with open(version_1_0_0_file, "r") as f: + version_1_0_0_spec = json.load(f) + + self.assertEqual(version_1_0_0_spec["fusion-schema-compat"], False) + self.assertEqual(version_1_0_0_spec["name"], "test_package") + self.assertEqual(version_1_0_0_spec["version"], "1.0.0") + + # Check 1.1.0.json + version_1_1_0_file = version_dir / "1.1.0.json" + self.assertTrue(version_1_1_0_file.exists()) + + with open(version_1_1_0_file, "r") as f: + version_1_1_0_spec = json.load(f) + + self.assertEqual(version_1_1_0_spec["fusion-schema-compat"], False) + self.assertEqual(version_1_1_0_spec["name"], "test_package") + self.assertEqual(version_1_1_0_spec["version"], "1.1.0") + + # Check index.json + index_file = ( + self.hub_dir + / "data" + / "packages" + / "test_org" + / "test_package" + / "index.json" + ) + self.assertTrue(index_file.exists()) + + with open(index_file, "r") as f: + index_spec = json.load(f) + + # Index should have fusion compatibility of the latest version (1.1.0) + self.assertEqual(index_spec["latest-fusion-schema-compat"], False) + self.assertEqual(index_spec["name"], "test_package") + 
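+        # The remaining index fields are independent of the fusion flag.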
self.assertEqual(index_spec["namespace"], "test_org") + self.assertEqual(index_spec["latest"], "1.1.0") + + @patch("hubcap.records.git_helper.run_cmd") + @patch("hubcap.records.subprocess.run") + @patch("hubcap.records.package.parse_pkgs") + @patch("hubcap.records.package.parse_require_dbt_version") + @patch("hubcap.records.check_fusion_schema_compatibility") + @patch("hubcap.records.UpdateTask.get_sha1") + def test_mixed_fusion_compatibility_versions( + self, + mock_sha1, + mock_fusion_check, + mock_parse_dbt_version, + mock_parse_pkgs, + mock_subprocess, + mock_git_cmd, + ): + """Test packages with mixed fusion compatibility across versions""" + # Mock returns - different compatibility for different versions + mock_sha1.return_value = "mixed123compat" + mock_parse_dbt_version.return_value = ">=1.0.0" + mock_parse_pkgs.return_value = [] + mock_subprocess.return_value = MagicMock(returncode=0) + + # Mock fusion compatibility check to return different values for different calls + # First call (1.0.0): incompatible, Second call (1.1.0): compatible + mock_fusion_check.side_effect = [False, True] + + # Mock PR strategy + mock_pr_strategy = MagicMock() + mock_pr_strategy.branch_name.return_value = "test-branch-mixed" + + # Run the UpdateTask + os.chdir(self.temp_dir) + branch_name, org_name, package_name = self.update_task.run( + str(self.hub_dir.parent), mock_pr_strategy + ) + + # Verify basic info + self.assertEqual(branch_name, "test-branch-mixed") + + # Check that fusion compatibility was checked for each tag + self.assertEqual(mock_fusion_check.call_count, 2) + + # Verify version-specific JSON files have correct individual compatibility + version_dir = ( + self.hub_dir + / "data" + / "packages" + / "test_org" + / "test_package" + / "versions" + ) + + # Check 1.0.0.json (should be incompatible) + with open(version_dir / "1.0.0.json", "r") as f: + version_1_0_0_spec = json.load(f) + self.assertEqual(version_1_0_0_spec["fusion-schema-compat"], False) + + # Check 1.1.0.json (should be compatible) + with open(version_dir / "1.1.0.json", "r") as f: + version_1_1_0_spec = json.load(f) + self.assertEqual(version_1_1_0_spec["fusion-schema-compat"], True) + + # Check index.json (should reflect latest version - 1.1.0 - which is compatible) + index_file = ( + self.hub_dir + / "data" + / "packages" + / "test_org" + / "test_package" + / "index.json" + ) + with open(index_file, "r") as f: + index_spec = json.load(f) + + self.assertEqual( + index_spec["latest-fusion-schema-compat"], True + ) # Latest version (1.1.0) is compatible + self.assertEqual(index_spec["latest"], "1.1.0") + + @patch("hubcap.records.git_helper.run_cmd") + @patch("hubcap.records.subprocess.run") + @patch("hubcap.records.package.parse_pkgs") + @patch("hubcap.records.package.parse_require_dbt_version") + @patch("hubcap.records.check_fusion_schema_compatibility") + @patch("hubcap.records.UpdateTask.get_sha1") + def test_existing_index_file_fusion_compatibility_update( + self, + mock_sha1, + mock_fusion_check, + mock_parse_dbt_version, + mock_parse_pkgs, + mock_subprocess, + mock_git_cmd, + ): + """Test that existing index.json files are properly updated with new fusion compatibility""" + # Create existing index.json with old data + index_dir = self.hub_dir / "data" / "packages" / "test_org" / "test_package" + index_dir.mkdir(parents=True, exist_ok=True) + + existing_index = { + "name": "test_package", + "namespace": "test_org", + "description": "A test package for fusion compatibility", + "latest": "0.9.0", + 
"latest-fusion-schema-compat": False, # Old version was incompatible + "assets": {"logo": "logos/custom.svg"}, + } + + with open(index_dir / "index.json", "w") as f: + json.dump(existing_index, f) + + # Mock returns for new version + mock_sha1.return_value = "update123test" + mock_fusion_check.return_value = True # New version is compatible + mock_parse_dbt_version.return_value = ">=1.0.0" + mock_parse_pkgs.return_value = [] + mock_subprocess.return_value = MagicMock(returncode=0) + + # Mock PR strategy + mock_pr_strategy = MagicMock() + mock_pr_strategy.branch_name.return_value = "test-branch-update" + + # Update existing tags to include the old version + self.update_task.existing_tags = ["0.9.0"] + + # Run the UpdateTask + os.chdir(self.temp_dir) + self.update_task.run(str(self.hub_dir.parent), mock_pr_strategy) + + # Check updated index.json + with open(index_dir / "index.json", "r") as f: + updated_index = json.load(f) + + # Should preserve existing description and assets + self.assertEqual( + updated_index["description"], "A test package for fusion compatibility" + ) + self.assertEqual(updated_index["assets"]["logo"], "logos/custom.svg") + + # Should update latest version and fusion compatibility + self.assertEqual(updated_index["latest"], "1.1.0") # Latest new tag + self.assertEqual( + updated_index["latest-fusion-schema-compat"], True + ) # New latest version is compatible + self.assertEqual(updated_index["name"], "test_package") + self.assertEqual(updated_index["namespace"], "test_org") + + def test_fusion_compatibility_directory_bug(self): + """Test that fusion compatibility is checked on the correct directory""" + # This test is designed to catch the bug where fusion compatibility + # is checked on the wrong directory (hub dir instead of package dir) + + with patch( + "hubcap.records.check_fusion_schema_compatibility" + ) as mock_fusion_check: + with patch("hubcap.records.git_helper.run_cmd"): + with patch("hubcap.records.subprocess.run") as mock_subprocess: + with patch("hubcap.records.package.parse_pkgs") as mock_parse_pkgs: + with patch( + "hubcap.records.package.parse_require_dbt_version" + ) as mock_parse_dbt_version: + with patch( + "hubcap.records.UpdateTask.get_sha1" + ) as mock_sha1: + # Setup mocks + mock_fusion_check.return_value = True + mock_subprocess.return_value = MagicMock(returncode=0) + mock_parse_pkgs.return_value = [] + mock_parse_dbt_version.return_value = ">=1.0.0" + mock_sha1.return_value = "test123" + + # Mock PR strategy + mock_pr_strategy = MagicMock() + mock_pr_strategy.branch_name.return_value = ( + "test-branch" + ) + + # Set up a single tag for simpler testing + self.update_task.new_tags = ["1.0.0"] + + # Run the UpdateTask + os.chdir(self.temp_dir) + self.update_task.run( + str(self.hub_dir.parent), mock_pr_strategy + ) + + # Verify that fusion compatibility was called with the package directory + # NOT the hub directory + mock_fusion_check.assert_called_once() + + # Get the actual call argument + actual_call_path = mock_fusion_check.call_args[0][0] + + # Verify that fusion compatibility was called with the package directory + print( + f"Fusion compatibility called with path: {actual_call_path}" + ) + print( + f"Expected package path: {self.update_task.local_path_to_repo}" + ) + print(f"Current working directory: {os.getcwd()}") + + # This should now pass after fixing the bug + self.assertEqual( + str(actual_call_path), + str(self.update_task.local_path_to_repo), + ) + + +if __name__ == "__main__": + unittest.main(verbosity=2)