Skip to content

Commit 1d6220e

Browse files
authored
Merge pull request #5724 from DIRACGridBot/cherry-pick-2-cdd7b018f-integration
[sweep:integration] Fixes for setting the Jobs Status
2 parents 5a1c216 + cbf028a commit 1d6220e

File tree

10 files changed

+41
-42
lines changed

10 files changed

+41
-42
lines changed

src/DIRAC/Core/Utilities/Time.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ def toString(myDate=None):
160160
and day carries the sign.
161161
To keep internal consistency we are using:
162162
[hour]:[min]:[sec]:[microsec]
163-
where min, sec, microsec are alwys positive intergers and hour carries the
164-
sign.
163+
where min, sec, microsec are always positive integers and hour carries the sign.
165164
"""
166165
if isinstance(myDate, _dateTimeType):
167166
return str(myDate)

src/DIRAC/FrameworkSystem/Service/SystemAdministratorHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def export_updateSoftware(self, version):
309309
stderr=subprocess.PIPE,
310310
universal_newlines=True,
311311
check=False,
312-
timeout=300,
312+
timeout=600,
313313
)
314314
if r.returncode != 0:
315315
stderr = [x for x in r.stderr.split("\n") if not x.startswith("Extracting : ")]

src/DIRAC/Interfaces/API/Dirac.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
5757
from DIRAC.Resources.Storage.StorageElement import StorageElement
5858
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
59+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
5960
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
6061
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
6162
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
@@ -204,7 +205,11 @@ def retrieveRepositorySandboxes(self, requestedStates=None, destinationDirectory
204205
gLogger.warn("No repository is initialised")
205206
return S_OK()
206207
if requestedStates is None:
207-
requestedStates = ["Done", "Failed", "Completed"] # because users dont care about completed
208+
requestedStates = [
209+
JobStatus.DONE,
210+
JobStatus.FAILED,
211+
JobStatus.COMPLETED,
212+
] # because users dont care about completed
208213
jobs = self.jobRepo.readRepository()["Value"]
209214
for jobID in sorted(jobs):
210215
jobDict = jobs[jobID]

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def execute(self):
341341
par_name=thisp, par_value=gConfig.getValue("/LocalSite/%s" % thisp, "Unknown"), sendFlag=False
342342
)
343343

344-
jobReport.setJobStatus(status=JobStatus.MATCHED, minorStatus="Job Received by Agent", sendFlag=False)
344+
jobReport.setJobStatus(minorStatus="Job Received by Agent", sendFlag=False)
345345
result_setupProxy = self._setupProxy(ownerDN, jobGroup)
346346
if not result_setupProxy["OK"]:
347347
return self._rescheduleFailedJob(jobID, result_setupProxy["Message"], self.stopOnApplicationFailure)
@@ -526,7 +526,7 @@ def _checkInstallSoftware(self, jobID, jobParams, resourceParams, jobReport):
526526
self.log.verbose(msg)
527527
return S_OK(msg)
528528

529-
jobReport.setJobStatus(status=JobStatus.MATCHED, minorStatus="Installing Software", sendFlag=False)
529+
jobReport.setJobStatus(minorStatus="Installing Software", sendFlag=False)
530530
softwareDist = jobParams["SoftwareDistModule"]
531531
self.log.verbose("Found VO Software Distribution module", ": %s" % (softwareDist))
532532
argumentsDict = {"Job": jobParams, "CE": resourceParams}
@@ -579,7 +579,7 @@ def _submitJob(
579579
return result
580580

581581
wrapperFile = result["Value"]
582-
jobReport.setJobStatus(status=JobStatus.MATCHED, minorStatus="Submitting To CE")
582+
jobReport.setJobStatus(minorStatus="Submitting To CE")
583583

584584
self.log.info("Submitting JobWrapper", "%s to %sCE" % (os.path.basename(wrapperFile), self.ceName))
585585

src/DIRAC/WorkloadManagementSystem/Client/JobReport.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def setJob(self, jobID):
3636
self.jobID = jobID
3737

3838
def setJobStatus(self, status="", minorStatus="", applicationStatus="", sendFlag=True):
39-
"""Send job status information to the JobState service for jobID"""
39+
"""Accumulate and possibly send job status information to the JobState service"""
4040

4141
timeStamp = Time.toString()
4242
# add job status record

src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def __init__(self, state):
8585
FAILED: State(12, [RESCHEDULED, DELETED], defState=FAILED),
8686
DONE: State(11, [DELETED], defState=DONE),
8787
COMPLETED: State(10, [DONE, FAILED], defState=COMPLETED),
88-
COMPLETING: State(9, [DONE, FAILED, COMPLETED], defState=COMPLETING),
88+
COMPLETING: State(9, [DONE, FAILED, COMPLETED, STALLED, KILLED], defState=COMPLETING),
8989
STALLED: State(8, [RUNNING, FAILED, KILLED], defState=STALLED),
9090
RUNNING: State(7, [STALLED, DONE, FAILED, COMPLETING, KILLED, RECEIVED], defState=RUNNING),
9191
RESCHEDULED: State(6, [WAITING, RECEIVED, DELETED], defState=RESCHEDULED),

src/DIRAC/WorkloadManagementSystem/Client/Matcher.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
__RCSID__ = "$Id"
1010

1111
import time
12-
import re
1312

1413
from DIRAC import gLogger, convertToPy3VersionNumber
1514

@@ -18,9 +17,9 @@
1817
from DIRAC.Core.Security import Properties
1918
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
2019
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
20+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
2121
from DIRAC.WorkloadManagementSystem.Client.Limiter import Limiter
2222
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
23-
2423
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, singleValueDefFields, multiValueMatchFields
2524
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
2625
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
@@ -266,7 +265,7 @@ def _reportStatus(self, resourceDict, jobID):
266265
else:
267266
self.log.verbose("Set job attributes for jobID", jobID)
268267

269-
result = self.jlDB.addLoggingRecord(jobID, status="Matched", minorStatus="Assigned", source="Matcher")
268+
result = self.jlDB.addLoggingRecord(jobID, status=JobStatus.MATCHED, minorStatus="Assigned", source="Matcher")
270269
if not result["OK"]:
271270
self.log.error(
272271
"Problem reporting job status", "addLoggingRecord, jobID = %s: %s" % (jobID, result["Message"])

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2015,7 +2015,7 @@ def setJobCommand(self, jobID, command, arguments=None):
20152015
return self._update(req)
20162016

20172017
#####################################################################################
2018-
def getJobCommand(self, jobID, status="Received"):
2018+
def getJobCommand(self, jobID, status=JobStatus.RECEIVED):
20192019
"""Get a command to be passed to the job together with the
20202020
next heart beat
20212021
"""

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def __init__(self, jobID=None, jobReport=None):
194194
#############################################################################
195195
def initialize(self, arguments):
196196
"""Initializes parameters and environment for job."""
197-
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.JOB_INITIALIZATION)
197+
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.JOB_INITIALIZATION, sendFlag=True)
198198
self.log.info("Starting Job Wrapper Initialization for Job", self.jobID)
199199
self.jobArgs = arguments["Job"]
200200
self.log.verbose(self.jobArgs)
@@ -371,8 +371,8 @@ def execute(self):
371371
self.log.verbose("%s = %s" % (nameEnv, valEnv))
372372

373373
if os.path.exists(executable):
374-
# it's in fact not yet running: it will be in few lines
375-
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.APPLICATION, sendFlag=True)
374+
# the actual executable is not yet running: it will be in few lines
375+
self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True)
376376
spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit))
377377
command = executable
378378
if jobArguments:
@@ -482,9 +482,7 @@ def execute(self):
482482
self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_ERRORS, sendFlag=True)
483483
if status in (DErrno.EWMSRESC, DErrno.EWMSRESC & 255): # the status will be truncated to 0xDE (222)
484484
self.log.verbose("job will be rescheduled")
485-
self.__report(
486-
status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.GOING_RESCHEDULE, sendFlag=True
487-
)
485+
self.__report(minorStatus=JobMinorStatus.GOING_RESCHEDULE, sendFlag=True)
488486
return S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled")
489487

490488
else:
@@ -564,7 +562,9 @@ def __getCPUHMS(self, cpuTime):
564562
#############################################################################
565563
def resolveInputData(self):
566564
"""Input data is resolved here using a VO specific plugin module."""
567-
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, sendFlag=True)
565+
self.__report(
566+
minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, sendFlag=True
567+
) # if we are here, the status should be "Running"
568568

569569
# What is this input data? - and exit if there's no input
570570
inputData = self.jobArgs["InputData"]
@@ -899,7 +899,7 @@ def __resolveOutputSandboxFiles(self, outputSandbox):
899899
def __transferOutputDataFiles(self, outputData, outputSE, outputPath):
900900
"""Performs the upload and registration in the File Catalog(s)"""
901901
self.log.verbose("Uploading output data files")
902-
self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.UPLOADING_OUTPUT_DATA)
902+
self.__report(minorStatus=JobMinorStatus.UPLOADING_OUTPUT_DATA) # the major status should be "Completing"
903903
self.log.info("Output data files %s to be uploaded to %s SE" % (", ".join(outputData), outputSE))
904904
missing = []
905905
uploaded = []
@@ -1091,7 +1091,7 @@ def transferInputSandbox(self, inputSandbox):
10911091
sandboxFiles = []
10921092
registeredISB = []
10931093
lfns = []
1094-
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
1094+
self.__report(minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) # Should be in "Running" status
10951095
if not isinstance(inputSandbox, (list, tuple)):
10961096
inputSandbox = [inputSandbox]
10971097
for isb in inputSandbox:
@@ -1118,21 +1118,19 @@ def transferInputSandbox(self, inputSandbox):
11181118
self.log.info("Downloading Input SandBox %s" % isb)
11191119
result = SandboxStoreClient().downloadSandbox(isb)
11201120
if not result["OK"]:
1121-
self.__report(
1122-
status=JobStatus.RUNNING, minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX
1123-
)
1121+
self.__report(minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX)
11241122
return S_ERROR("Cannot download Input sandbox %s: %s" % (isb, result["Message"]))
11251123
else:
11261124
self.inputSandboxSize += result["Value"]
11271125

11281126
if lfns:
11291127
self.log.info("Downloading Input SandBox LFNs, number of files to get", len(lfns))
1130-
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX_LFN)
1128+
self.__report(minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX_LFN)
11311129
lfns = [fname.replace("LFN:", "").replace("lfn:", "") for fname in lfns]
11321130
download = self.dm.getFile(lfns)
11331131
if not download["OK"]:
11341132
self.log.warn(download)
1135-
self.__report(status=JobStatus.RUNNING, minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX_LFN)
1133+
self.__report(minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX_LFN)
11361134
return S_ERROR(download["Message"])
11371135
failed = download["Value"]["Failed"]
11381136
if failed:
@@ -1303,16 +1301,6 @@ def sendFailoverRequest(self):
13031301
request.JobID = self.jobID
13041302
request.SourceComponent = "Job_%s" % self.jobID
13051303

1306-
# JobReport part first
1307-
result = self.jobReport.generateForwardDISET()
1308-
if result["OK"]:
1309-
if isinstance(result["Value"], Operation):
1310-
self.log.info("Adding a job state update DISET operation to the request")
1311-
request.addOperation(result["Value"])
1312-
else:
1313-
self.log.warn("JobReportFailure", "Could not generate a forwardDISET operation: %s" % result["Message"])
1314-
self.log.warn("JobReportFailure", "The job won't fail, but the jobLogging info might be incomplete")
1315-
13161304
# Failover transfer requests
13171305
for storedOperation in self.failoverTransfer.request:
13181306
request.addOperation(storedOperation)
@@ -1325,6 +1313,16 @@ def sendFailoverRequest(self):
13251313
for storedOperation in requestStored:
13261314
request.addOperation(storedOperation)
13271315

1316+
# JobReport part
1317+
result = self.jobReport.generateForwardDISET()
1318+
if result["OK"]:
1319+
if isinstance(result["Value"], Operation):
1320+
self.log.info("Adding a job state update DISET operation to the request")
1321+
request.addOperation(result["Value"])
1322+
else:
1323+
self.log.warn("JobReportFailure", "Could not generate a forwardDISET operation: %s" % result["Message"])
1324+
self.log.warn("JobReportFailure", "The job won't fail, but the jobLogging info might be incomplete")
1325+
13281326
if len(request):
13291327
# The request is ready, send it now
13301328
isValid = RequestValidator().validate(request)

src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,12 @@ def initializeHandler(cls, svcInfoDict):
5757
@classmethod
5858
def export_updateJobFromStager(cls, jobID, status):
5959
"""Simple call back method to be used by the stager."""
60-
if status == JobStatus.DONE:
60+
if status == "Done":
6161
jobStatus = JobStatus.CHECKING
6262
minorStatus = "JobScheduling"
63-
elif status == JobStatus.FAILED:
64-
jobStatus = JobStatus.FAILED
65-
minorStatus = "Staging input files failed"
6663
else:
67-
return S_ERROR("updateJobFromStager: %s status not known." % status)
64+
jobStatus = None
65+
minorStatus = "Staging input files failed"
6866

6967
infoStr = None
7068
trials = 10

0 commit comments

Comments
 (0)