@@ -38,27 +38,34 @@ class JobInfo:
3838
3939
4040class JobsMixin :
41- @staticmethod
42- def _get_cluster_configs_from_all_jobs (all_jobs , all_clusters_by_id ): # pylint: disable=too-complex
43- for j in all_jobs :
44- if j .settings is None :
41+ @classmethod
42+ def _get_cluster_configs_from_all_jobs (cls , all_jobs , all_clusters_by_id ):
43+ for job in all_jobs :
44+ if job .settings is None :
4545 continue
46- if j .settings .job_clusters is not None :
47- for job_cluster in j .settings .job_clusters :
48- if job_cluster .new_cluster is None :
49- continue
50- yield j , job_cluster .new_cluster
51- if j .settings .tasks is None :
46+ if job .settings .job_clusters is not None :
47+ yield from cls ._job_clusters (job )
48+ if job .settings .tasks is None :
5249 continue
53- for task in j .settings .tasks :
54- if task .existing_cluster_id is not None :
55- interactive_cluster = all_clusters_by_id .get (task .existing_cluster_id , None )
56- if interactive_cluster is None :
57- continue
58- yield j , interactive_cluster
50+ yield from cls ._task_clusters (job , all_clusters_by_id )
5951
60- elif task .new_cluster is not None :
61- yield j , task .new_cluster
52+ @classmethod
53+ def _task_clusters (cls , job , all_clusters_by_id ):
54+ for task in job .settings .tasks :
55+ if task .existing_cluster_id is not None :
56+ interactive_cluster = all_clusters_by_id .get (task .existing_cluster_id , None )
57+ if interactive_cluster is None :
58+ continue
59+ yield job , interactive_cluster
60+ elif task .new_cluster is not None :
61+ yield job , task .new_cluster
62+
63+ @staticmethod
64+ def _job_clusters (job ):
65+ for job_cluster in job .settings .job_clusters :
66+ if job_cluster .new_cluster is None :
67+ continue
68+ yield job , job_cluster .new_cluster
6269
6370
6471class JobsCrawler (CrawlerBase [JobInfo ], JobsMixin , CheckClusterMixin ):
@@ -299,22 +306,10 @@ def _assess_job_runs(self, submit_runs: Iterable[BaseRun], all_clusters_by_id) -
299306 runs_per_hash : dict [str , list [int | None ]] = {}
300307
301308 for submit_run in submit_runs :
302- task_failures = []
309+ task_failures : list [ str ] = []
303310 # v2.1+ API, with tasks
304311 if submit_run .tasks :
305- all_tasks : list [RunTask ] = submit_run .tasks
306- for task in sorted (all_tasks , key = lambda x : x .task_key if x .task_key is not None else "" ):
307- _task_key = task .task_key if task .task_key is not None else ""
308- _cluster_details = None
309- if task .new_cluster :
310- _cluster_details = ClusterDetails .from_dict (task .new_cluster .as_dict ())
311- if self ._needs_compatibility_check (task .new_cluster ):
312- task_failures .append ("no data security mode specified" )
313- if task .existing_cluster_id :
314- _cluster_details = all_clusters_by_id .get (task .existing_cluster_id , None )
315- if _cluster_details :
316- task_failures .extend (self ._check_cluster_failures (_cluster_details , _task_key ))
317-
312+ self ._check_run_task (submit_run .tasks , all_clusters_by_id , task_failures )
318313 # v2.0 API, without tasks
319314 elif submit_run .cluster_spec :
320315 _cluster_details = ClusterDetails .from_dict (submit_run .cluster_spec .as_dict ())
@@ -324,11 +319,23 @@ def _assess_job_runs(self, submit_runs: Iterable[BaseRun], all_clusters_by_id) -
324319 runs_per_hash [hashed_id ].append (submit_run .run_id )
325320 else :
326321 runs_per_hash [hashed_id ] = [submit_run .run_id ]
327-
328322 result [hashed_id ] = SubmitRunInfo (
329323 run_ids = json .dumps (runs_per_hash [hashed_id ]),
330324 hashed_id = hashed_id ,
331325 failures = json .dumps (list (set (task_failures ))),
332326 )
333327
334328 return list (result .values ())
329+
330+ def _check_run_task (self , all_tasks : list [RunTask ], clusters : dict [str , ClusterDetails ], task_failures : list [str ]):
331+ for task in sorted (all_tasks , key = lambda x : x .task_key if x .task_key is not None else "" ):
332+ _task_key = task .task_key if task .task_key is not None else ""
333+ cluster_details = None
334+ if task .new_cluster :
335+ cluster_details = ClusterDetails .from_dict (task .new_cluster .as_dict ())
336+ if self ._needs_compatibility_check (task .new_cluster ):
337+ task_failures .append ("no data security mode specified" )
338+ if task .existing_cluster_id :
339+ cluster_details = clusters .get (task .existing_cluster_id , None )
340+ if cluster_details :
341+ task_failures .extend (self ._check_cluster_failures (cluster_details , _task_key ))
0 commit comments