Skip to content

Commit 5ff8bc1

Browse files
authored
Added databricks labs ucx repair-run --step ... CLI command for repair run of any failed workflows, like assessment, migrate-groups etc. (#724)
1 parent 24d4acb commit 5ff8bc1

File tree

6 files changed

+152
-1
lines changed

6 files changed

+152
-1
lines changed

labs.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ commands:
5050
- name: validate-external-locations
5151
description: validates and provides mapping to external table to external location and shared generation tf scripts
5252

53+
- name: repair-run
54+
description: Repair Run the Failed Job
55+
flags:
56+
- name: step
57+
description: name of the step
58+
5359
- name: revert-migrated-tables
5460
description: remove notation on a migrated table for re-migration
5561
flags:

src/databricks/labs/ucx/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ def ensure_assessment_run():
114114
workspace_installer.validate_and_run("assessment")
115115

116116

117+
def repair_run(step):
    """CLI entry point for `repair-run`: repair the workflow named by `step`.

    Builds a `WorkspaceClient` and a `WorkspaceInstaller` on top of it, then
    delegates to `WorkspaceInstaller.repair_run`.

    :param step: name of the workflow step to repair (the `--step` flag).
    :raises KeyError: when `step` is empty or missing.
    """
    if step:
        installer = WorkspaceInstaller(WorkspaceClient())
        logger.info(f"Repair Running {step} Job")
        installer.repair_run(step)
        return
    # No step given: there is nothing to look up, so fail loudly.
    raise KeyError("You did not specify --step")
124+
125+
117126
def revert_migrated_tables(schema: str, table: str, *, delete_managed: bool = False):
118127
ws = WorkspaceClient()
119128
prompts = Prompts()
@@ -149,6 +158,7 @@ def revert_migrated_tables(schema: str, table: str, *, delete_managed: bool = Fa
149158
"validate-external-locations": validate_external_locations,
150159
"ensure-assessment-run": ensure_assessment_run,
151160
"skip": skip,
161+
"repair-run": repair_run,
152162
"revert-migrated-tables": revert_migrated_tables,
153163
}
154164

src/databricks/labs/ucx/install.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,29 @@ def latest_job_status(self) -> list[dict]:
813813
continue
814814
return latest_status
815815

816+
def repair_run(self, workflow):
    """Trigger a repair run for the given workflow if its listed run FAILED.

    The job id is looked up in `self._state.jobs`. A single run is requested
    from `self._ws.jobs.list_runs(..., limit=1)` and treated as the latest
    one. Only when that run's result state is ``FAILED`` is a repair started
    (re-running all failed tasks) and the run page opened in a browser.
    Every other situation is logged as a warning and skipped.

    :param workflow: key of the workflow in `self._state.jobs`.
    :returns: None.

    `InvalidParameterValue` raised by any of the calls is caught and logged
    as a warning instead of propagating.
    """
    try:
        job_id = self._state.jobs.get(workflow)
        if not job_id:
            logger.warning(f"{workflow} job does not exists hence skipping Repair Run")
            return
        job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
        if not job_runs:
            logger.warning(f"{workflow} job is not initialized yet. Can't trigger repair run now")
            return
        latest_job_run = job_runs[0]
        state = latest_job_run.state
        # A run that has not finished yet carries no result state; reading
        # `.value` on it would raise AttributeError, so treat it as "not FAILED".
        result_state = state.result_state if state is not None else None
        if result_state is None or result_state.value != "FAILED":
            logger.warning(f"{workflow} job is not in FAILED state hence skipping Repair Run")
            return
        run_id = latest_job_run.run_id
        job_url = f"{self._ws.config.host}#job/{job_id}/run/{run_id}"
        logger.debug(f"Repair Running {workflow} job: {job_url}")
        self._ws.jobs.repair_run(run_id=run_id, rerun_all_failed_tasks=True)
        webbrowser.open(job_url)
    except InvalidParameterValue as e:
        logger.warning(f"skipping {workflow}: {e}")
838+
816839
def uninstall(self):
817840
if self._prompts and not self._prompts.confirm(
818841
"Do you want to uninstall ucx from the workspace too, this would "

tests/integration/test_installation.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,25 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
149149
ws.groups.get(ws_group_a.id)
150150

151151

152+
@retried(on=[NotFound, InvalidParameterValue, OperationFailed], timeout=timedelta(minutes=10))
def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
    """Fail a workflow on purpose, repair it, and expect the repaired run to succeed.

    The inventory schema is dropped so that `099-destroy-schema` fails, then
    recreated so that the repair run can pass.
    """
    import time

    install = new_installation()
    mocker.patch("webbrowser.open")
    sql_backend.execute(f"DROP SCHEMA {install.current_config.inventory_database} CASCADE")
    with pytest.raises(OperationFailed):
        install.run_workflow("099-destroy-schema")

    sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {install.current_config.inventory_database}")

    install.repair_run("099-destroy-schema")
    workflow_job_id = install._state.jobs["099-destroy-schema"]

    # Poll with a pause and a deadline: an unthrottled loop floods the Jobs API
    # and never terminates if the run does not reach a result state.
    poll_interval_seconds = 5
    deadline = time.monotonic() + 5 * 60
    run_status = None
    while run_status is None:
        if time.monotonic() > deadline:
            raise TimeoutError("repair run of 099-destroy-schema did not finish in time")
        job_runs = list(install._ws.jobs.list_runs(job_id=workflow_job_id, limit=1))
        run_status = job_runs[0].state.result_state
        if run_status is None:
            time.sleep(poll_interval_seconds)
    assert run_status.value == "SUCCESS"
169+
170+
152171
@retried(on=[NotFound], timeout=timedelta(minutes=5))
153172
def test_uninstallation(ws, sql_backend, new_installation):
154173
install = new_installation()

tests/unit/test_cli.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from databricks.sdk.service import iam
66
from databricks.sdk.service.iam import ComplexValue, User
77

8-
from databricks.labs.ucx.cli import skip
8+
from databricks.labs.ucx.cli import repair_run, skip
99

1010

1111
@pytest.fixture
@@ -33,3 +33,21 @@ def test_skip_no_ucx(caplog, mocker):
3333
mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=None)
3434
skip(schema="schema", table="table")
3535
assert [rec.message for rec in caplog.records if "UCX configuration" in rec.message]
36+
37+
38+
def test_repair_run(mocker, caplog):
    """`repair_run` with a step forwards it to the installer and logs no warnings."""
    mocker.patch("databricks.sdk.WorkspaceClient.__init__", return_value=None)
    mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller.__init__", return_value=None)
    installer_repair_run = mocker.patch(
        "databricks.labs.ucx.install.WorkspaceInstaller.repair_run", return_value=None
    )
    repair_run("assessment")
    assert caplog.messages == []
    # Without this check the test passes even if the CLI never calls the installer.
    installer_repair_run.assert_called_once_with("assessment")
44+
45+
46+
def test_no_step_in_repair_run(mocker, caplog):
    """`repair_run` with an empty step must raise `KeyError` with the usage hint."""
    mocker.patch("databricks.sdk.WorkspaceClient.__init__", return_value=None)
    mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller.__init__", return_value=None)
    mocker.patch("databricks.labs.ucx.install.WorkspaceInstaller.repair_run", return_value=None)
    # pytest.raises fails the test when nothing is raised; a bare try/except
    # would pass silently in that case.
    with pytest.raises(KeyError) as exc_info:
        repair_run("")
    assert exc_info.value.args[0] == "You did not specify --step"

tests/unit/test_install.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
GlobalInitScriptDetailsWithContent,
1818
Policy,
1919
)
20+
from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState
2021
from databricks.sdk.service.sql import (
2122
Dashboard,
2223
DataSource,
@@ -1002,3 +1003,77 @@ def test_uninstall_no_config_file(ws, mocker):
10021003
ws.workspace.download = lambda _: io.BytesIO(config_bytes)
10031004
ws.workspace.get_status.side_effect = NotFound(...)
10041005
install.uninstall()
1006+
1007+
1008+
def test_repair_run(ws, mocker):
    """A FAILED run is repaired and its run page is opened in the browser."""
    base = [
        BaseRun(
            job_clusters=None,
            job_id=677268692725050,
            job_parameters=None,
            number_in_job=725118654200173,
            run_id=725118654200173,
            run_name="[UCX] assessment",
            state=RunState(result_state=RunResultState.FAILED),
        )
    ]
    install = WorkspaceInstaller(ws, promtps=MockPrompts({".*": ""}))
    open_browser = mocker.patch("webbrowser.open")
    install._state.jobs = {"assessment": "123"}
    ws.jobs.list_runs.return_value = base
    install.repair_run("assessment")
    # Verify the actual effect instead of only checking that nothing raised.
    ws.jobs.repair_run.assert_called_once_with(run_id=725118654200173, rerun_all_failed_tasks=True)
    open_browser.assert_called_once()
1026+
1027+
1028+
def test_repair_run_success(ws, caplog):
    """A run that already succeeded is skipped with a warning and not repaired."""
    base = [
        BaseRun(
            job_clusters=None,
            job_id=677268692725050,
            job_parameters=None,
            number_in_job=725118654200173,
            run_id=725118654200173,
            run_name="[UCX] assessment",
            state=RunState(result_state=RunResultState.SUCCESS),
        )
    ]
    install = WorkspaceInstaller(ws)
    install._state.jobs = {"assessment": "123"}
    ws.jobs.list_runs.return_value = base
    install.repair_run("assessment")
    assert "job is not in FAILED state" in caplog.text
    # The warning alone does not prove the repair call was skipped.
    ws.jobs.repair_run.assert_not_called()
1046+
1047+
1048+
def test_repair_run_no_job_id(ws):
    """A workflow name that is absent from the installed jobs is never repaired."""
    base = [
        BaseRun(
            job_clusters=None,
            job_id=677268692725050,
            job_parameters=None,
            number_in_job=725118654200173,
            run_id=725118654200173,
            run_name="[UCX] assessment",
            state=RunState(result_state=RunResultState.SUCCESS),
        )
    ]
    install = WorkspaceInstaller(ws)
    install._state.jobs = {"assessment": ""}
    ws.jobs.list_runs.return_value = base
    # "workflow" is not a key of the jobs mapping above.
    install.repair_run("workflow")
    ws.jobs.repair_run.assert_not_called()
1065+
1066+
1067+
def test_repair_run_no_job_run(ws):
    """A job for which no runs are listed is never repaired."""
    install = WorkspaceInstaller(ws)
    install._state.jobs = {"assessment": "677268692725050"}
    # An empty string iterates to nothing, so the installer sees no runs.
    ws.jobs.list_runs.return_value = ""
    install.repair_run("assessment")
    ws.jobs.repair_run.assert_not_called()
1073+
1074+
1075+
def test_repair_run_exception(ws):
    """`InvalidParameterValue` from listing runs is swallowed and nothing is repaired."""
    install = WorkspaceInstaller(ws)
    install._state.jobs = {"assessment": "123"}
    ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists")
    # Must not raise: the installer logs the error and skips the workflow.
    install.repair_run("assessment")
    ws.jobs.repair_run.assert_not_called()

0 commit comments

Comments
 (0)