 from dataclasses import dataclass
 
 from databricks.sdk import WorkspaceClient
-from databricks.sdk.errors import NotFound
-from databricks.sdk.service.compute import Policy
 from databricks.sdk.service.jobs import BaseJob
 
-from databricks.labs.ucx.assessment.crawlers import (
-    _AZURE_SP_CONF_FAILURE_MSG,
-    INCOMPATIBLE_SPARK_CONFIG_KEYS,
-    _azure_sp_conf_in_init_scripts,
-    _azure_sp_conf_present_check,
-    _get_init_script_data,
-    logger,
-    spark_version_compatibility,
-)
+from databricks.labs.ucx.assessment.clusters import ClustersMixin
+from databricks.labs.ucx.assessment.crawlers import logger
 from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
 
 
@@ -28,7 +19,7 @@ class JobInfo:
     creator: str | None = None
 
 
-class JobsMixin:
+class JobsMixin(ClustersMixin):
     @staticmethod
     def _get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
         for j in all_jobs:
@@ -90,30 +81,12 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
             )
 
         for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
-            support_status = spark_version_compatibility(cluster_config.spark_version)
             job_id = job.job_id
             if not job_id:
                 continue
-            if support_status != "supported":
-                job_assessment[job_id].add(f"not supported DBR: {cluster_config.spark_version}")
-
-            if cluster_config.spark_conf is not None:
-                self._job_spark_conf(cluster_config, job_assessment, job_id)
-
-            # Checking if Azure cluster config is present in cluster policies
-            if cluster_config.policy_id:
-                policy = self._safe_get_cluster_policy(cluster_config.policy_id)
-                if policy is None:
-                    continue
-                if policy.definition:
-                    if _azure_sp_conf_present_check(json.loads(policy.definition)):
-                        job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
-                if policy.policy_family_definition_overrides:
-                    if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
-                        job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
-
-            if cluster_config.init_scripts:
-                self._init_scripts(cluster_config, job_assessment, job_id)
+            cluster_failures = self._check_cluster_failures(cluster_config)
+            for failure in json.loads(cluster_failures.failures):
+                job_assessment[job_id].add(failure)
 
         # TODO: next person looking at this - rewrite, as this code makes no sense
         for job_key in job_details.keys():  # pylint: disable=consider-using-dict-items,consider-iterating-dictionary
@@ -122,33 +95,6 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
                 job_details[job_key].success = 0
         return list(job_details.values())
 
-    def _init_scripts(self, cluster_config, job_assessment, job_id):
-        for init_script_info in cluster_config.init_scripts:
-            init_script_data = _get_init_script_data(self._ws, init_script_info)
-            if not init_script_data:
-                continue
-            if not _azure_sp_conf_in_init_scripts(init_script_data):
-                continue
-            job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
-
-    def _job_spark_conf(self, cluster_config, job_assessment, job_id):
-        for k in INCOMPATIBLE_SPARK_CONFIG_KEYS:
-            if k in cluster_config.spark_conf:
-                job_assessment[job_id].add(f"unsupported config: {k}")
-        for value in cluster_config.spark_conf.values():
-            if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
-                job_assessment[job_id].add(f"using DBFS mount in configuration: {value}")
-        # Checking if Azure cluster config is present in spark config
-        if _azure_sp_conf_present_check(cluster_config.spark_conf):
-            job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
-
-    def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None:
-        try:
-            return self._ws.cluster_policies.get(policy_id)
-        except NotFound:
-            logger.warning(f"The cluster policy was deleted: {policy_id}")
-            return None
-
     def snapshot(self) -> Iterable[JobInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
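
The net effect of this change: the per-cluster checks (unsupported DBR version, incompatible Spark conf keys, DBFS mounts, Azure service principal configuration in policies and init scripts) now live once in ClustersMixin, and the jobs assessor merely attributes the shared result to the owning job. Below is a minimal sketch of the contract the new loop body relies on, assuming _check_cluster_failures returns a record whose failures attribute is a JSON-encoded list of findings; the field names here are illustrative, the real definitions live in databricks/labs/ucx/assessment/clusters.py.

import json
from dataclasses import dataclass


@dataclass
class ClusterInfo:
    # Assumed shape for illustration: `failures` is a JSON-encoded list[str].
    cluster_id: str
    success: int
    failures: str


def attribute_cluster_failures(job_id: int, info: ClusterInfo, job_assessment: dict[int, set[str]]) -> None:
    # Mirrors the new loop body in _assess_jobs: decode the shared per-cluster
    # result and add each finding to the job's failure set.
    for failure in json.loads(info.failures):
        job_assessment[job_id].add(failure)


# Usage: one finding carried over from the cluster check to job 42.
assessment: dict[int, set[str]] = {42: set()}
info = ClusterInfo(cluster_id="0001-abc", success=0, failures='["unsupported config: spark.databricks.passthrough.enabled"]')
attribute_cluster_failures(42, info, assessment)
assert assessment[42] == {"unsupported config: spark.databricks.passthrough.enabled"}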