Skip to content

Commit 8b2c72d

Browse files
authored
Added databricks labs ucx ensure-assessment-run to CLI commands. (#708)
Closes #674. Description: This change adds a CLI command that performs the following steps: 1. Checks whether UCX is installed for the current user on the workspace. 2. Checks whether the assessment workflow has run successfully or is still running. 3. Waits for an in-progress run to complete, or starts a new run and waits for it to complete.
1 parent 46ea0f1 commit 8b2c72d

File tree

5 files changed

+129
-21
lines changed

5 files changed

+129
-21
lines changed

labs.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,8 @@ commands:
4444
- name: create-table-mapping
4545
description: create initial table mapping for review
4646

47+
- name: ensure-assessment-run
48+
description: ensure the assessment job was run on a workspace
49+
4750
- name: validate-external-locations
4851
description: validates and provides mapping to external table to external location and shared generation tf scripts

src/databricks/labs/ucx/cli.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import webbrowser
55

66
from databricks.sdk import WorkspaceClient
7-
from databricks.sdk.errors import NotFound
87

98
from databricks.labs.ucx.account import AccountWorkspaces, WorkspaceInfo
109
from databricks.labs.ucx.config import AccountConfig, ConnectConfig
@@ -17,6 +16,11 @@
1716

1817
logger = logging.getLogger("databricks.labs.ucx")
1918

19+
# Error text logged by the CLI commands in this module when no UCX
# installation is found for the current user.
CANT_FIND_UCX_MSG = (
    "Couldn't find UCX configuration in the user's home folder. "
    "Make sure the current user has configured and installed UCX."
)
23+
2024

2125
def workflows():
2226
ws = WorkspaceClient()
@@ -47,18 +51,13 @@ def skip(schema: str, table: str | None = None):
4751
logger.error("--Schema is a required parameter.")
4852
return None
4953
ws = WorkspaceClient()
50-
installation_manager = WorkspaceInstaller(ws)
51-
logger.info("Fetching installation config.")
52-
try:
53-
warehouse_id = installation_manager._current_config.warehouse_id
54-
sql_backend = StatementExecutionBackend(ws, warehouse_id)
55-
except NotFound:
56-
logger.error(
57-
"Couldn't find UCX configuration in the user's home folder. "
58-
"Make sure the current user has configured and installed UCX."
59-
)
54+
installation_manager = InstallationManager(ws)
55+
installation = installation_manager.for_user(ws.current_user.me())
56+
if not installation:
57+
logger.error(CANT_FIND_UCX_MSG)
6058
return None
61-
59+
warehouse_id = installation.config.warehouse_id
60+
sql_backend = StatementExecutionBackend(ws, warehouse_id)
6261
mapping = TableMapping(ws)
6362
if table:
6463
mapping.skip_table(sql_backend, schema, table)
@@ -102,6 +101,18 @@ def validate_external_locations():
102101
webbrowser.open(f"{ws.config.host}/#workspace{path}")
103102

104103

104+
def ensure_assessment_run():
    """Ensure the assessment workflow has been run for the current user's UCX installation.

    Looks up the UCX installation belonging to the current workspace user.
    When none is found, logs ``CANT_FIND_UCX_MSG`` as an error and returns
    ``None``. Otherwise delegates to
    ``WorkspaceInstaller.validate_and_run("assessment")``.
    """
    ws = WorkspaceClient()
    installation_manager = InstallationManager(ws)
    installation = installation_manager.for_user(ws.current_user.me())
    if not installation:
        # Guard clause: nothing to validate without an installation.
        logger.error(CANT_FIND_UCX_MSG)
        return None
    workspace_installer = WorkspaceInstaller(ws)
    workspace_installer.validate_and_run("assessment")
114+
115+
105116
MAPPING = {
106117
"open-remote-config": open_remote_config,
107118
"installations": list_installations,
@@ -110,6 +121,7 @@ def validate_external_locations():
110121
"manual-workspace-info": manual_workspace_info,
111122
"create-table-mapping": create_table_mapping,
112123
"validate-external-locations": validate_external_locations,
124+
"ensure-assessment-run": ensure_assessment_run,
113125
"skip": skip,
114126
}
115127

src/databricks/labs/ucx/install.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
PermissionDenied,
1818
)
1919
from databricks.sdk.service import compute, jobs
20+
from databricks.sdk.service.jobs import RunLifeCycleState, RunResultState
2021
from databricks.sdk.service.sql import EndpointInfoWarehouseType, SpotInstancePolicy
2122
from databricks.sdk.service.workspace import ImportFormat
2223

@@ -860,6 +861,29 @@ def _remove_install_folder(self):
860861
except InvalidParameterValue:
861862
logger.error("Error deleting install folder")
862863

864+
def validate_step(self, step: str) -> bool:
    """Report whether the workflow registered for ``step`` has a successful run.

    The job id is read from ``self._state.jobs[step]`` (plain subscript, no
    fallback) and all runs of that job are listed, including unfinished ones.

    Decision order:
      1. If any listed run has result state ``SUCCESS``, return ``True``.
      2. Otherwise, take the first listed run that has a run id and whose
         life cycle state is ``RUNNING`` or ``PENDING``, wait until it is
         terminated or skipped, re-fetch it, and return whether its result
         state is ``SUCCESS``.
      3. Otherwise return ``False``.

    :param step: key of the workflow in ``self._state.jobs``.
    :returns: ``True`` only when a successful run exists or the awaited run
        finished successfully.
    """
    job_id = self._state.jobs[step]
    logger.debug(f"Validating {step} workflow: {self._ws.config.host}#job/{job_id}")
    # Materialized because the runs are scanned twice below.
    current_runs = list(self._ws.jobs.list_runs(completed_only=False, job_id=job_id))

    # A past success takes priority over any run that is still in flight.
    if any(run.state and run.state.result_state == RunResultState.SUCCESS for run in current_runs):
        return True

    in_progress_states = (RunLifeCycleState.RUNNING, RunLifeCycleState.PENDING)
    active_run = next(
        (
            run
            for run in current_runs
            if run.run_id and run.state and run.state.life_cycle_state in in_progress_states
        ),
        None,
    )
    if active_run is None:
        return False

    logger.info("Identified a run in progress waiting for run completion")
    self._ws.jobs.wait_get_run_job_terminated_or_skipped(run_id=active_run.run_id)
    # The run is fetched again after the wait and its state read from that fresh copy.
    run_new_state = self._ws.jobs.get_run(run_id=active_run.run_id).state
    return run_new_state is not None and run_new_state.result_state == RunResultState.SUCCESS
882+
883+
def validate_and_run(self, step: str):
    """Run the workflow for ``step`` unless ``validate_step`` reports success.

    Calls ``self.validate_step(step)`` first; only when that returns a falsy
    value is ``self.run_workflow(step)`` invoked. Always returns ``None``.

    :param step: workflow key passed through to both helpers.
    """
    if not self.validate_step(step):
        self.run_workflow(step)
886+
863887

864888
if __name__ == "__main__":
865889
ws = WorkspaceClient(product="ucx", product_version=__version__)

tests/unit/installer/test_installation_manager.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
import io
2+
from unittest.mock import MagicMock
23

34
import pytest
45
from databricks.sdk.errors import NotFound
5-
from databricks.sdk.service.iam import User
6+
from databricks.sdk.service.iam import ComplexValue, User
7+
from databricks.sdk.service.jobs import (
8+
BaseRun,
9+
RunLifeCycleState,
10+
RunResultState,
11+
RunState,
12+
)
613

714
from databricks.labs.ucx.framework.parallel import ManyError
15+
from databricks.labs.ucx.install import WorkspaceInstaller
816
from databricks.labs.ucx.installer import InstallationManager
917

1018

@@ -53,3 +61,62 @@ def test_corrupt_config(mocker):
5361
installation_manager = InstallationManager(ws)
5462
user_installations = installation_manager.user_installations()
5563
assert len(user_installations) == 0
64+
65+
66+
def test_validate_assessment(mocker):
    """Exercise ``WorkspaceInstaller.validate_step`` against three run histories."""
    ws = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    current_user = MagicMock()
    current_user.me.return_value = User(user_name="foo", groups=[ComplexValue(display="admins")])

    state = MagicMock()
    state.jobs = {"assessment": 123}

    ws.current_user = current_user
    ws.jobs.wait_get_run_job_terminated_or_skipped = MagicMock(return_value=None)
    installer = WorkspaceInstaller(ws)
    installer._state = state

    # Scenario 1: one successful run among failures -> step is valid.
    ws.jobs.list_runs.return_value = [
        BaseRun(run_id=123, state=RunState(result_state=RunResultState.SUCCESS)),
        BaseRun(run_id=111, state=RunState(result_state=RunResultState.FAILED)),
    ]
    assert installer.validate_step("assessment")

    # Scenario 2: only failed runs and nothing in flight -> step is not valid.
    ws.jobs.list_runs.return_value = [
        BaseRun(run_id=123, state=RunState(result_state=RunResultState.FAILED)),
        BaseRun(run_id=111, state=RunState(result_state=RunResultState.FAILED)),
    ]
    assert not installer.validate_step("assessment")

    # Neither scenario above contains an in-progress run, so nothing waited.
    ws.jobs.wait_get_run_job_terminated_or_skipped.assert_not_called()

    # Scenario 3: no success yet, but run 111 is still running -> wait for it.
    ws.jobs.list_runs.return_value = [
        BaseRun(run_id=123, state=RunState(result_state=RunResultState.FAILED)),
        BaseRun(run_id=111, state=RunState(life_cycle_state=RunLifeCycleState.RUNNING)),
    ]
    installer.validate_step("assessment")
    ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=111)
99+
100+
101+
def test_validate_run_assessment(mocker):
    """Check that ``validate_and_run`` starts the workflow only when validation fails."""
    ws = mocker.patch("databricks.sdk.WorkspaceClient.__init__")
    current_user = MagicMock()
    current_user.me.return_value = User(user_name="foo", groups=[ComplexValue(display="admins")])

    state = MagicMock()
    state.jobs = {"assessment": 123}

    ws.current_user = current_user
    installer = WorkspaceInstaller(ws)
    installer._state = state
    # Stubbed before either scenario so the success path can prove it was never invoked.
    installer.run_workflow = MagicMock()

    # Test a use case where assessment ran successfully
    installer.validate_step = MagicMock(return_value=True)
    installer.validate_and_run("assessment")
    installer.validate_step.assert_called_with("assessment")
    installer.run_workflow.assert_not_called()

    # Test a use case where assessment didn't run successfully
    installer.validate_step = MagicMock(return_value=False)
    installer.validate_and_run("assessment")
    installer.validate_step.assert_called_with("assessment")
    installer.run_workflow.assert_called_with("assessment")

tests/unit/test_cli.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from unittest.mock import MagicMock
2+
13
import pytest
24
from databricks.sdk.errors import NotFound
35
from databricks.sdk.service import iam
4-
from databricks.sdk.service.iam import User
6+
from databricks.sdk.service.iam import ComplexValue, User
57

68
from databricks.labs.ucx.cli import skip
79

@@ -23,12 +25,12 @@ def test_skip_no_schema(mocker, caplog):
2325

2426
def test_skip_no_ucx(caplog, mocker):
2527
mocker.patch("databricks.sdk.WorkspaceClient.__init__", return_value=None)
26-
mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller.__init__", return_value=None)
27-
mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller._current_config", return_value="foo")
28-
mocker.patch(
29-
"databricks.labs.ucx.framework.crawlers.StatementExecutionBackend.__init__",
30-
return_value=None,
31-
side_effect=NotFound("..."),
32-
)
28+
current_user = MagicMock()
29+
current_user.me.return_value = User(user_name="foo", groups=[ComplexValue(display="admins")])
30+
current_user.return_value = None
31+
mocker.patch("databricks.sdk.WorkspaceClient.current_user", return_value=current_user)
32+
# ws.current_user = current_user
33+
mocker.patch("databricks.labs.ucx.installer.InstallationManager.__init__", return_value=None)
34+
mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=None)
3335
skip(schema="schema", table="table")
3436
assert [rec.message for rec in caplog.records if "UCX configuration" in rec.message]

0 commit comments

Comments
 (0)