10 changes: 10 additions & 0 deletions src/datapilot/clients/altimate/constants.py
@@ -0,0 +1,10 @@
"""Constants for the Altimate API client."""

# Supported dbt artifact file types for onboarding
SUPPORTED_ARTIFACT_TYPES = {
"manifest",
"catalog",
"run_results",
"sources",
Comment from Collaborator:
Also add semantic manifest; we can already collect it, we should not validate it.

"semantic_manifest",
}
22 changes: 22 additions & 0 deletions src/datapilot/clients/altimate/utils.py
@@ -7,6 +7,7 @@
from requests import Response

from datapilot.clients.altimate.client import APIClient
from datapilot.clients.altimate.constants import SUPPORTED_ARTIFACT_TYPES


def check_token_and_instance(
@@ -56,6 +57,27 @@ def validate_permissions(


def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integration_environment, file_type, file_path, backend_url) -> Dict:
"""
Upload a dbt artifact file to the Altimate backend.

Args:
api_token: API authentication token
tenant: Tenant/instance name
dbt_core_integration_id: ID of the dbt integration
dbt_core_integration_environment: Environment type (e.g., PROD)
file_type: Type of artifact - one of: manifest, catalog, run_results, sources, semantic_manifest
file_path: Path to the artifact file
backend_url: URL of the Altimate backend

Returns:
Dict with 'ok' boolean and optional 'message' on failure
"""
Comment from Copilot AI (Jan 6, 2026) on lines +60 to +74:
The documentation added for the onboard_file function is helpful and clear. However, it should note that validation is the caller's responsibility. Currently, semantic_manifest files are not validated before being uploaded, which differs from other artifact types.

if file_type not in SUPPORTED_ARTIFACT_TYPES:
return {
"ok": False,
"message": f"Unsupported file type: {file_type}. Supported types: {', '.join(sorted(SUPPORTED_ARTIFACT_TYPES))}",
}

api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)

params = {
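For context, a minimal sketch (not part of this diff) of calling onboard_file directly against the new guard; the token, tenant, integration ID, and backend URL below are placeholder values.

from datapilot.clients.altimate.utils import onboard_file

response = onboard_file(
    api_token="<api-token>",                  # placeholder
    tenant="my-instance",                     # placeholder
    dbt_core_integration_id="123",            # placeholder
    dbt_core_integration_environment="PROD",
    file_type="semantic_manifest",
    file_path="target/semantic_manifest.json",
    backend_url="https://api.example.com",    # placeholder
)
if not response["ok"]:
    # Failures, including unsupported file types, come back as {"ok": False, "message": "..."}
    print(response["message"])
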
91 changes: 82 additions & 9 deletions src/datapilot/core/platforms/dbt/cli/cli.py
@@ -18,6 +18,9 @@
from datapilot.core.platforms.dbt.formatting import generate_project_insights_table
from datapilot.core.platforms.dbt.utils import load_catalog
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_semantic_manifest
from datapilot.core.platforms.dbt.utils import load_sources
from datapilot.utils.formatting.utils import tabulate_data
from datapilot.utils.utils import map_url_to_instance

@@ -155,6 +158,9 @@ def project_health(
)
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
@click.option("--catalog-path", required=False, prompt=False, help="Path to the catalog file.")
@click.option("--run-results-path", required=False, prompt=False, help="Path to the run_results.json file.")
@click.option("--sources-path", required=False, prompt=False, help="Path to the sources.json file (source freshness results).")
@click.option("--semantic-manifest-path", required=False, prompt=False, help="Path to the semantic_manifest.json file.")
def onboard(
token,
instance_name,
@@ -164,6 +170,9 @@
dbt_integration_environment,
manifest_path,
catalog_path,
run_results_path,
sources_path,
semantic_manifest_path,
):
"""Onboard a manifest file to DBT. You can specify either --dbt_integration_id or --dbt_integration_name."""

@@ -198,34 +207,98 @@
elif dbt_integration_name and dbt_integration_id:
click.echo("Warning: Both integration ID and name provided. Using ID and ignoring name.")

# Validate manifest (required)
try:
load_manifest(manifest_path)
except Exception as e:
click.echo(f"Error: {e}")
return

# Validate optional artifacts if provided
if catalog_path:
try:
load_catalog(catalog_path)
except Exception as e:
click.echo(f"Error validating catalog: {e}")
return

if run_results_path:
try:
load_run_results(run_results_path)
except Exception as e:
click.echo(f"Error validating run_results: {e}")
return

if sources_path:
try:
load_sources(sources_path)
except Exception as e:
click.echo(f"Error validating sources: {e}")
return

if semantic_manifest_path:
try:
load_semantic_manifest(semantic_manifest_path)
except Exception as e:
click.echo(f"Error validating semantic_manifest: {e}")
return

# Onboard manifest (required)
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "manifest", manifest_path, backend_url)
if response["ok"]:
click.echo("Manifest onboarded successfully!")
else:
click.echo(f"{response['message']}")

if not catalog_path:
return

response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
if response["ok"]:
click.echo("Catalog onboarded successfully!")
else:
click.echo(f"{response['message']}")
# Onboard optional artifacts
artifacts_uploaded = ["manifest"]

if catalog_path:
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "catalog", catalog_path, backend_url)
if response["ok"]:
click.echo("Catalog onboarded successfully!")
artifacts_uploaded.append("catalog")
else:
click.echo(f"{response['message']}")

if run_results_path:
response = onboard_file(
token, instance_name, dbt_integration_id, dbt_integration_environment, "run_results", run_results_path, backend_url
)
if response["ok"]:
click.echo("Run results onboarded successfully!")
artifacts_uploaded.append("run_results")
else:
click.echo(f"{response['message']}")

if sources_path:
response = onboard_file(token, instance_name, dbt_integration_id, dbt_integration_environment, "sources", sources_path, backend_url)
if response["ok"]:
click.echo("Sources onboarded successfully!")
artifacts_uploaded.append("sources")
else:
click.echo(f"{response['message']}")

if semantic_manifest_path:
response = onboard_file(
token, instance_name, dbt_integration_id, dbt_integration_environment, "semantic_manifest", semantic_manifest_path, backend_url
)
if response["ok"]:
click.echo("Semantic manifest onboarded successfully!")
artifacts_uploaded.append("semantic_manifest")
else:
click.echo(f"{response['message']}")
Comment from Copilot AI (Jan 6, 2026) on lines +275 to +283:
The semantic_manifest_path artifact is uploaded without validation, unlike the other artifacts (manifest, catalog, run_results, sources). This inconsistency could lead to invalid files being uploaded. Consider adding validation for semantic_manifest before uploading, similar to lines 217-236.

Reply from the Contributor (Author):
This is for semantic_manifest only. dbt-artifacts-library doesn't have a parser for it, so we are storing it as-is.

# Start ingestion
response = start_dbt_ingestion(token, instance_name, dbt_integration_id, dbt_integration_environment, backend_url)
if response["ok"]:
url = map_url_to_instance(backend_url, instance_name)
artifacts_str = ", ".join(artifacts_uploaded)
if not url:
click.echo("Manifest and catalog ingestion has started.")
click.echo(f"Ingestion has started for: {artifacts_str}")
else:
url = f"{url}/settings/integrations/{dbt_integration_id}/{dbt_integration_environment}"
click.echo(f"Manifest and catalog ingestion has started. You can check the status at {url}")
click.echo(f"Ingestion has started for: {artifacts_str}. You can check the status at {url}")
else:
click.echo(f"{response['message']}")
44 changes: 44 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/run_results.py
@@ -0,0 +1,44 @@
from typing import Union

from pydantic import ConfigDict

from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v1 import RunResultsV1 as BaseRunResultsV1
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v2 import RunResultsV2 as BaseRunResultsV2
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v3 import RunResultsV3 as BaseRunResultsV3
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v4 import RunResultsV4 as BaseRunResultsV4
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v5 import RunResultsV5 as BaseRunResultsV5
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v6 import RunResultsV6 as BaseRunResultsV6


class RunResultsV1(BaseRunResultsV1):
model_config = ConfigDict(extra="allow")


class RunResultsV2(BaseRunResultsV2):
model_config = ConfigDict(extra="allow")


class RunResultsV3(BaseRunResultsV3):
model_config = ConfigDict(extra="allow")


class RunResultsV4(BaseRunResultsV4):
model_config = ConfigDict(extra="allow")


class RunResultsV5(BaseRunResultsV5):
model_config = ConfigDict(extra="allow")


class RunResultsV6(BaseRunResultsV6):
model_config = ConfigDict(extra="allow")
Comment from Copilot AI (Jan 6, 2026):
The model_config = ConfigDict(extra="allow") is redundant here since the base classes (BaseRunResultsV1 through BaseRunResultsV6) already have this configuration set. While this doesn't cause issues, it adds unnecessary repetition. Consider removing these redundant configurations unless there's a specific need to override them.


RunResults = Union[
RunResultsV6,
RunResultsV5,
RunResultsV4,
RunResultsV3,
RunResultsV2,
RunResultsV1,
]
12 changes: 12 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/semantic_manifest.py
@@ -0,0 +1,12 @@
from typing import Union

from pydantic import ConfigDict

from vendor.dbt_artifacts_parser.parsers.semantic_manifest.semantic_manifest_v1 import SemanticManifestV1 as BaseSemanticManifestV1


class SemanticManifestV1(BaseSemanticManifestV1):
model_config = ConfigDict(extra="allow")


SemanticManifest = Union[SemanticManifestV1]
26 changes: 26 additions & 0 deletions src/datapilot/core/platforms/dbt/schemas/sources.py
@@ -0,0 +1,26 @@
from typing import Union

from pydantic import ConfigDict

from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1 as BaseSourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2 as BaseSourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3 as BaseSourcesV3


class SourcesV1(BaseSourcesV1):
model_config = ConfigDict(extra="allow")


class SourcesV2(BaseSourcesV2):
model_config = ConfigDict(extra="allow")


class SourcesV3(BaseSourcesV3):
model_config = ConfigDict(extra="allow")
Comment from Copilot AI (Jan 6, 2026):
The model_config = ConfigDict(extra="allow") is redundant here since the base classes (BaseSourcesV1 through BaseSourcesV3) already have this configuration set. While this doesn't cause issues, it adds unnecessary repetition. Consider removing these redundant configurations unless there's a specific need to override them.

Suggested change (before):
from pydantic import ConfigDict

from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1 as BaseSourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2 as BaseSourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3 as BaseSourcesV3


class SourcesV1(BaseSourcesV1):
    model_config = ConfigDict(extra="allow")


class SourcesV2(BaseSourcesV2):
    model_config = ConfigDict(extra="allow")


class SourcesV3(BaseSourcesV3):
    model_config = ConfigDict(extra="allow")

Suggested change (after):
from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1 as BaseSourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2 as BaseSourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3 as BaseSourcesV3


class SourcesV1(BaseSourcesV1):
    ...


class SourcesV2(BaseSourcesV2):
    ...


class SourcesV3(BaseSourcesV3):
    ...


Sources = Union[
SourcesV3,
SourcesV2,
SourcesV1,
]
54 changes: 52 additions & 2 deletions src/datapilot/core/platforms/dbt/utils.py
@@ -22,13 +22,19 @@
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestSourceNode
from datapilot.core.platforms.dbt.schemas.manifest import AltimateManifestTestNode
from datapilot.core.platforms.dbt.schemas.manifest import Manifest
from datapilot.core.platforms.dbt.schemas.run_results import RunResults
from datapilot.core.platforms.dbt.schemas.semantic_manifest import SemanticManifest
from datapilot.core.platforms.dbt.schemas.sources import Sources
from datapilot.exceptions.exceptions import AltimateFileNotFoundError
from datapilot.exceptions.exceptions import AltimateInvalidJSONError
from datapilot.utils.utils import extract_dir_name_from_file_path
from datapilot.utils.utils import extract_folders_in_path
from datapilot.utils.utils import is_superset_path
from datapilot.utils.utils import load_json
from vendor.dbt_artifacts_parser.parser import parse_manifest
from vendor.dbt_artifacts_parser.parser import parse_run_results
from vendor.dbt_artifacts_parser.parser import parse_semantic_manifest
from vendor.dbt_artifacts_parser.parser import parse_sources

MODEL_TYPE_PATTERNS = {
STAGING: r"^stg_.*", # Example: models starting with 'stg_'
@@ -94,8 +100,52 @@ def load_catalog(catalog_path: str) -> Catalog:
return catalog


def load_run_results(run_results_path: str) -> Manifest:
raise NotImplementedError
def load_run_results(run_results_path: str) -> RunResults:
try:
run_results_dict = load_json(run_results_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Run results file not found: {run_results_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {run_results_path}. Error: {e}") from e

try:
run_results: RunResults = parse_run_results(run_results_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid run results file: {run_results_path}. Error: {e}") from e
Comment from Copilot AI (Jan 6, 2026) on lines +110 to +112:
The exception type AltimateInvalidManifestError is misleading when used for non-manifest files like run_results. Consider creating a more generic exception type such as AltimateInvalidArtifactError or using specific exceptions for each artifact type.

return run_results


def load_sources(sources_path: str) -> Sources:
try:
sources_dict = load_json(sources_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Sources file not found: {sources_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {sources_path}. Error: {e}") from e

try:
sources: Sources = parse_sources(sources_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid sources file: {sources_path}. Error: {e}") from e
Comment from Copilot AI (Jan 6, 2026):
The exception type AltimateInvalidManifestError is misleading when used for non-manifest files like sources. Consider creating a more generic exception type such as AltimateInvalidArtifactError or using specific exceptions for each artifact type.

return sources


def load_semantic_manifest(semantic_manifest_path: str) -> SemanticManifest:
try:
semantic_manifest_dict = load_json(semantic_manifest_path)
except FileNotFoundError as e:
raise AltimateFileNotFoundError(f"Semantic manifest file not found: {semantic_manifest_path}. Error: {e}") from e
except ValueError as e:
raise AltimateInvalidJSONError(f"Invalid JSON file: {semantic_manifest_path}. Error: {e}") from e

try:
semantic_manifest: SemanticManifest = parse_semantic_manifest(semantic_manifest_dict)
except ValueError as e:
raise AltimateInvalidManifestError(f"Invalid semantic manifest file: {semantic_manifest_path}. Error: {e}") from e

return semantic_manifest


# TODO: Add tests!
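As a quick illustration, a minimal sketch (not part of this diff) of calling the new loaders on their own; the paths are placeholders and the broad except mirrors the CLI's handling rather than the exact exception hierarchy.

from datapilot.core.platforms.dbt.utils import load_run_results
from datapilot.core.platforms.dbt.utils import load_semantic_manifest
from datapilot.core.platforms.dbt.utils import load_sources

for name, loader, path in [
    ("run_results", load_run_results, "target/run_results.json"),
    ("sources", load_sources, "target/sources.json"),
    ("semantic_manifest", load_semantic_manifest, "target/semantic_manifest.json"),
]:
    try:
        artifact = loader(path)
        print(f"{name}: OK ({type(artifact).__name__})")
    except Exception as e:  # AltimateFileNotFoundError, AltimateInvalidJSONError, AltimateInvalidManifestError
        print(f"{name}: validation failed: {e}")
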
18 changes: 18 additions & 0 deletions src/vendor/dbt_artifacts_parser/parser.py
@@ -35,6 +35,7 @@
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v4 import RunResultsV4
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v5 import RunResultsV5
from vendor.dbt_artifacts_parser.parsers.run_results.run_results_v6 import RunResultsV6
from vendor.dbt_artifacts_parser.parsers.semantic_manifest.semantic_manifest_v1 import SemanticManifestV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v1 import SourcesV1
from vendor.dbt_artifacts_parser.parsers.sources.sources_v2 import SourcesV2
from vendor.dbt_artifacts_parser.parsers.sources.sources_v3 import SourcesV3
@@ -345,3 +346,20 @@ def parse_sources_v3(sources: dict) -> SourcesV3:
if dbt_schema_version == ArtifactTypes.SOURCES_V3.value.dbt_schema_version:
return SourcesV3(**sources)
raise ValueError("Not a sources.json v3")


#
# semantic-manifest
#
def parse_semantic_manifest(semantic_manifest: dict) -> SemanticManifestV1:
"""Parse semantic_manifest.json

Args:
semantic_manifest: A dict of semantic_manifest.json

Returns:
SemanticManifestV1
"""
# Semantic manifest uses a flexible schema, so we accept any valid dict
# The schema version check is optional since the format may vary
return SemanticManifestV1(**semantic_manifest)
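To highlight the difference from the versioned parsers above, a small sketch (not part of this diff); the path is a placeholder.

import json

from vendor.dbt_artifacts_parser.parser import parse_semantic_manifest

# Unlike parse_run_results/parse_sources there is no dbt_schema_version dispatch here;
# the dict is handed straight to SemanticManifestV1 for validation.
with open("target/semantic_manifest.json") as fh:  # placeholder path
    semantic_manifest = parse_semantic_manifest(json.load(fh))
print(type(semantic_manifest).__name__)  # SemanticManifestV1
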
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from vendor.dbt_artifacts_parser.parsers.semantic_manifest.semantic_manifest_v1 import SemanticManifestV1

__all__ = ["SemanticManifestV1"]