1+ import json
12import os
23import logging
34import logging_utils
@@ -99,6 +100,9 @@ def log_job_configs(self, users_list=None, groups_list = None, log_file='jobs.lo
99100 jobs_log = self .get_export_dir () + log_file
100101 acl_jobs_log = self .get_export_dir () + acl_file
101102 error_logger = logging_utils .get_error_logger (wmconstants .WM_EXPORT , wmconstants .JOB_OBJECT , self .get_export_dir ())
103+ acl_error_logger = logging_utils .get_error_logger (
104+ wmconstants .WM_EXPORT , wmconstants .JOB_ACL_OBJECT , self .get_export_dir ()
105+ )
102106 # pinned by cluster_user is a flag per cluster
103107 jl_full = self .get_jobs_list (False )
104108 if users_list :
@@ -116,18 +120,42 @@ def log_job_configs(self, users_list=None, groups_list = None, log_file='jobs.lo
116120 job_settings ['name' ] = new_job_name
117121 # reset the original struct with the new settings
118122 x ['settings' ] = job_settings
119- log_fp .write (json .dumps (x ) + '\n ' )
123+
124+ # get ACLs and check that the job has one owner before writing
120125 job_perms = self .get (f'/preview/permissions/jobs/{ job_id } ' )
121126 if not logging_utils .log_response_error (error_logger , job_perms ):
122- job_perms ['job_name' ] = new_job_name
123- acl_fp .write (json .dumps (job_perms ) + '\n ' )
127+ valid_acl = False
128+ acls = job_perms .get ("access_control_list" )
129+ if acls :
130+ for acl in acls :
131+ for permission in acl .get ("all_permissions" ):
132+ if permission .get ("permission_level" ) == "IS_OWNER" :
133+ valid_acl = True
134+ if valid_acl :
135+ # job and job_acl are fine, writing both to the output files
136+ log_fp .write (json .dumps (x ) + '\n ' )
137+
138+ job_perms ['job_name' ] = new_job_name
139+ acl_fp .write (json .dumps (job_perms ) + '\n ' )
140+ else :
141+ # job_acl is malformed, the job is written to error output file
142+ message = f"The following job id { job_id } has malformed permissions: { json .dumps (job_perms )} "
143+ logging .error (message )
144+ logging_utils .log_response_error (acl_error_logger , {
145+ 'error' : message
146+ })
147+ logging_utils .log_response_error (error_logger , {
148+ 'error' : message , 'json' : json .dumps (x )
149+ })
124150
125151 def import_job_configs (self , log_file = 'jobs.log' , acl_file = 'acl_jobs.log' , job_map_file = 'job_id_map.log' ):
126152 jobs_log = self .get_export_dir () + log_file
127153 acl_jobs_log = self .get_export_dir () + acl_file
128154 job_map_log = self .get_export_dir () + job_map_file
129155 error_logger = logging_utils .get_error_logger (
130156 wmconstants .WM_IMPORT , wmconstants .JOB_OBJECT , self .get_export_dir ())
157+ job_acl_error_logger = logging_utils .get_error_logger (
158+ wmconstants .WM_IMPORT , wmconstants .JOB_ACL_OBJECT , self .get_export_dir ())
131159 if not os .path .exists (jobs_log ):
132160 logging .info ("No job configurations to import." )
133161 return
@@ -243,10 +271,13 @@ def adjust_ids_for_cluster(settings): #job_settings or task_settings
243271 acl_perms = self .build_acl_args (acl_conf ['access_control_list' ], True )
244272 acl_create_args = {'access_control_list' : acl_perms }
245273 acl_resp = self .patch (api , acl_create_args )
246- if not logging_utils .log_response_error (error_logger , acl_resp ) and 'object_id' in acl_conf :
274+ if not logging_utils .log_response_error (job_acl_error_logger , acl_resp ) and 'object_id' in acl_conf :
247275 checkpoint_job_configs_set .write (acl_conf ['object_id' ])
248276 else :
249- raise RuntimeError ("Import job has failed. Refer to the previous log messages to investigate." )
277+ if self .is_skip_failed ():
278+ logging .error (f"Skipped { acl_conf } " )
279+ else :
280+ raise RuntimeError ("Import job has failed. Refer to the previous log messages to investigate." )
250281 # update the imported job names
251282 self .update_imported_job_names (error_logger , checkpoint_job_configs_set )
252283
0 commit comments