Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/datapilot/clients/altimate/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Constants for the Altimate API client."""

# Supported dbt artifact file types for onboarding
SUPPORTED_ARTIFACT_TYPES = {
"manifest",
"catalog",
"run_results",
"sources",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add semantic manifest, we can already collect it, we should not validate it.

"semantic_manifest",
}
24 changes: 23 additions & 1 deletion src/datapilot/clients/altimate/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from requests import Response

from datapilot.clients.altimate.client import APIClient
from datapilot.clients.altimate.constants import SUPPORTED_ARTIFACT_TYPES


def check_token_and_instance(
Expand Down Expand Up @@ -56,6 +57,27 @@ def validate_permissions(


def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integration_environment, file_type, file_path, backend_url) -> Dict:
"""
Upload a dbt artifact file to the Altimate backend.

Args:
api_token: API authentication token
tenant: Tenant/instance name
dbt_core_integration_id: ID of the dbt integration
dbt_core_integration_environment: Environment type (e.g., PROD)
file_type: Type of artifact - one of: manifest, catalog, run_results, sources, semantic_manifest
file_path: Path to the artifact file
backend_url: URL of the Altimate backend

Returns:
Dict with 'ok' boolean and optional 'message' on failure
"""
Comment on lines +60 to +74
Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation added for the onboard_file function is helpful and clear. However, it should note that validation is the caller's responsibility. Currently, semantic_manifest files are not validated before being uploaded, which differs from other artifact types.

Copilot uses AI. Check for mistakes.
if file_type not in SUPPORTED_ARTIFACT_TYPES:
return {
"ok": False,
"message": f"Unsupported file type: {file_type}. Supported types: {', '.join(sorted(SUPPORTED_ARTIFACT_TYPES))}",
}

api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

params = {
Expand Down Expand Up @@ -84,7 +106,7 @@ def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integratio
api_client.log("Error getting signed URL.")
return {
"ok": False,
"message": "Error in uploading the manifest. ",
"message": f"Error in uploading the {file_type}.",
}


Expand Down
83 changes: 74 additions & 9 deletions src/datapilot/core/platforms/dbt/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_catalog
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_sources
from datapilot.utils.formatting.utils import tabulate_data
from datapilot.utils.utils import map_url_to_instance

Expand Down Expand Up @@ -155,6 +157,9 @@ def project_health(
)
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option("--catalog-path", required=False, prompt=False, help="Path to the catalog file.")
@click.option("--run-results-path", required=False, prompt=False, help="Path to the run_results.json file.")
@click.option("--sources-path", required=False, prompt=False, help="Path to the sources.json file (source freshness results).")
@click.option("--semantic-manifest-path", required=False, prompt=False, help="Path to the semantic_manifest.json file.")
def onboard(
token,
instance_name,
Expand All @@ -164,6 +169,9 @@ def onboard(
dbt_integration_environment,
manifest_path,
catalog_path,
run_results_path,
sources_path,
semantic_manifest_path,
):
"""Onboard a manifest file to DBT. You can specify either --dbt_integration_id or --dbt_integration_name."""

Expand Down Expand Up @@ -198,34 +206,91 @@ def onboard(
elif dbt_integration_name and dbt_integration_id:
click.echo("Warning: Both integration ID and name provided. Using ID and ignoring name.")

# Validate manifest (required)
try:
load_manifest(manifest_path)
except Exception as e:
click.echo(f"Error: {e}")
return

# Validate optional artifacts if provided
if catalog_path:
try:
load_catalog(catalog_path)
except Exception as e:
click.echo(f"Error validating catalog: {e}")
return

if run_results_path:
try:
load_run_results(run_results_path)
except Exception as e:
click.echo(f"Error validating run_results: {e}")
return

if sources_path:
try:
load_sources(sources_path)
except Exception as e:
click.echo(f"Error validating sources: {e}")
return

# Onboard manifest (required)
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "manifest", manifest_path, backend_url)
if response["ok"]:
click.echo("Manifest onboarded successfully!")
else:
click.echo(f"{response['message']}")

if not catalog_path:
return

response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
if response["ok"]:
click.echo("Catalog onboarded successfully!")
else:
click.echo(f"{response['message']}")
# Onboard optional artifacts
artifacts_uploaded = ["manifest"]

if catalog_path:
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
if response["ok"]:
click.echo("Catalog onboarded successfully!")
artifacts_uploaded.append("catalog")
else:
click.echo(f"{response['message']}")

if run_results_path:
response = onboard_file(
token, instance_name, dbt_integration_id, dbt_integration_environment, "run_results", run_results_path, backend_url
)
if response["ok"]:
click.echo("Run results onboarded successfully!")
artifacts_uploaded.append("run_results")
else:
click.echo(f"{response['message']}")

if sources_path:
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "sources", sources_path, backend_url)
if response["ok"]:
click.echo("Sources onboarded successfully!")
artifacts_uploaded.append("sources")
else:
click.echo(f"{response['message']}")

if semantic_manifest_path:
response = onboard_file(
token, instance_name, dbt_integration_id, dbt_integration_environment, "semantic_manifest", semantic_manifest_path, backend_url
)
if response["ok"]:
click.echo("Semantic manifest onboarded successfully!")
artifacts_uploaded.append("semantic_manifest")
else:
click.echo(f"{response['message']}")
Comment on lines +275 to +283

This comment was marked as outdated.

Comment on lines +275 to +283
Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantic_manifest_path artifact is uploaded without validation, unlike the other artifacts (manifest, catalog, run_results, sources). This inconsistency could lead to invalid files being uploaded. Consider adding validation for semantic_manifest before uploading, similar to lines 217-236.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to semantic_manifest only. dbt-artifacts-library does not have a parser for it, so we are storing it as-is.

Comment on lines +275 to +283

This comment was marked as outdated.


# Start ingestion
response = start_dbt_ingestion(token, instance_name, dbt_integration_id, dbt_integration_environment, backend_url)
if response["ok"]:
url = map_url_to_instance(backend_url, instance_name)
artifacts_str = ", ".join(artifacts_uploaded)
if not url:
click.echo("Manifest and catalog ingestion has started.")
click.echo(f"Ingestion has started for: {artifacts_str}")
else:
url = f"{url}/settings/integrations/{dbt_integration_id}/{dbt_integration_environment}"
click.echo(f"Manifest and catalog ingestion has started. You can check the status at {url}")
click.echo(f"Ingestion has started for: {artifacts_str}. You can check the status at {url}")
else:
click.echo(f"{response['message']}")
17 changes: 17 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/run_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Type alias covering all supported dbt ``run_results.json`` schema versions."""

from typing import Union

from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v1 import RunResultsV1
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v2 import RunResultsV2
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v3 import RunResultsV3
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v4 import RunResultsV4
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v5 import RunResultsV5
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v6 import RunResultsV6

# Union of run_results schema versions v1-v6, listed newest-first.
RunResults = Union[
    RunResultsV6,
    RunResultsV5,
    RunResultsV4,
    RunResultsV3,
    RunResultsV2,
    RunResultsV1,
]
11 changes: 11 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Type alias covering all supported dbt ``sources.json`` (source freshness) schema versions."""

from typing import Union

from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3

# Union of sources schema versions v1-v3, listed newest-first.
Sources = Union[
    SourcesV3,
    SourcesV2,
    SourcesV1,
]
36 changes: 34 additions & 2 deletions src/datapilot/core/platforms/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestSourceNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestTestNode
from datapilot.core.platforms.dbt.schemas.manifest import Manifest
from datapilot.core.platforms.dbt.schemas.run_results import RunResults
from datapilot.core.platforms.dbt.schemas.sources import Sources
from datapilot.exceptions.exceptions import AltimateFileNotFoundError
from datapilot.exceptions.exceptions import AltimateInvalidJSONError
from datapilot.utils.utils import extract_dir_name_from_file_path
from datapilot.utils.utils import extract_folders_in_path
from datapilot.utils.utils import is_superset_path
from datapilot.utils.utils import load_json
from vendor.dbt_artifacts_parser.parser import parse_manifest
from vendor.dbt_artifacts_parser.parser import parse_run_results
from vendor.dbt_artifacts_parser.parser import parse_sources

MODEL_TYPE_PATTERNS = {
STAGING: r"^stg_.*", # Example: models starting with 'stg_'
Expand Down Expand Up @@ -94,8 +98,36 @@ def load_catalog(catalog_path: str) -> Catalog:
return catalog


def load_run_results(run_results_path: str) -> Manifest:
raise NotImplementedError
def load_run_results(run_results_path: str) -> RunResults:
    """Load and validate a dbt ``run_results.json`` artifact.

    Args:
        run_results_path: Path to the run_results.json file.

    Returns:
        The parsed run results (one of the RunResultsV1..V6 schema models).

    Raises:
        AltimateFileNotFoundError: If the file does not exist.
        AltimateInvalidJSONError: If the file is not valid JSON.
        AltimateInvalidManifestError: If the JSON does not match any
            supported run_results schema version.
    """
    try:
        run_results_dict = load_json(run_results_path)
    except FileNotFoundError as e:
        raise AltimateFileNotFoundError(f"Run results file not found: {run_results_path}. Error: {e}") from e
    except ValueError as e:
        raise AltimateInvalidJSONError(f"Invalid JSON file: {run_results_path}. Error: {e}") from e

    try:
        # NOTE(review): AltimateInvalidManifestError is reused here for a
        # non-manifest artifact; consider a generic AltimateInvalidArtifactError.
        run_results: RunResults = parse_run_results(run_results_dict)
    except ValueError as e:
        raise AltimateInvalidManifestError(f"Invalid run results file: {run_results_path}. Error: {e}") from e

    return run_results


def load_sources(sources_path: str) -> Sources:
    """Load and validate a dbt ``sources.json`` (source freshness) artifact.

    Args:
        sources_path: Path to the sources.json file.

    Returns:
        The parsed sources result (one of the SourcesV1..V3 schema models).

    Raises:
        AltimateFileNotFoundError: If the file does not exist.
        AltimateInvalidJSONError: If the file is not valid JSON.
        AltimateInvalidManifestError: If the JSON does not match any
            supported sources schema version.
    """
    try:
        sources_dict = load_json(sources_path)
    except FileNotFoundError as e:
        raise AltimateFileNotFoundError(f"Sources file not found: {sources_path}. Error: {e}") from e
    except ValueError as e:
        raise AltimateInvalidJSONError(f"Invalid JSON file: {sources_path}. Error: {e}") from e

    try:
        # NOTE(review): AltimateInvalidManifestError is reused here for a
        # non-manifest artifact; consider a generic AltimateInvalidArtifactError.
        sources: Sources = parse_sources(sources_dict)
    except ValueError as e:
        raise AltimateInvalidManifestError(f"Invalid sources file: {sources_path}. Error: {e}") from e

    return sources


# TODO: Add tests!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class BaseArtifactMetadata(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
dbt_schema_version: str
dbt_version: Optional[str] = "0.19.0"
Expand Down Expand Up @@ -47,7 +47,7 @@ class Status2(Enum):

class TimingInfo(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
name: str
started_at: Optional[datetime] = None
Expand All @@ -56,7 +56,7 @@ class TimingInfo(BaseParserModel):

class RunResultOutput(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
status: Union[Status, Status1, Status2]
timing: list[TimingInfo]
Expand All @@ -69,7 +69,7 @@ class RunResultOutput(BaseParserModel):

class RunResultsV1(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
metadata: BaseArtifactMetadata
results: list[RunResultOutput]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class BaseArtifactMetadata(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
dbt_schema_version: str
dbt_version: Optional[str] = "0.20.0rc1"
Expand Down Expand Up @@ -47,7 +47,7 @@ class Status2(Enum):

class TimingInfo(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
name: str
started_at: Optional[datetime] = None
Expand All @@ -56,7 +56,7 @@ class TimingInfo(BaseParserModel):

class RunResultOutput(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
status: Union[Status, Status1, Status2]
timing: list[TimingInfo]
Expand All @@ -70,7 +70,7 @@ class RunResultOutput(BaseParserModel):

class RunResultsV2(BaseParserModel):
model_config = ConfigDict(
extra="forbid",
extra="allow",
)
metadata: BaseArtifactMetadata
results: list[RunResultOutput]
Expand Down
Loading