|
7 | 7 | import time |
8 | 8 | import webbrowser |
9 | 9 | from dataclasses import replace |
| 10 | +from datetime import datetime |
10 | 11 | from pathlib import Path |
11 | 12 | from typing import Any |
12 | 13 |
|
@@ -860,20 +861,44 @@ def _get_ext_hms_conf_from_policy(cluster_policy): |
860 | 861 | spark_conf_dict[key[11:]] = cluster_policy[key]["value"] |
861 | 862 | return instance_profile, spark_conf_dict |
862 | 863 |
|
| 864 | + @staticmethod |
| 865 | + def _readable_timedelta(epoch): |
| 866 | + when = datetime.fromtimestamp(epoch) |
| 867 | + duration = datetime.now() - when |
| 868 | + data = {} |
| 869 | + data["days"], remaining = divmod(duration.total_seconds(), 86_400) |
| 870 | + data["hours"], remaining = divmod(remaining, 3_600) |
| 871 | + data["minutes"], data["seconds"] = divmod(remaining, 60) |
| 872 | + |
| 873 | + time_parts = ((name, round(value)) for name, value in data.items()) |
| 874 | + time_parts = [f"{value} {name[:-1] if value == 1 else name}" for name, value in time_parts if value > 0] |
| 875 | + time_parts.append("ago") |
| 876 | + if time_parts: |
| 877 | + return " ".join(time_parts) |
| 878 | + else: |
| 879 | + return "less than 1 second ago" |
| 880 | + |
863 | 881 | def latest_job_status(self) -> list[dict]: |
864 | 882 | latest_status = [] |
865 | 883 | for step, job_id in self._state.jobs.items(): |
866 | 884 | try: |
| 885 | + job_state = None |
| 886 | + start_time = None |
867 | 887 | job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1)) |
868 | | - if not job_runs: |
869 | | - continue |
870 | | - state = job_runs[0].state |
871 | | - result_state = state.result_state if state else None |
| 888 | + if job_runs: |
| 889 | + state = job_runs[0].state |
| 890 | + job_state = None |
| 891 | + if state and state.result_state: |
| 892 | + job_state = state.result_state.name |
| 893 | + elif state and state.life_cycle_state: |
| 894 | + job_state = state.life_cycle_state.name |
| 895 | + if job_runs[0].start_time: |
| 896 | + start_time = job_runs[0].start_time / 1000 |
872 | 897 | latest_status.append( |
873 | 898 | { |
874 | 899 | "step": step, |
875 | | - "state": "UNKNOWN" if not job_runs else str(result_state), |
876 | | - "started": "<never run>" if not job_runs else job_runs[0].start_time, |
| 900 | + "state": "UNKNOWN" if not (job_runs and job_state) else job_state, |
| 901 | + "started": "<never run>" if not job_runs else self._readable_timedelta(start_time), |
877 | 902 | } |
878 | 903 | ) |
879 | 904 | except InvalidParameterValue as e: |
|
0 commit comments