Skip to content

Commit d2a50cf

Browse files
JCZuurmond and nfx
authored
Verify migration progress prerequisites during UCX catalog creation (#2912)
## Changes Verify migration progress prerequisites during UCX catalog creation: - UC metastore exists. - UCX catalog exists. - A job run corresponding to the "assessment" job: - Finished successfully. - OR if pending or running, we will wait up to 1 hour for the assessment run to finish. If it still did not finish successfully, we fail. Otherwise, we consider the prerequisites to be NOT matched. ### Linked issues Resolves #2816 ### Functionality - [x] modified CLI command `create-ucx-catalog` ### Tests - [x] manually tested - [x] added unit tests --------- Co-authored-by: Serge Smertin <[email protected]>
1 parent 77c0fe9 commit d2a50cf

File tree

7 files changed

+156
-61
lines changed

7 files changed

+156
-61
lines changed

src/databricks/labs/ucx/cli.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ def create_ucx_catalog(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceConte
624624
workspace_context = ctx or WorkspaceContext(w)
625625
workspace_context.catalog_schema.create_ucx_catalog(prompts)
626626
workspace_context.progress_tracking_installation.run()
627+
workspace_context.verify_progress_tracking.verify()
627628

628629

629630
@ucx.command

src/databricks/labs/ucx/contexts/application.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
5252
from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
5353
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
54+
from databricks.labs.ucx.progress.install import VerifyProgressTracking
5455
from databricks.labs.ucx.source_code.graph import DependencyResolver
5556
from databricks.labs.ucx.source_code.jobs import WorkflowLinter
5657
from databricks.labs.ucx.source_code.known import KnownList
@@ -397,6 +398,10 @@ def verify_has_metastore(self) -> VerifyHasMetastore:
397398
def verify_has_ucx_catalog(self) -> VerifyHasCatalog:
398399
return VerifyHasCatalog(self.workspace_client, self.config.ucx_catalog)
399400

401+
@cached_property
def verify_progress_tracking(self) -> VerifyProgressTracking:
    """Build the progress tracking verifier from this context's metastore, UCX catalog and workflow helpers."""
    return VerifyProgressTracking(
        verify_has_metastore=self.verify_has_metastore,
        verify_has_ucx_catalog=self.verify_has_ucx_catalog,
        deployed_workflows=self.deployed_workflows,
    )
404+
400405
@cached_property
401406
def pip_resolver(self) -> PythonLibraryResolver:
402407
return PythonLibraryResolver(self.allow_list)

src/databricks/labs/ucx/progress/install.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import datetime as dt
12
import logging
23
from dataclasses import dataclass
34

45
from databricks.labs.lsql.backends import SqlBackend
56
from databricks.labs.lsql.deployment import SchemaDeployer
67

78
from databricks.labs.ucx.__about__ import __version__
9+
from databricks.labs.ucx.hive_metastore.verification import MetastoreNotFoundError, VerifyHasCatalog, VerifyHasMetastore
10+
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
811
from databricks.labs.ucx.progress.workflow_runs import WorkflowRun
912

1013

@@ -52,3 +55,53 @@ def run(self) -> None:
5255
self._schema_deployer.deploy_table("workflow_runs", WorkflowRun)
5356
self._schema_deployer.deploy_table("historical", Historical)
5457
logger.info("Installation completed successfully!")
58+
59+
60+
class VerifyProgressTracking:
    """Check that the prerequisites for progress tracking are in place.

    The individual checks are delegated to the collaborators handed to the constructor; this class
    sequences them and turns a failed check into a `RuntimeWarning` carrying a remediation hint.
    """

    def __init__(
        self,
        verify_has_metastore: VerifyHasMetastore,
        verify_has_ucx_catalog: VerifyHasCatalog,
        deployed_workflows: DeployedWorkflows,
    ) -> None:
        self._verify_has_metastore = verify_has_metastore
        self._verify_has_ucx_catalog = verify_has_ucx_catalog
        self._deployed_workflows = deployed_workflows

    def verify(self, timeout=dt.timedelta(seconds=0)) -> None:
        """Run the prerequisite checks in order and stop at the first one that fails.

        Checks, in order:
        1. The metastore verification succeeds.
        2. The UCX catalog verification succeeds.
        3. The "assessment" workflow step validates. The `timeout` is forwarded to that validation;
           it is intended as the time to wait for a pending or running assessment run to finish.

        Args:
            timeout: Passed through to the validation of the "assessment" step. Defaults to zero.

        Raises:
            RuntimeWarning: When any of the prerequisites is not met.
        """
        self._require_metastore()
        self._require_ucx_catalog()
        self._require_assessment_run(timeout)

    def _require_metastore(self) -> None:
        """Raise `RuntimeWarning` unless the metastore verification reports success."""
        hint = "Metastore not attached to workspace. Run `databricks labs ucx assign-metastore`"
        try:
            attached = self._verify_has_metastore.verify_metastore()
        except MetastoreNotFoundError as err:
            # A missing metastore is reported to the caller exactly like a failed check.
            raise RuntimeWarning(hint) from err
        if attached:
            return
        raise RuntimeWarning(hint)

    def _require_ucx_catalog(self) -> None:
        """Raise `RuntimeWarning` unless the UCX catalog verification reports success."""
        if self._verify_has_ucx_catalog.verify():
            return
        raise RuntimeWarning("UCX catalog not configured. Run `databricks labs ucx create-ucx-catalog`")

    def _require_assessment_run(self, timeout) -> None:
        """Raise `RuntimeWarning` unless the "assessment" workflow step validates."""
        if self._deployed_workflows.validate_step("assessment", timeout=timeout):
            return
        raise RuntimeWarning(
            "Assessment workflow did not complete successfully yet. "
            "Run `databricks labs ucx ensure-assessment-run` command"
        )

src/databricks/labs/ucx/progress/workflows.py

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
44
from databricks.labs.ucx.framework.tasks import Workflow, job_task
5-
from databricks.labs.ucx.hive_metastore.verification import MetastoreNotFoundError
65

76

87
class MigrationProgress(Workflow):
@@ -111,29 +110,9 @@ def setup_table_migration(self, ctx: RuntimeContext) -> None:
111110
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
112111
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.
113112
114-
Prerequisites:
115-
- UC metastore exists.
116-
- UCX catalog exists.
117-
- A job run corresponding to the "assessment" job:
118-
- Finished successfully.
119-
- OR if pending or running, we will wait up to 1 hour for the assessment run to finish. If did still not
120-
finish successfully, we fail.
121-
122-
Otherwise, we consider the prerequisites to be NOT matched.
123-
124-
Raises :
125-
RuntimeWarning : Signalling the prerequisites are not met.
113+
We will wait up to 1 hour for the assessment run to finish if it is running or pending.
126114
"""
127-
try:
128-
has_metastore = ctx.verify_has_metastore.verify_metastore()
129-
except MetastoreNotFoundError as e:
130-
raise RuntimeWarning("Metastore not attached to workspace") from e
131-
if not has_metastore:
132-
raise RuntimeWarning("Metastore not attached to workspace")
133-
if not ctx.verify_has_ucx_catalog.verify():
134-
raise RuntimeWarning("UCX catalog not configured. Run `databricks labs ucx create-ucx-catalog` command")
135-
if not ctx.deployed_workflows.validate_step("assessment", timeout=dt.timedelta(hours=1)):
136-
raise RuntimeWarning("Assessment workflow not completed successfully")
115+
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
137116

138117
@job_task(depends_on=[crawl_tables, verify_prerequisites], job_cluster="table_migration")
139118
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:

tests/unit/progress/test_install.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
1+
import datetime as dt
2+
from unittest.mock import create_autospec
3+
4+
import pytest
5+
6+
from databricks.labs.ucx.hive_metastore.verification import MetastoreNotFoundError, VerifyHasCatalog, VerifyHasMetastore
7+
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
8+
from databricks.labs.ucx.progress.install import ProgressTrackingInstallation, VerifyProgressTracking
29

310

411
def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None:
@@ -12,3 +19,74 @@ def test_progress_tracking_installation_run_creates_tables(mock_backend) -> None
1219
installation.run()
1320
# Dataclass to schema conversion is tested within the lsql package
1421
assert sum("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries) == 2
22+
23+
24+
def test_verify_progress_tracking_valid_prerequisites() -> None:
    """Verification passes silently when none of the prerequisite checks reports a failure.

    The auto-specced collaborators return truthy mocks by default, so every check counts as passed.
    """
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)
    timeout = dt.timedelta(hours=1)
    try:
        verify_progress_tracking.verify(timeout=timeout)
    except RuntimeWarning as e:
        # `verify` signals unmet prerequisites with RuntimeWarning, which does not derive from
        # RuntimeError, so RuntimeWarning is the exception this guard has to catch.
        assert False, f"Verify progress tracking raises: {e}"
    else:
        assert True, "Valid prerequisites found"
    verify_has_metastore.verify_metastore.assert_called_once()
    verify_has_catalog.verify.assert_called_once()
    # The caller-supplied timeout must reach the assessment step validation unchanged.
    deployed_workflows.validate_step.assert_called_once_with("assessment", timeout=timeout)
39+
40+
41+
def test_verify_progress_tracking_raises_runtime_error_if_metastore_not_attached_to_workspace(
    mock_installation,
) -> None:
    """A `MetastoreNotFoundError` from the metastore check surfaces as a `RuntimeWarning`.

    The later checks must not be reached once the metastore check fails.
    """
    # NOTE(review): `mock_installation` is not referenced in the body — confirm whether the fixture is needed.
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_metastore.verify_metastore.side_effect = MetastoreNotFoundError
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)
    with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
        verify_progress_tracking.verify()
    verify_has_metastore.verify_metastore.assert_called_once()
    # Assert on the methods `verify` would call: `assert_not_called()` on the spec'd object itself
    # only checks that the mock was never invoked as a callable, which holds trivially.
    verify_has_catalog.verify.assert_not_called()
    deployed_workflows.validate_step.assert_not_called()
54+
55+
56+
def test_verify_progress_tracking_raises_runtime_error_if_no_metastore(mock_installation) -> None:
    """A falsy result from the metastore check surfaces as a `RuntimeWarning`.

    The later checks must not be reached once the metastore check fails.
    """
    # NOTE(review): `mock_installation` is not referenced in the body — confirm whether the fixture is needed.
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_metastore.verify_metastore.return_value = False
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)
    with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
        verify_progress_tracking.verify()
    verify_has_metastore.verify_metastore.assert_called_once()
    # Assert on the methods `verify` would call: `assert_not_called()` on the spec'd object itself
    # only checks that the mock was never invoked as a callable, which holds trivially.
    verify_has_catalog.verify.assert_not_called()
    deployed_workflows.validate_step.assert_not_called()
67+
68+
69+
def test_verify_progress_tracking_raises_runtime_error_if_missing_ucx_catalog(mock_installation) -> None:
    """A falsy result from the UCX catalog check surfaces as a `RuntimeWarning`.

    The metastore check runs first; the assessment workflow check must not be reached.
    """
    # NOTE(review): `mock_installation` is not referenced in the body — confirm whether the fixture is needed.
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    verify_has_catalog.verify.return_value = False
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)
    with pytest.raises(RuntimeWarning, match="UCX catalog not configured."):
        verify_progress_tracking.verify()
    verify_has_metastore.verify_metastore.assert_called_once()
    verify_has_catalog.verify.assert_called_once()
    # Assert on the method `verify` would call: `assert_not_called()` on the spec'd object itself
    # only checks that the mock was never invoked as a callable, which holds trivially.
    deployed_workflows.validate_step.assert_not_called()
80+
81+
82+
def test_verify_progress_tracking_raises_runtime_error_if_assessment_workflow_did_not_run(mock_installation) -> None:
    """A falsy result from validating the "assessment" step surfaces as a `RuntimeWarning`.

    All three checks are expected to run, and the step validation receives the default zero timeout.
    """
    metastore_check = create_autospec(VerifyHasMetastore)
    catalog_check = create_autospec(VerifyHasCatalog)
    workflows = create_autospec(DeployedWorkflows)
    workflows.validate_step.return_value = False
    verifier = VerifyProgressTracking(metastore_check, catalog_check, workflows)

    with pytest.raises(RuntimeWarning, match="Assessment workflow did not complete successfully yet."):
        verifier.verify()

    metastore_check.verify_metastore.assert_called_once()
    catalog_check.verify.assert_called_once()
    default_timeout = dt.timedelta(seconds=0)
    workflows.validate_step.assert_called_once_with("assessment", timeout=default_timeout)

tests/unit/progress/test_workflows.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import pytest
44
from databricks.sdk import WorkspaceClient
5-
from databricks.sdk.errors import PermissionDenied
65
from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment
76
from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState
87

@@ -55,39 +54,10 @@ def test_migration_progress_with_valid_prerequisites(run_workflow) -> None:
5554
assert True, "Valid prerequisites found"
5655

5756

58-
def test_migration_progress_raises_runtime_error_if_metastore_not_attached_to_workflow(run_workflow) -> None:
57+
def test_migration_progress_with_invalid_prerequisites(run_workflow) -> None:
58+
"""All invalid prerequisites permutations are tested for `VerifyProgressTracking` separately."""
5959
ws = create_autospec(WorkspaceClient)
6060
ws.metastores.current.return_value = None
6161
task = MigrationProgress.verify_prerequisites
62-
with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
62+
with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace."):
6363
run_workflow(task, workspace_client=ws)
64-
65-
66-
def test_migration_progress_raises_runtime_error_if_missing_permissions_to_access_metastore(run_workflow) -> None:
67-
ws = create_autospec(WorkspaceClient)
68-
ws.metastores.current.side_effect = PermissionDenied
69-
task = MigrationProgress.verify_prerequisites
70-
with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
71-
run_workflow(task, workspace_client=ws)
72-
73-
74-
def test_migration_progress_raises_runtime_error_if_missing_ucx_catalog(run_workflow) -> None:
75-
ws = create_autospec(WorkspaceClient)
76-
ws.catalogs.get.return_value = None
77-
task = MigrationProgress.verify_prerequisites
78-
with pytest.raises(RuntimeWarning, match="UCX catalog not configured. .*"):
79-
run_workflow(task, workspace_client=ws)
80-
81-
82-
def test_migration_progress_raises_runtime_error_if_missing_permissions_to_access_ucx_catalog(run_workflow) -> None:
83-
ws = create_autospec(WorkspaceClient)
84-
ws.catalogs.get.side_effect = PermissionDenied
85-
task = MigrationProgress.verify_prerequisites
86-
with pytest.raises(RuntimeWarning, match="UCX catalog not configured. .*"):
87-
run_workflow(task, workspace_client=ws)
88-
89-
90-
def test_migration_progress_raises_runtime_error_if_assessment_workflow_did_not_run(run_workflow) -> None:
91-
task = MigrationProgress.verify_prerequisites
92-
with pytest.raises(RuntimeWarning, match="Assessment workflow not completed successfully"):
93-
run_workflow(task)

tests/unit/test_cli.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from databricks.sdk import AccountClient, WorkspaceClient
1313
from databricks.sdk.errors import NotFound
1414
from databricks.sdk.errors.platform import BadRequest
15-
from databricks.sdk.service import jobs, sql
15+
from databricks.sdk.service import sql
1616
from databricks.sdk.service.catalog import ExternalLocationInfo, MetastoreInfo
1717
from databricks.sdk.service.compute import ClusterDetails, ClusterSource
1818
from databricks.sdk.service.iam import ComplexValue, User
@@ -842,8 +842,8 @@ def test_revert_cluster_remap_empty(ws, caplog):
842842
ws.workspace.list.assert_called_once()
843843

844844

845-
def test_relay_logs(ws, caplog):
846-
ws.jobs.list_runs.return_value = [jobs.BaseRun(run_id=123, start_time=int(time.time()))]
845+
def test_relay_logs(ws, caplog) -> None:
846+
ws.jobs.list_runs.return_value = [Run(run_id=123, start_time=int(time.time()))]
847847
ws.workspace.list.side_effect = [
848848
[
849849
ObjectInfo(path='/Users/foo/.ucx/logs/run-123-0', object_type=ObjectType.DIRECTORY),
@@ -889,21 +889,30 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie
889889

890890
def test_create_ucx_catalog_calls_get_catalog(ws) -> None:
891891
prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"})
892+
ws.jobs.list_runs.return_value = [Run(state=RunState(result_state=RunResultState.SUCCESS))]
892893

893894
create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws))
894895

895-
ws.catalogs.get.assert_called_once()
896+
ws.catalogs.get.assert_called()
896897

897898

898899
def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None:
    """Running `create_ucx_catalog` issues queries on the backend, starting with a schema creation."""
    # A successful run is listed for jobs — presumably so the assessment prerequisite check that
    # follows the installation passes; verify against `DeployedWorkflows.validate_step`.
    successful_run = Run(state=RunState(result_state=RunResultState.SUCCESS))
    ws.jobs.list_runs.return_value = [successful_run]
    storage_prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"})
    context = WorkspaceContext(ws).replace(sql_backend=mock_backend)

    create_ucx_catalog(ws, storage_prompts, ctx=context)

    executed = mock_backend.queries
    assert len(executed) > 0, "No queries executed on backend"
    assert "CREATE SCHEMA" in executed[0]
905907

906908

909+
def test_create_ucx_catalog_raises_runtime_error_because_progress_tracking_prerequisites_are_not_met(ws) -> None:
    """`create_ucx_catalog` raises a `RuntimeWarning` when called with the plain `ws` fixture.

    NOTE(review): presumably the fixture provides no successful assessment run — confirm against the fixture.
    """
    storage_prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"})

    # Specific warning is not important here
    with pytest.raises(RuntimeWarning):
        create_ucx_catalog(ws, storage_prompts)
914+
915+
907916
@pytest.mark.parametrize("run_as_collection", [False, True])
908917
def test_migrate_tables_calls_migrate_table_job_run_now(
909918
run_as_collection,

0 commit comments

Comments
 (0)