Skip to content

Commit e81e71e

Browse files
feat: added catalog and env support (#36)
* feat: added catalog support * feat: added env * WIP * WIP * WIP * feat: remove file ids * WIP * WIP * WIP * fix: imports --------- Co-authored-by: surya <[email protected]>
1 parent a5ceeda commit e81e71e

File tree

3 files changed

+64
-8
lines changed

3 files changed

+64
-8
lines changed

src/datapilot/clients/altimate/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def post(self, endpoint, data=None, timeout=None):
6262
response = requests.post(url, headers=headers, json=data, timeout=timeout)
6363
self.logger.debug(f"Received POST response with status: {response.status_code }")
6464

65-
return response
65+
return response.json()
6666

6767
def put(self, endpoint, data, timeout=None):
6868
url = f"{self.base_url}{endpoint}"
@@ -83,3 +83,7 @@ def get_signed_url(self, params=None):
8383
def validate_credentials(self):
8484
endpoint = "/dbt/v3/validate-credentials"
8585
return self.get(endpoint)
86+
87+
def start_dbt_ingestion(self, params=None):
88+
endpoint = "/dbt/v1/start_dbt_ingestion"
89+
return self.post(endpoint, data=params)

src/datapilot/clients/altimate/utils.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,22 @@ def validate_credentials(
4646
return api_client.validate_credentials()
4747

4848

49-
def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path, backend_url) -> Dict:
49+
def onboard_file(api_token, tenant, dbt_core_integration_id, dbt_core_integration_environment, file_type, file_path, backend_url) -> Dict:
5050
api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)
5151

52-
params = {"dbt_core_integration_id": dbt_core_integration_id, "file_type": "manifest"}
52+
params = {
53+
"dbt_core_integration_id": dbt_core_integration_id,
54+
"dbt_core_integration_environment_type": dbt_core_integration_environment,
55+
"file_type": file_type,
56+
}
5357
signed_url_data = api_client.get_signed_url(params)
5458
if signed_url_data:
5559
signed_url = signed_url_data.get("url")
5660
file_id = signed_url_data.get("dbt_core_integration_file_id")
5761
api_client.log(f"Received signed URL: {signed_url}")
5862
api_client.log(f"Received File ID: {file_id}")
5963

60-
upload_response = upload_content_to_signed_url(manifest_path, signed_url)
64+
upload_response = upload_content_to_signed_url(file_path, signed_url)
6165

6266
if upload_response:
6367
verify_params = {"dbt_core_integration_file_id": file_id}
@@ -73,3 +77,20 @@ def onboard_manifest(api_token, tenant, dbt_core_integration_id, manifest_path,
7377
"ok": False,
7478
"message": "Error in uploading the manifest. ",
7579
}
80+
81+
82+
def start_dbt_ingestion(api_token, tenant, dbt_core_integration_id, dbt_core_integration_environment, backend_url):
83+
api_client = APIClient(api_token, base_url=backend_url, tenant=tenant)
84+
params = {
85+
"dbt_core_integration_id": dbt_core_integration_id,
86+
"dbt_core_integration_environment_type": dbt_core_integration_environment,
87+
}
88+
data = api_client.start_dbt_ingestion(params)
89+
if data and data.get("ok"):
90+
return {"ok": True}
91+
else:
92+
api_client.log("Error starting dbt ingestion worker")
93+
return {
94+
"ok": False,
95+
"message": "Error starting dbt ingestion worker. ",
96+
}

src/datapilot/core/platforms/dbt/cli/cli.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import click
44

55
from datapilot.clients.altimate.utils import check_token_and_instance
6-
from datapilot.clients.altimate.utils import onboard_manifest
6+
from datapilot.clients.altimate.utils import onboard_file
7+
from datapilot.clients.altimate.utils import start_dbt_ingestion
78
from datapilot.clients.altimate.utils import validate_credentials
89
from datapilot.config.config import load_config
910
from datapilot.core.platforms.dbt.constants import MODEL
@@ -87,9 +88,21 @@ def project_health(manifest_path, catalog_path, config_path=None, select=None):
8788
@click.option("--token", prompt="API Token", help="Your API token for authentication.")
8889
@click.option("--instance-name", prompt="Instance Name", help="Your tenant ID.")
8990
@click.option("--dbt_core_integration_id", prompt="DBT Core Integration ID", help="DBT Core Integration ID")
91+
@click.option(
92+
"--dbt_core_integration_environment", default="PROD", prompt="DBT Core Integration Environment", help="DBT Core Integration Environment"
93+
)
9094
@click.option("--manifest-path", required=True, prompt="Manifest Path", help="Path to the manifest file.")
95+
@click.option("--catalog-path", required=False, prompt=False, help="Path to the catalog file.")
9196
@click.option("--backend-url", required=False, help="Altimate's Backend URL", default="https://api.myaltimate.com")
92-
def onboard(token, instance_name, dbt_core_integration_id, manifest_path, backend_url="https://api.myaltimate.com", env=None):
97+
def onboard(
98+
token,
99+
instance_name,
100+
dbt_core_integration_id,
101+
dbt_core_integration_environment,
102+
manifest_path,
103+
catalog_path,
104+
backend_url="https://api.myaltimate.com",
105+
):
93106
"""Onboard a manifest file to DBT."""
94107
check_token_and_instance(token, instance_name)
95108

@@ -104,9 +117,27 @@ def onboard(token, instance_name, dbt_core_integration_id, manifest_path, backen
104117
click.echo(f"Error: {e}")
105118
return
106119

107-
response = onboard_manifest(token, instance_name, dbt_core_integration_id, manifest_path, backend_url)
108-
120+
response = onboard_file(
121+
token, instance_name, dbt_core_integration_id, dbt_core_integration_environment, "manifest", manifest_path, backend_url
122+
)
109123
if response["ok"]:
110124
click.echo("Manifest onboarded successfully!")
111125
else:
112126
click.echo(f"{response['message']}")
127+
128+
if not catalog_path:
129+
return
130+
131+
response = onboard_file(
132+
token, instance_name, dbt_core_integration_id, dbt_core_integration_environment, "catalog", catalog_path, backend_url
133+
)
134+
if response["ok"]:
135+
click.echo("Catalog onboarded successfully!")
136+
else:
137+
click.echo(f"{response['message']}")
138+
139+
response = start_dbt_ingestion(token, instance_name, dbt_core_integration_id, dbt_core_integration_environment, backend_url)
140+
if response["ok"]:
141+
click.echo("Onboarding completed successfully!")
142+
else:
143+
click.echo(f"{response['message']}")

0 commit comments

Comments
 (0)