
Commit e7062de

fix: management of the submission status in JobAgent
1 parent 7c0096d commit e7062de

10 files changed: +453 -327 lines

src/DIRAC/Resources/Computing/InProcessComputingElement.py

Lines changed: 22 additions & 24 deletions
@@ -14,7 +14,6 @@


 class InProcessComputingElement(ComputingElement):
-    #############################################################################
     def __init__(self, ceUniqueID):
         """Standard constructor."""
         super().__init__(ceUniqueID)
@@ -25,14 +24,14 @@ def __init__(self, ceUniqueID):
         self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
         self.ceParameters["MaxTotalJobs"] = 1

-    #############################################################################
     def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         """Method to submit job (overriding base method).

         :param str executableFile: file to execute via systemCall.
                                    Normally the JobWrapperTemplate when invoked by the JobAgent.
         :param str proxy: the proxy used for running the job (the payload). It will be dumped to a file.
         :param list inputs: dependencies of executableFile
+        :return: S_OK(payload exit code) / S_ERROR() if submission issue
         """
         payloadEnv = dict(os.environ)
         payloadProxy = ""
@@ -79,34 +78,33 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
             for inputFile in inputs:
                 os.unlink(inputFile)

-        ret = S_OK()
-
+        # Submission issue
         if not result["OK"]:
             self.log.error("Fail to run InProcess", result["Message"])
-        elif result["Value"][0] > 128:
-            res = S_ERROR()
-            # negative exit values are returned as 256 - exit
-            res["Value"] = result["Value"][0] - 256  # yes, it's "correct"
-            self.log.warn("InProcess Job Execution Failed")
-            self.log.info("Exit status:", result["Value"])
-            if res["Value"] == -2:
-                error = "JobWrapper initialization error"
-            elif res["Value"] == -1:
-                error = "JobWrapper execution error"
+            return S_ERROR(f"Failed to run InProcess: {result['Message']}")
+
+        retCode = result["Value"][0]
+        # Submission issue
+        if retCode > 128:
+            # Negative exit values are returned as 256 - exit
+            retCodeSubmission = retCode - 256  # yes, it's "correct"
+            self.log.warn("Job Execution Failed")
+            self.log.info("Exit status:", retCode)
+            if retCodeSubmission == -2:
+                errorMessage = "JobWrapper initialization error"
+            elif retCodeSubmission == -1:
+                errorMessage = "JobWrapper execution error"
             else:
-                error = "InProcess Job Execution Failed"
-            res["Message"] = error
-            return res
-        elif result["Value"][0] > 0:
+                errorMessage = "Job Execution Failed"
+            return S_ERROR(errorMessage)
+
+        # Submission ok but payload failed
+        if retCode:
             self.log.warn("Fail in payload execution")
-            self.log.info("Exit status:", result["Value"][0])
-            ret["PayloadFailed"] = result["Value"][0]
-        else:
-            self.log.debug("InProcess CE result OK")

-        return ret
+        self.log.info("Exit status:", retCode)
+        return S_OK(retCode)

-    #############################################################################
     def getCEStatus(self):
         """Method to return information on running and waiting jobs,
         as well as number of available processors
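
Note on the exit-code handling above: POSIX exit statuses are reported modulo 256, so a JobWrapper that exits with a negative value (e.g. -2 for an initialization error, per the convention in the diff) comes back from the system call as a value above 128, and subtracting 256 recovers the intended negative code. A standalone sketch (not part of this commit, assuming python3 on a POSIX system):

import subprocess

# A payload that exits with -2, the value the code above maps to
# "JobWrapper initialization error".
proc = subprocess.run(["python3", "-c", "import sys; sys.exit(-2)"])
retCode = proc.returncode                                  # reported as 254: exit statuses are 8-bit
retCodeSubmission = retCode - 256 if retCode > 128 else retCode
print(retCode, retCodeSubmission)                          # 254 -2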

src/DIRAC/Resources/Computing/PoolComputingElement.py

Lines changed: 18 additions & 8 deletions
@@ -19,6 +19,7 @@

 **Code Documentation**
 """
+import functools
 import os
 import concurrent.futures

@@ -85,6 +86,9 @@ def _reset(self):

         self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
         self.ceParameters["MaxTotalJobs"] = self.processors
+        # Indicates that the submission is done asynchronously
+        # The result is not immediately available
+        self.ceParameters["AsyncSubmission"] = True
         self.innerCESubmissionType = self.ceParameters.get("InnerCESubmissionType", self.innerCESubmissionType)
         return S_OK()

@@ -107,15 +111,18 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         :param str proxy: payload proxy
         :param list inputs: dependencies of executableFile

-        :return: S_OK/S_ERROR of the result of the job submission
+        :return: S_OK always. The result of the submission should be included in taskResults
         """

         if self.pPool is None:
             self.pPool = concurrent.futures.ProcessPoolExecutor(max_workers=self.processors)

         processorsForJob = self._getProcessorsForJobs(kwargs)
         if not processorsForJob:
-            return S_ERROR("Not enough processors for the job")
+            self.taskResults[self.taskID] = S_ERROR("Not enough processors for the job")
+            taskID = self.taskID
+            self.taskID += 1
+            return S_OK(taskID)

         # Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
         cd = ConfigurationData(loadDefaultCFG=False)
@@ -141,12 +148,15 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         if "USER" in os.environ:
             taskKwargs["PayloadUser"] = os.environ["USER"] + f"p{str(nUser).zfill(2)}"

+        # Submission
         future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
         self.processorsPerTask[future] = processorsForJob
+        future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))
+
+        taskID = self.taskID
         self.taskID += 1
-        future.add_done_callback(self.finalizeJob)

-        return S_OK()  # returning S_OK as callback will do the rest
+        return S_OK(taskID)  # returning S_OK as callback will do the rest

     def _getProcessorsForJobs(self, kwargs):
         """helper function"""
@@ -187,7 +197,7 @@ def _getProcessorsForJobs(self, kwargs):

         return requestedProcessors

-    def finalizeJob(self, future):
+    def finalizeJob(self, taskID, future):
         """Finalize the job by updating the process utilisation counters

         :param future: evaluating the future result
@@ -196,10 +206,10 @@

         result = future.result()  # This would be the result of the e.g. InProcess.submitJob()
         if result["OK"]:
-            self.log.info("Task %s finished successfully, %d processor(s) freed" % (future, nProc))
+            self.log.info("Task finished successfully:", f"{taskID}; {nProc} processor(s) freed")
         else:
-            self.log.error("Task failed submission", f"{future}, message: {result['Message']}")
-            self.taskResults[future] = result
+            self.log.error("Task failed submission:", f"{taskID}; message: {result['Message']}")
+            self.taskResults[taskID] = result

     def getCEStatus(self):
         """Method to return information on running and waiting jobs,

src/DIRAC/Resources/Computing/SingularityComputingElement.py

Lines changed: 9 additions & 25 deletions
@@ -218,17 +218,13 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
             os.mkdir(self.__workdir)
         except OSError:
             if not os.path.isdir(self.__workdir):
-                result = S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
-                result["ReschedulePayload"] = True
-                return result
+                return S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
             # Otherwise, directory probably just already exists...
         baseDir = None
         try:
             baseDir = tempfile.mkdtemp(prefix=f"job{jobDesc.get('jobID', 0)}_", dir=self.__workdir)
         except OSError:
-            result = S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")

         self.log.debug(f"Use singularity workarea: {baseDir}")
         for subdir in ["home", "tmp", "var_tmp"]:
@@ -259,7 +255,6 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
             extraOptions="" if self.__installDIRACInContainer else "/tmp/pilot.cfg",
         )
         if not result["OK"]:
-            result["ReschedulePayload"] = True
             return result
         wrapperPath = result["Value"]

@@ -334,30 +329,23 @@ def __checkResult(tmpDir):
                 retCode = int(fp.read())
         except (OSError, ValueError):
             # Something failed while trying to get the return code
-            result = S_ERROR("Failed to get return code from inner wrapper")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to get return code from inner wrapper")

-        result = S_OK()
-        if retCode:
-            # This is the one case where we don't reschedule:
-            # An actual failure of the inner payload for some reason
-            result = S_ERROR("Command failed with exit code %d" % retCode)
-        return result
+        return S_OK(retCode)

     def submitJob(self, executableFile, proxy=None, **kwargs):
         """Start a container for a job.
         executableFile is ignored. A new wrapper suitable for running in a
         container is created from jobDesc.
+
+        :return: S_OK(payload exit code) / S_ERROR() if submission issue
         """
         rootImage = self.__root

         # Check that singularity is available
         if not self.__hasSingularity():
             self.log.error("Singularity is not installed on PATH.")
-            result = S_ERROR("Failed to find singularity ")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to find singularity")

         self.log.info("Creating singularity container")

@@ -448,9 +436,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         else:
             # if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
             self.log.error("Singularity image to exec not found: ", rootImage)
-            result = S_ERROR("Failed to find singularity image to exec")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to find singularity image to exec")

         self.log.debug(f"Execute singularity command: {cmd}")
         self.log.debug(f"Execute singularity env: {self.__getEnv()}")
@@ -463,9 +449,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
             if proxy and renewTask:
                 gThreadScheduler.removeTask(renewTask)
             self.__deleteWorkArea(baseDir)
-            result = S_ERROR("Error running singularity command")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Error running singularity command")

         result = self.__checkResult(tmpDir)
         if proxy and renewTask:
src/DIRAC/Resources/Computing/SudoComputingElement.py

Lines changed: 23 additions & 12 deletions
@@ -32,6 +32,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         :param str executableFile: file to execute via systemCall.
                                    Normally the JobWrapperTemplate when invoked by the JobAgent.
         :param str proxy: the proxy used for running the job (the payload). It will be dumped to a file.
+        :return: S_OK(payload exit code) / S_ERROR() if submission issue
         """
         payloadProxy = ""
         if proxy:
@@ -152,20 +153,30 @@ def sudoExecute(self, executableFile, payloadProxy, payloadUsername, payloadUID,
         result = shellCall(0, cmd, callbackFunction=self.sendOutput)
         self.runningJobs -= 1
         if not result["OK"]:
-            result["Value"] = (0, "", "")
-            return result
+            self.log.error("Fail to run Sudo", result["Message"])
+            return S_ERROR(f"Failed to run Sudo: {result['Message']}")
+
+        retCode = result["Value"][0]
+        # Submission issue
+        if retCode > 128:
+            # Negative exit values are returned as 256 - exit
+            retCodeSubmission = retCode - 256  # yes, it's "correct"
+            self.log.warn("Job Execution Failed")
+            self.log.info("Exit status:", retCode)
+            if retCodeSubmission == -2:
+                errorMessage = "JobWrapper initialization error"
+            elif retCodeSubmission == -1:
+                errorMessage = "JobWrapper execution error"
+            else:
+                errorMessage = "Job Execution Failed"
+            return S_ERROR(errorMessage)

-        resultTuple = result["Value"]
-        status = resultTuple[0]
-        stdOutput = resultTuple[1]
-        stdError = resultTuple[2]
-        self.log.info(f"Status after the sudo execution is {str(status)}")
-        if status > 128:
-            error = S_ERROR(status)
-            error["Value"] = (status, stdOutput, stdError)
-            return error
+        # Submission ok but payload failed
+        if retCode:
+            self.log.warn("Fail in payload execution")

-        return result
+        self.log.info("Exit status:", retCode)
+        return S_OK(retCode)

     #############################################################################
     def getCEStatus(self):
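
Across the computing elements touched above, the commit converges on one convention: a problem with the submission itself is reported as S_ERROR, while a payload that ran but failed is reported as S_OK carrying the non-zero exit code (for the PoolComputingElement the same outcome is stored in taskResults under the task ID). A standalone sketch of how a caller such as the JobAgent might tell the two cases apart; plain dicts stand in for S_OK/S_ERROR and interpret is a hypothetical helper, not a DIRAC API:

def interpret(result):
    """Classify a submitJob() result under the new convention."""
    if not result["OK"]:
        return "submission failed: " + result["Message"]
    if result["Value"]:
        return f"payload failed with exit code {result['Value']}"
    return "payload finished successfully"


print(interpret({"OK": False, "Message": "Failed to find singularity"}))
print(interpret({"OK": True, "Value": 2}))
print(interpret({"OK": True, "Value": 0}))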
