Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies = [
"databricks-sdk~=0.29.0",
"sqlglot==25.8.1",
"databricks-labs-blueprint[yaml]>=0.2.3",
"databricks-labs-lsql>=0.4.3",
"databricks-labs-lsql>=0.7.5",
"cryptography>=41.0.3",
]

Expand Down
139 changes: 82 additions & 57 deletions src/databricks/labs/remorph/deployment/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,107 @@
import json
import logging
from datetime import timedelta
from importlib.abc import Traversable
from typing import Any
from pathlib import Path

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.retries import retried
from databricks.sdk.service.dashboards import Dashboard
from databricks.sdk.errors import InvalidParameterValue, NotFound, DatabricksError, ResourceAlreadyExists

from databricks.sdk.service.dashboards import LifecycleState
from databricks.labs.lsql.dashboards import DashboardMetadata, Dashboards

from databricks.labs.remorph.config import ReconcileMetadataConfig

logger = logging.getLogger(__name__)


class DashboardDeployment:
_UPLOAD_TIMEOUT = timedelta(seconds=30)

def __init__(self, ws: WorkspaceClient, installation: Installation, install_state: InstallState):
def __init__(
    self,
    ws: WorkspaceClient,
    installation: Installation,
    install_state: InstallState,
):
    """Deploys Lakeview dashboards into the product's installation folder.

    :param ws: workspace client used for the Lakeview and workspace-file APIs
    :param installation: supplies the install folder and product name prefix
    :param install_state: persists the dashboard name -> dashboard_id mapping
    """
    self._ws = ws
    self._installation = installation
    self._install_state = install_state

def deploy(self, name: str, dashboard_file: Traversable, parameters: dict[str, Any] | None = None):
logger.debug(f"Deploying dashboard {name} from {dashboard_file.name}")
dashboard_data = self._substitute_params(dashboard_file, parameters or {})
dashboard = self._update_or_create_dashboard(name, dashboard_data, dashboard_file)
logger.info(f"Dashboard deployed with dashboard_id {dashboard.dashboard_id}")
logger.info(f"Dashboard URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard.dashboard_id}")
def _handle_existing_dashboard(self, dashboard_id: str, display_name: str, parent_path: str) -> str | None:
    """Decide whether a previously installed dashboard can be updated in place.

    Returns the ``dashboard_id`` to update, or ``None`` when the dashboard has
    to be recreated — either because it was trashed in the UI or because its
    stored reference is no longer valid.
    """
    try:
        dashboard = self._ws.lakeview.get(dashboard_id)

        if dashboard.lifecycle_state is None:
            # A missing lifecycle state makes the record unusable; raise
            # NotFound on purpose so the recovery path below handles it.
            raise NotFound(f"Dashboard life cycle state: {display_name} ({dashboard_id})")
        if dashboard.lifecycle_state == LifecycleState.TRASHED:
            logger.info(f"Recreating trashed dashboard: {display_name} ({dashboard_id})")
            return None  # Recreate the dashboard if it is trashed (manually)

    except (NotFound, InvalidParameterValue):
        logger.info(f"Recovering invalid dashboard: {display_name} ({dashboard_id})")
        try:
            dashboard_path = f"{parent_path}/{display_name}.lvdash.json"
            self._ws.workspace.delete(dashboard_path)  # Cannot recreate dashboard if file still exists
            logger.debug(f"Deleted dangling dashboard {display_name} ({dashboard_id}): {dashboard_path}")
        except NotFound:
            pass  # Workspace file already gone — nothing to clean up.
        return None  # Recreate the dashboard if its reference is corrupted (manually)
    return dashboard_id  # Update the existing dashboard

def deploy(
    self,
    name: str,
    folder: Path,
    config: ReconcileMetadataConfig,
):
    """Create (or update in place) a Lakeview dashboard from the queries inside *folder*.

    :param name: logical dashboard name, used as the key in the install state
    :param folder: local folder containing the dashboard queries/metadata
    :param config: supplies the catalog and schema substituted into the queries
    """
    logger.info(f"Deploying dashboard {name} from {folder}")
    parent_path = f"{self._installation.install_folder()}/dashboards"
    try:
        self._ws.workspace.mkdirs(parent_path)
    except ResourceAlreadyExists:
        pass  # Folder already present — nothing to do.

    # Substitute the placeholder catalog/schema used in the bundled queries
    # with the configured reconcile metadata location.
    metadata = DashboardMetadata.from_path(folder).replace_database(
        catalog=config.catalog,
        catalog_to_replace="remorph_catalog",
        database=config.schema,
        database_to_replace="remorph_schema",
    )

    metadata.display_name = self._name_with_prefix(metadata.display_name)

    # Reuse the previously installed dashboard when it is still healthy;
    # _handle_existing_dashboard returns None to force a recreate.
    dashboard_id = self._install_state.dashboards.get(name)
    if dashboard_id is not None:
        dashboard_id = self._handle_existing_dashboard(dashboard_id, metadata.display_name, parent_path)

    dashboard_id = self._update_or_create_dashboard(name, dashboard_id, metadata, parent_path)
    logger.info(f"Dashboard deployed with dashboard_id {dashboard_id}")
    logger.info(f"Dashboard URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard_id}")
    self._install_state.save()

@retried(on=[DatabricksError], timeout=_UPLOAD_TIMEOUT)
def _update_or_create_dashboard(
    self,
    name: str,
    dashboard_id: str | None,
    metadata: DashboardMetadata,
    parent_path: str,
) -> str:
    """Create or update the Lakeview dashboard and record its id in the install state.

    :param name: logical dashboard name, used as the install-state key
    :param dashboard_id: existing dashboard to update, or None to create a new one
    :param metadata: dashboard definition built from the queries folder
    :param parent_path: workspace folder the dashboard file is written to
    :return: the dashboard_id of the created/updated dashboard
    """
    # lsql's Dashboards.create_dashboard updates in place when dashboard_id
    # is given and creates a fresh dashboard when it is None.
    dashboard = Dashboards(self._ws).create_dashboard(
        metadata,
        dashboard_id=dashboard_id,
        parent_path=parent_path,
        warehouse_id=self._ws.config.warehouse_id,
        publish=True,
    )
    # NOTE(review): assert is stripped under `python -O`; presumably the API
    # always returns an id — confirm before relying on this guard.
    assert dashboard.dashboard_id is not None
    self._install_state.dashboards[name] = dashboard.dashboard_id
    return dashboard.dashboard_id

def _name_with_prefix(self, name: str) -> str:
prefix = self._installation.product()
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/remorph/deployment/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def _upload_wheel(self):
def install(self, config: RemorphConfigs):
wheel_paths: list[str] = self._upload_wheel()
if config.reconcile:
logger.info("Installing Remorph reconcile Metadata components.")
self._recon_deployment.install(config.reconcile, wheel_paths)
self._apply_upgrades()

Expand Down
13 changes: 4 additions & 9 deletions src/databricks/labs/remorph/deployment/recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from databricks.labs.blueprint.wheels import ProductInfo
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue, NotFound

from databricks.labs.blueprint.wheels import find_project_root
import databricks.labs.remorph.resources
from databricks.labs.remorph.config import ReconcileConfig
from databricks.labs.remorph.deployment.dashboard import DashboardDeployment
Expand Down Expand Up @@ -41,6 +41,7 @@ def __init__(

def install(self, recon_config: ReconcileConfig | None, wheel_paths: list[str]):
if not recon_config:
logger.warning("Recon Config is empty")
return
logger.info("Installing reconcile components.")
self._deploy_tables(recon_config)
Expand Down Expand Up @@ -98,15 +99,9 @@ def _deploy_dashboards(self, recon_config: ReconcileConfig):
continue

def _deploy_recon_metrics_dashboard(self, name: str, recon_config: ReconcileConfig):
    """Deploy the reconciliation metrics dashboard from the bundled queries folder.

    :param name: logical dashboard name passed through to the deployer
    :param recon_config: supplies the metadata catalog/schema for query substitution
    """
    queries_folder = find_project_root(__file__) / "src/databricks/labs/remorph/resources/reconcile/dashboards"
    logger.info(f"Creating Reconciliation Dashboard `{name}`")
    self._dashboard_deployer.deploy(name, queries_folder, recon_config.metadata_config)

def _get_dashboards(self) -> list[tuple[str, str]]:
return [
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Aggregates Reconcile Table Metrics
### It provides the following information:

* Mismatch
* Missing in Source
* Missing in Target
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- recon_id
- dd_recon_id
type: MULTI_SELECT
title: Recon Id
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- source_table
type: MULTI_SELECT
title: Source Table Name
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- target_table
type: MULTI_SELECT
title: Target Table Name
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- source_type
type: MULTI_SELECT
title: Source Type
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- executed_by
type: MULTI_SELECT
title: Executed by
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- start_ts
title: Started At
type: DATE_RANGE_PICKER
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* --title 'Aggregates Summary Table' --width 6 --height 6 */
SELECT
  main.recon_id,
  main.source_type,
  main.source_table.`catalog` AS source_catalog,
  main.source_table.`schema` AS source_schema,
  main.source_table.table_name AS source_table_name,
  IF(
    ISNULL(source_catalog),
    CONCAT_WS('.', source_schema, source_table_name),
    CONCAT_WS(
      '.',
      source_catalog,
      source_schema,
      source_table_name
    )
  ) AS source_table,
  main.target_table.`catalog` AS target_catalog,
  main.target_table.`schema` AS target_schema,
  main.target_table.table_name AS target_table_name,
  CONCAT_WS(
    '.',
    target_catalog,
    target_schema,
    target_table_name
  ) AS target_table,
  UPPER(rules.rule_info.agg_type) || CONCAT('(', rules.rule_info.agg_column, ')') AS aggregate_column,
  rules.rule_info.group_by_columns,
  metrics.run_metrics.status AS status,
  metrics.run_metrics.exception_message AS exception,
  metrics.recon_metrics.missing_in_source AS missing_in_source,
  metrics.recon_metrics.missing_in_target AS missing_in_target,
  metrics.recon_metrics.mismatch AS mismatch,
  metrics.run_metrics.run_by_user AS executed_by,
  main.start_ts AS start_ts,
  main.end_ts AS end_ts
FROM
  remorph.reconcile.main main
  -- Each join carries its own predicate: the original bundled both conditions
  -- on the second join, leaving the first as an implicit cross join.
  INNER JOIN remorph.reconcile.aggregate_metrics metrics
    ON main.recon_table_id = metrics.recon_table_id
  INNER JOIN remorph.reconcile.aggregate_rules rules
    ON rules.rule_id = metrics.rule_id
ORDER BY
  metrics.inserted_ts DESC,
  main.recon_id,
  main.target_table.table_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Drill Down
### The Aggregates Reconcile details table contains sample records for the mismatched and missing entries.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- dd_recon_id
type: MULTI_SELECT
title: Recon Id
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- dd_recon_type
type: MULTI_SELECT
title: Category
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
columns:
- dd_aggregate_type
type: MULTI_SELECT
title: Aggregate Type
width: 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- dd_target_table
type: MULTI_SELECT
title: Target Table Name
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
columns:
- dd_source_table
type: MULTI_SELECT
title: Source Table Name
Loading