Skip to content

Commit 0c73abc

Browse files
committed
feat: clean job files in AREX
1 parent 784f8d8 commit 0c73abc

File tree

1 file changed

+79
-42
lines changed

1 file changed

+79
-42
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 79 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -376,32 +376,6 @@ def _uploadJobDependencies(self, arcJobID, executableFile, inputs, executables):
376376
self.log.verbose("Input correctly uploaded", fileToSubmit)
377377
return S_OK()
378378

379-
def _killJob(self, arcJobList):
380-
"""Kill the specified jobs
381-
382-
:param list arcJobList: list of ARC Job IDs
383-
"""
384-
result = self._checkSession()
385-
if not result["OK"]:
386-
self.log.error("Cannot kill jobs", result["Message"])
387-
return result
388-
389-
# List of jobs in json format for the REST query
390-
jobsJson = {"job": [{"id": job} for job in arcJobList]}
391-
392-
# Prepare the command
393-
params = {"action": "kill"}
394-
query = self._urlJoin("jobs")
395-
396-
# Killing jobs should be fast
397-
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
398-
if not result["OK"]:
399-
self.log.error("Failed to kill all these jobs.", result["Message"])
400-
return S_ERROR("Failed to kill all these jobs")
401-
402-
self.log.debug("Successfully deleted jobs")
403-
return S_OK()
404-
405379
def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None):
406380
"""Method to submit job
407381
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
@@ -469,12 +443,83 @@ def killJob(self, jobIDList):
469443
470444
:param list jobIDList: list of DIRAC Job IDs (a single job ID is also accepted and wrapped in a list)
471445
"""
446+
if not isinstance(jobIDList, list):
447+
jobIDList = [jobIDList]
472448
self.log.debug("Killing jobs", ",".join(jobIDList))
473449

474-
# List of jobs in json format for the REST query
475-
jList = [self._DiracToArcID(job) for job in jobIDList]
450+
# Convert DIRAC jobs to ARC jobs
451+
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
452+
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
476453
return self._killJob(jList)
477454

455+
def _killJob(self, arcJobList):
456+
"""Kill the specified jobs
457+
458+
:param list arcJobList: list of ARC Job IDs
459+
"""
460+
result = self._checkSession()
461+
if not result["OK"]:
462+
self.log.error("Cannot kill jobs", result["Message"])
463+
return result
464+
465+
# List of jobs in json format for the REST query
466+
jobsJson = {"job": [{"id": job} for job in arcJobList]}
467+
468+
# Prepare the command
469+
params = {"action": "kill"}
470+
query = self._urlJoin("jobs")
471+
472+
# Killing jobs should be fast
473+
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
474+
if not result["OK"]:
475+
self.log.error("Failed to kill all these jobs.", result["Message"])
476+
return S_ERROR("Failed to kill all these jobs")
477+
478+
self.log.debug("Successfully deleted jobs")
479+
return S_OK()
480+
481+
#############################################################################
482+
483+
def cleanJob(self, jobIDList):
484+
"""Clean files related to the specified jobs
485+
486+
:param list jobIDList: list of DIRAC Job IDs (a single job ID is also accepted and wrapped in a list)
487+
"""
488+
if not isinstance(jobIDList, list):
489+
jobIDList = [jobIDList]
490+
self.log.debug("Cleaning jobs", ",".join(jobIDList))
491+
492+
# Convert DIRAC jobs to ARC jobs
493+
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
494+
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
495+
return self._cleanJob(jList)
496+
497+
def _cleanJob(self, arcJobList):
498+
"""Clean files related to the specified jobs
499+
500+
:param list arcJobList: list of ARC Job IDs
501+
"""
502+
result = self._checkSession()
503+
if not result["OK"]:
504+
self.log.error("Cannot clean jobs", result["Message"])
505+
return result
506+
507+
# List of jobs in json format for the REST query
508+
jobsJson = {"job": [{"id": job} for job in arcJobList]}
509+
510+
# Prepare the command
511+
params = {"action": "clean"}
512+
query = self._urlJoin("jobs")
513+
514+
# Cleaning jobs
515+
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
516+
if not result["OK"]:
517+
self.log.error("Failed to clean all these jobs.", result["Message"])
518+
return S_ERROR("Failed to clean all these jobs")
519+
520+
self.log.debug("Successfully cleaned jobs")
521+
return S_OK()
522+
478523
#############################################################################
479524

480525
def getCEStatus(self):
@@ -613,14 +658,10 @@ def getJobStatus(self, jobIDList):
613658
if not isinstance(jobIDList, list):
614659
jobIDList = [jobIDList]
615660

616-
# Jobs are stored with a DIRAC stamp (":::XXXXX") appended
617-
jobList = []
618-
for j in jobIDList:
619-
job = j.split(":::")[0]
620-
jobList.append(job)
621-
622-
self.log.debug("Getting status of jobs : %s" % jobList)
623-
arcJobsJson = {"job": [{"id": self._DiracToArcID(job)} for job in jobList]}
661+
self.log.debug("Getting status of jobs:", jobIDList)
662+
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
663+
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
664+
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}
624665

625666
# Prepare the command
626667
params = {"action": "status"}
@@ -688,12 +729,8 @@ def getJobLog(self, jobID):
688729
self.log.error("Cannot get job logging info", result["Message"])
689730
return result
690731

691-
# Extract stamp from the Job ID
692-
if ":::" in jobID:
693-
jobID = jobID.split(":::")[0]
694-
695732
# Prepare the command: Get output files
696-
arcJob = self._DiracToArcID(jobID)
733+
arcJob = self._DiracToArcID(jobID.split(":::")[0])
697734
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))
698735

699736
# Submit the GET request to retrieve outputs
@@ -759,9 +796,9 @@ def getJobOutput(self, jobID, workingDirectory=None):
759796
remoteOutputs = result["Value"]
760797
self.log.debug("Outputs to get are", remoteOutputs)
761798

762-
# We assume that workingDirectory exists
763799
if not workingDirectory:
764800
if "WorkingDirectory" in self.ceParameters:
801+
# We assume that workingDirectory exists
765802
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
766803
else:
767804
workingDirectory = job

0 commit comments

Comments
 (0)