Skip to content

Commit 914f0b8

Browse files
Added tests for issue #786. (#815)
1 parent 31acd2b commit 914f0b8

File tree

2 files changed

+198
-26
lines changed

2 files changed

+198
-26
lines changed

src/databricks/labs/ucx/install.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,7 @@ def _job_clusters(self, names: set[str]):
799799

800800
@staticmethod
801801
def _readable_timedelta(epoch):
802-
when = datetime.fromtimestamp(epoch)
802+
when = datetime.utcfromtimestamp(epoch)
803803
duration = datetime.now() - when
804804
data = {}
805805
data["days"], remaining = divmod(duration.total_seconds(), 86_400)
@@ -808,41 +808,41 @@ def _readable_timedelta(epoch):
808808

809809
time_parts = ((name, round(value)) for name, value in data.items())
810810
time_parts = [f"{value} {name[:-1] if value == 1 else name}" for name, value in time_parts if value > 0]
811-
time_parts.append("ago")
811+
if len(time_parts) > 0:
812+
time_parts.append("ago")
812813
if time_parts:
813814
return " ".join(time_parts)
814815
return "less than 1 second ago"
815816

816817
def latest_job_status(self) -> list[dict]:
817818
latest_status = []
818819
for step, job_id in self._state.jobs.items():
820+
job_state = None
821+
start_time = None
819822
try:
820-
step_status = self._step_status(job_id, step)
821-
latest_status.append(step_status)
823+
job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1))
822824
except InvalidParameterValue as e:
823825
logger.warning(f"skipping {step}: {e}")
824826
continue
827+
if job_runs:
828+
state = job_runs[0].state
829+
if state and state.result_state:
830+
job_state = state.result_state.name
831+
elif state and state.life_cycle_state:
832+
job_state = state.life_cycle_state.name
833+
if job_runs[0].start_time:
834+
start_time = job_runs[0].start_time / 1000
835+
latest_status.append(
836+
{
837+
"step": step,
838+
"state": "UNKNOWN" if not (job_runs and job_state) else job_state,
839+
"started": (
840+
"<never run>" if not (job_runs and start_time) else self._readable_timedelta(start_time)
841+
),
842+
}
843+
)
825844
return latest_status
826845

827-
def _step_status(self, job_id, step):
828-
job_state = None
829-
start_time = None
830-
job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1))
831-
if job_runs:
832-
state = job_runs[0].state
833-
job_state = None
834-
if state and state.result_state:
835-
job_state = state.result_state.name
836-
elif state and state.life_cycle_state:
837-
job_state = state.life_cycle_state.name
838-
if job_runs[0].start_time:
839-
start_time = job_runs[0].start_time / 1000
840-
return {
841-
"step": step,
842-
"state": "UNKNOWN" if not (job_runs and job_state) else job_state,
843-
"started": "<never run>" if not job_runs else self._readable_timedelta(start_time),
844-
}
845-
846846
def _get_result_state(self, job_id):
847847
job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
848848
latest_job_run = job_runs[0]

tests/unit/test_install.py

Lines changed: 175 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import json
2-
from datetime import timedelta
3-
from unittest.mock import MagicMock, create_autospec
2+
from datetime import datetime, timedelta
3+
from unittest.mock import MagicMock, create_autospec, patch
44

55
import pytest
66
from databricks.labs.blueprint.installation import Installation, MockInstallation
@@ -18,7 +18,12 @@
1818
)
1919
from databricks.sdk.service import compute, iam, jobs, sql
2020
from databricks.sdk.service.compute import CreatePolicyResponse, Policy, State
21-
from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState
21+
from databricks.sdk.service.jobs import (
22+
BaseRun,
23+
RunLifeCycleState,
24+
RunResultState,
25+
RunState,
26+
)
2227
from databricks.sdk.service.sql import (
2328
Dashboard,
2429
DataSource,
@@ -1045,3 +1050,170 @@ def test_repair_run_result_state(ws, caplog, mock_installation_with_jobs, any_pr
10451050

10461051
workspace_installation.repair_run("assessment")
10471052
assert "Please try after sometime" in caplog.text
1053+
1054+
1055+
@pytest.mark.parametrize(
1056+
"state,expected",
1057+
[
1058+
(
1059+
RunState(
1060+
result_state=None,
1061+
life_cycle_state=RunLifeCycleState.RUNNING,
1062+
),
1063+
"RUNNING",
1064+
),
1065+
(
1066+
RunState(
1067+
result_state=RunResultState.SUCCESS,
1068+
life_cycle_state=RunLifeCycleState.TERMINATED,
1069+
),
1070+
"SUCCESS",
1071+
),
1072+
(
1073+
RunState(
1074+
result_state=RunResultState.FAILED,
1075+
life_cycle_state=RunLifeCycleState.TERMINATED,
1076+
),
1077+
"FAILED",
1078+
),
1079+
(
1080+
RunState(
1081+
result_state=None,
1082+
life_cycle_state=None,
1083+
),
1084+
"UNKNOWN",
1085+
),
1086+
],
1087+
)
1088+
def test_latest_job_status_states(ws, mock_installation_with_jobs, any_prompt, state, expected):
1089+
base = [
1090+
BaseRun(
1091+
job_id=123,
1092+
run_name="assessment",
1093+
state=state,
1094+
start_time=1704114000000,
1095+
)
1096+
]
1097+
sql_backend = MockBackend()
1098+
wheels = create_autospec(WheelsV2)
1099+
config = WorkspaceConfig(inventory_database='ucx')
1100+
timeout = timedelta(seconds=1)
1101+
workspace_installation = WorkspaceInstallation(
1102+
config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout
1103+
)
1104+
ws.jobs.list_runs.return_value = base
1105+
status = workspace_installation.latest_job_status()
1106+
assert len(status) == 1
1107+
assert status[0]["state"] == expected
1108+
1109+
1110+
@patch(f"{databricks.labs.ucx.install.__name__}.datetime", wraps=datetime)
1111+
@pytest.mark.parametrize(
1112+
"start_time,expected",
1113+
[
1114+
(1704114000000, "1 hour ago"), # 2024-01-01 13:00:00
1115+
(1704117600000, "less than 1 second ago"), # 2024-01-01 14:00:00
1116+
(1704116990000, "10 minutes 10 seconds ago"), # 2024-01-01 13:49:50
1117+
(None, "<never run>"),
1118+
],
1119+
)
1120+
def test_latest_job_status_success_with_time(
1121+
mock_datetime, ws, mock_installation_with_jobs, any_prompt, start_time, expected
1122+
):
1123+
base = [
1124+
BaseRun(
1125+
job_id=123,
1126+
run_name="assessment",
1127+
state=RunState(
1128+
result_state=RunResultState.SUCCESS,
1129+
life_cycle_state=RunLifeCycleState.TERMINATED,
1130+
),
1131+
start_time=start_time,
1132+
)
1133+
]
1134+
sql_backend = MockBackend()
1135+
wheels = create_autospec(WheelsV2)
1136+
config = WorkspaceConfig(inventory_database='ucx')
1137+
timeout = timedelta(seconds=1)
1138+
workspace_installation = WorkspaceInstallation(
1139+
config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout
1140+
)
1141+
ws.jobs.list_runs.return_value = base
1142+
faked_now = datetime(2024, 1, 1, 14, 0, 0)
1143+
mock_datetime.now.return_value = faked_now
1144+
status = workspace_installation.latest_job_status()
1145+
assert status[0]["started"] == expected
1146+
1147+
1148+
def test_latest_job_status_list(ws, any_prompt):
1149+
runs = [
1150+
[
1151+
BaseRun(
1152+
job_id=1,
1153+
run_name="job1",
1154+
state=RunState(
1155+
result_state=None,
1156+
life_cycle_state=RunLifeCycleState.RUNNING,
1157+
),
1158+
start_time=1705577671907,
1159+
)
1160+
],
1161+
[
1162+
BaseRun(
1163+
job_id=2,
1164+
run_name="job2",
1165+
state=RunState(
1166+
result_state=RunResultState.SUCCESS,
1167+
life_cycle_state=RunLifeCycleState.TERMINATED,
1168+
),
1169+
start_time=1705577671907,
1170+
)
1171+
],
1172+
[], # the last job has no runs
1173+
]
1174+
sql_backend = MockBackend()
1175+
wheels = create_autospec(WheelsV2)
1176+
config = WorkspaceConfig(inventory_database='ucx')
1177+
timeout = timedelta(seconds=1)
1178+
mock_installation = MockInstallation(
1179+
{'state.json': {'resources': {'jobs': {"job1": "1", "job2": "2", "job3": "3"}}}}
1180+
)
1181+
workspace_installation = WorkspaceInstallation(
1182+
config, mock_installation, sql_backend, wheels, ws, any_prompt, timeout
1183+
)
1184+
ws.jobs.list_runs.side_effect = iter(runs)
1185+
status = workspace_installation.latest_job_status()
1186+
assert len(status) == 3
1187+
assert status[0]["step"] == "job1"
1188+
assert status[0]["state"] == "RUNNING"
1189+
assert status[1]["step"] == "job2"
1190+
assert status[1]["state"] == "SUCCESS"
1191+
assert status[2]["step"] == "job3"
1192+
assert status[2]["state"] == "UNKNOWN"
1193+
1194+
1195+
def test_latest_job_status_no_job_run(ws, mock_installation_with_jobs, any_prompt):
1196+
sql_backend = MockBackend()
1197+
wheels = create_autospec(WheelsV2)
1198+
config = WorkspaceConfig(inventory_database='ucx')
1199+
timeout = timedelta(seconds=1)
1200+
workspace_installation = WorkspaceInstallation(
1201+
config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout
1202+
)
1203+
ws.jobs.list_runs.return_value = ""
1204+
status = workspace_installation.latest_job_status()
1205+
assert len(status) == 1
1206+
assert status[0]["step"] == "assessment"
1207+
1208+
1209+
def test_latest_job_status_exception(ws, mock_installation_with_jobs, any_prompt):
1210+
sql_backend = MockBackend()
1211+
wheels = create_autospec(WheelsV2)
1212+
config = WorkspaceConfig(inventory_database='ucx')
1213+
timeout = timedelta(seconds=1)
1214+
workspace_installation = WorkspaceInstallation(
1215+
config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout
1216+
)
1217+
ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists")
1218+
status = workspace_installation.latest_job_status()
1219+
assert len(status) == 0

0 commit comments

Comments
 (0)