feat: add new dbt artifacts support #89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing

File: datapilot/clients/altimate/constants.py (new)

@@ -0,0 +1,10 @@
"""Constants for the Altimate API client."""

# Supported dbt artifact file types for onboarding
SUPPORTED_ARTIFACT_TYPES = {
    "manifest",
    "catalog",
    "run_results",
    "sources",
    "semantic_manifest",
}
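
For orientation, here is a hedged sketch of how these constants might be used to map the standard dbt target/ file names to the supported artifact types; the DEFAULT_ARTIFACT_FILENAMES mapping is illustrative and not part of this PR.

```python
# Illustrative only: map standard dbt artifact file names to the supported types.
# DEFAULT_ARTIFACT_FILENAMES is a hypothetical helper, not part of this PR.
from datapilot.clients.altimate.constants import SUPPORTED_ARTIFACT_TYPES

DEFAULT_ARTIFACT_FILENAMES = {
    "manifest": "manifest.json",
    "catalog": "catalog.json",
    "run_results": "run_results.json",
    "sources": "sources.json",
    "semantic_manifest": "semantic_manifest.json",
}

# Sanity check: every mapped type is one the onboarding helper accepts.
assert set(DEFAULT_ARTIFACT_FILENAMES) == SUPPORTED_ARTIFACT_TYPES
```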

File: Altimate client helpers (onboard_file and related utilities)

@@ -7,6 +7,7 @@
from requests import Response

from datapilot.clients.altimate.client import APIClient
from datapilot.clients.altimate.constants import SUPPORTED_ARTIFACT_TYPES


def check_token_and_instance(

@@ -56,6 +57,27 @@ def validate_permissions(
def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integration_environment, file_type, file_path, backend_url) -> Dict:
    """
    Upload a dbt artifact file to the Altimate backend.

    Args:
        api_token: API authentication token
        tenant: Tenant/instance name
        dbt_core_integration_id: ID of the dbt integration
        dbt_core_integration_environment: Environment type (e.g., PROD)
        file_type: Type of artifact - one of: manifest, catalog, run_results, sources, semantic_manifest
        file_path: Path to the artifact file
        backend_url: URL of the Altimate backend

    Returns:
        Dict with 'ok' boolean and optional 'message' on failure
    """
    if file_type not in SUPPORTED_ARTIFACT_TYPES:
        return {
            "ok": False,
            "message": f"Unsupported file type: {file_type}. Supported types: {', '.join(sorted(SUPPORTED_ARTIFACT_TYPES))}",
        }

    api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

    params = {

@@ -84,7 +106,7 @@ def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integratio
        api_client.log("Error getting signed URL.")
        return {
            "ok": False,
-            "message": "Error in uploading the manifest. ",
+            "message": f"Error in uploading the {file_type}.",
        }
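
For context, a minimal usage sketch of the updated helper for a non-manifest artifact; the token, tenant, integration values, paths, and backend URL below are placeholders, and the import path of onboard_file is an assumption.

```python
# Placeholder values throughout; the import path of onboard_file is an assumption.
from datapilot.clients.altimate.utils import onboard_file

result = onboard_file(
    api_token="<api-token>",
    tenant="<instance-name>",
    dbt_core_integration_id="<integration-id>",
    dbt_core_integration_environment="PROD",
    file_type="run_results",  # must be one of SUPPORTED_ARTIFACT_TYPES
    file_path="target/run_results.json",
    backend_url="<backend-url>",
)
if not result["ok"]:
    print(result["message"])  # e.g. "Error in uploading the run_results."
```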

File: dbt CLI (onboard command)

@@ -18,6 +18,8 @@
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_catalog
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_sources
from datapilot.utils.formatting.utils import tabulate_data
from datapilot.utils.utils import map_url_to_instance

@@ -155,6 +157,9 @@ def project_health(
)
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option("--catalog-path", required=False, prompt=False, help="Path to the catalog file.")
@click.option("--run-results-path", required=False, prompt=False, help="Path to the run_results.json file.")
@click.option("--sources-path", required=False, prompt=False, help="Path to the sources.json file (source freshness results).")
@click.option("--semantic-manifest-path", required=False, prompt=False, help="Path to the semantic_manifest.json file.")
def onboard(
    token,
    instance_name,

@@ -164,6 +169,9 @@ def onboard(
    dbt_integration_environment,
    manifest_path,
    catalog_path,
    run_results_path,
    sources_path,
    semantic_manifest_path,
):
    """Onboard a manifest file to DBT. You can specify either --dbt_integration_id or --dbt_integration_name."""

@@ -198,34 +206,91 @@ def onboard(
    elif dbt_integration_name and dbt_integration_id:
        click.echo("Warning: Both integration ID and name provided. Using ID and ignoring name.")

    # Validate manifest (required)
    try:
        load_manifest(manifest_path)
    except Exception as e:
        click.echo(f"Error: {e}")
        return

    # Validate optional artifacts if provided
    if catalog_path:
        try:
            load_catalog(catalog_path)
        except Exception as e:
            click.echo(f"Error validating catalog: {e}")
            return

    if run_results_path:
        try:
            load_run_results(run_results_path)
        except Exception as e:
            click.echo(f"Error validating run_results: {e}")
            return

    if sources_path:
        try:
            load_sources(sources_path)
        except Exception as e:
            click.echo(f"Error validating sources: {e}")
            return

    # Onboard manifest (required)
    response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "manifest", manifest_path, backend_url)
    if response["ok"]:
        click.echo("Manifest onboarded successfully!")
    else:
        click.echo(f"{response['message']}")

-    if not catalog_path:
-        return
-
-    response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
-    if response["ok"]:
-        click.echo("Catalog onboarded successfully!")
-    else:
-        click.echo(f"{response['message']}")

    # Onboard optional artifacts
    artifacts_uploaded = ["manifest"]

    if catalog_path:
        response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
        if response["ok"]:
            click.echo("Catalog onboarded successfully!")
            artifacts_uploaded.append("catalog")
        else:
            click.echo(f"{response['message']}")

    if run_results_path:
        response = onboard_file(
            token, instance_name, dbt_integration_id, dbt_integration_environment, "run_results", run_results_path, backend_url
        )
        if response["ok"]:
            click.echo("Run results onboarded successfully!")
            artifacts_uploaded.append("run_results")
        else:
            click.echo(f"{response['message']}")

    if sources_path:
        response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "sources", sources_path, backend_url)
        if response["ok"]:
            click.echo("Sources onboarded successfully!")
            artifacts_uploaded.append("sources")
        else:
            click.echo(f"{response['message']}")

    if semantic_manifest_path:
        response = onboard_file(
            token, instance_name, dbt_integration_id, dbt_integration_environment, "semantic_manifest", semantic_manifest_path, backend_url
        )
        if response["ok"]:
            click.echo("Semantic manifest onboarded successfully!")
            artifacts_uploaded.append("semantic_manifest")
        else:
            click.echo(f"{response['message']}")
    # Start ingestion
    response = start_dbt_ingestion(token, instance_name, dbt_integration_id, dbt_integration_environment, backend_url)
    if response["ok"]:
        url = map_url_to_instance(backend_url, instance_name)
        artifacts_str = ", ".join(artifacts_uploaded)
        if not url:
-            click.echo("Manifest and catalog ingestion has started.")
+            click.echo(f"Ingestion has started for: {artifacts_str}")
        else:
            url = f"{url}/settings/integrations/{dbt_integration_id}/{dbt_integration_environment}"
-            click.echo(f"Manifest and catalog ingestion has started. You can check the status at {url}")
+            click.echo(f"Ingestion has started for: {artifacts_str}. You can check the status at {url}")
    else:
        click.echo(f"{response['message']}")
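
As a side note, the four optional-artifact branches above follow the same pattern, so they could also be expressed as a single loop; a sketch under the assumption that onboard_file is importable as shown and that the surrounding Click command passes the same values.

```python
# Sketch of the repeated per-artifact pattern as a loop; not part of this PR.
# The import path of onboard_file is an assumption.
import click

from datapilot.clients.altimate.utils import onboard_file


def onboard_optional_artifacts(token, instance_name, integration_id, environment, backend_url, paths_by_type):
    """Upload whichever optional artifacts were provided and return the uploaded types."""
    artifacts_uploaded = ["manifest"]
    for file_type, path in paths_by_type.items():
        if not path:
            continue  # artifact not supplied on the command line
        response = onboard_file(token, instance_name, integration_id, environment, file_type, path, backend_url)
        if response["ok"]:
            click.echo(f"{file_type} onboarded successfully!")
            artifacts_uploaded.append(file_type)
        else:
            click.echo(f"{response['message']}")
    return artifacts_uploaded
```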

File: datapilot/core/platforms/dbt/schemas/run_results.py (new)

@@ -0,0 +1,17 @@
from typing import Union

from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v1 import RunResultsV1
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v2 import RunResultsV2
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v3 import RunResultsV3
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v4 import RunResultsV4
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v5 import RunResultsV5
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v6 import RunResultsV6

RunResults = Union[
    RunResultsV6,
    RunResultsV5,
    RunResultsV4,
    RunResultsV3,
    RunResultsV2,
    RunResultsV1,
]

File: datapilot/core/platforms/dbt/schemas/sources.py (new)

@@ -0,0 +1,11 @@
from typing import Union

from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3

Sources = Union[
    SourcesV3,
    SourcesV2,
    SourcesV1,
]
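
Since the vendored parser can return any of several schema versions, these Union aliases are what downstream code can annotate against; a hedged sketch of branching on the concrete version, assuming the vendored classes are importable as in the files above.

```python
# Hedged sketch: inspect which run_results schema version the parser produced.
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v6 import RunResultsV6

from datapilot.core.platforms.dbt.schemas.run_results import RunResults


def describe_run_results_version(run_results: RunResults) -> str:
    # isinstance works because the Union members are concrete classes.
    if isinstance(run_results, RunResultsV6):
        return "run_results schema v6"
    return type(run_results).__name__
```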

File: datapilot/core/platforms/dbt/utils.py

@@ -22,13 +22,17 @@
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestSourceNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestTestNode
from datapilot.core.platforms.dbt.schemas.manifest import Manifest
from datapilot.core.platforms.dbt.schemas.run_results import RunResults
from datapilot.core.platforms.dbt.schemas.sources import Sources
from datapilot.exceptions.exceptions import AltimateFileNotFoundError
from datapilot.exceptions.exceptions import AltimateInvalidJSONError
from datapilot.utils.utils import extract_dir_name_from_file_path
from datapilot.utils.utils import extract_folders_in_path
from datapilot.utils.utils import is_superset_path
from datapilot.utils.utils import load_json
from vendor.dbt_artifacts_parser.parser import parse_manifest
from vendor.dbt_artifacts_parser.parser import parse_run_results
from vendor.dbt_artifacts_parser.parser import parse_sources

MODEL_TYPE_PATTERNS = {
    STAGING: r"^stg_.*",  # Example: models starting with 'stg_'

@@ -94,8 +98,36 @@ def load_catalog(catalog_path: str) -> Catalog:
    return catalog


-def load_run_results(run_results_path: str) -> Manifest:
-    raise NotImplementedError
+def load_run_results(run_results_path: str) -> RunResults:
    try:
        run_results_dict = load_json(run_results_path)
    except FileNotFoundError as e:
        raise AltimateFileNotFoundError(f"Run results file not found: {run_results_path}. Error: {e}") from e
    except ValueError as e:
        raise AltimateInvalidJSONError(f"Invalid JSON file: {run_results_path}. Error: {e}") from e

    try:
        run_results: RunResults = parse_run_results(run_results_dict)
    except ValueError as e:
        raise AltimateInvalidManifestError(f"Invalid run results file: {run_results_path}. Error: {e}") from e
    return run_results


def load_sources(sources_path: str) -> Sources:
    try:
        sources_dict = load_json(sources_path)
    except FileNotFoundError as e:
        raise AltimateFileNotFoundError(f"Sources file not found: {sources_path}. Error: {e}") from e
    except ValueError as e:
        raise AltimateInvalidJSONError(f"Invalid JSON file: {sources_path}. Error: {e}") from e

    try:
        sources: Sources = parse_sources(sources_dict)
    except ValueError as e:
        raise AltimateInvalidManifestError(f"Invalid sources file: {sources_path}. Error: {e}") from e

    return sources


# TODO: Add tests!
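
A minimal sketch of exercising the new loaders end to end; the artifact paths are placeholders.

```python
# Placeholder paths; a sketch of calling the new loaders added in this PR.
from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_sources

run_results = load_run_results("target/run_results.json")
sources = load_sources("target/sources.json")

# Both loaders raise AltimateFileNotFoundError, AltimateInvalidJSONError, or
# AltimateInvalidManifestError for missing, malformed, or unparsable artifacts.
print(type(run_results).__name__, type(sources).__name__)
```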
Review comment: Also add the semantic manifest; we can already collect it, and we should not validate it.