Skip to content

Commit 5be5228

Browse files
authored
Merge pull request #8192 from fstagni/90_wms_fixes
[9.0] StalledJobAgent improvements
2 parents 7fe25eb + 1cf4240 commit 5be5228

File tree

8 files changed

+128
-75
lines changed

8 files changed

+128
-75
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@
1212
import hashlib
1313
import json
1414
import os
15-
from pathlib import Path
1615
import random
1716
import shutil
1817
import sys
19-
from collections import defaultdict
2018
import time
19+
from collections import defaultdict
20+
from pathlib import Path
2121

22-
from diraccfg import CFG
23-
24-
from DIRAC import gConfig, S_OK, S_ERROR
22+
from DIRAC import S_ERROR, S_OK, gConfig
2523
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2624
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
2725
from DIRAC.Core.Utilities import DErrno
@@ -30,6 +28,7 @@
3028
from DIRAC.Core.Utilities.Version import getVersion
3129
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
3230
from DIRAC.Resources.Computing import ComputingElement
31+
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
3332
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus
3433
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3534
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
@@ -41,11 +40,10 @@
4140
resolveInputData,
4241
transferInputSandbox,
4342
)
44-
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
45-
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
46-
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
4743
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
44+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4845
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
46+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
4947

5048
MAX_JOBS_MANAGED = 100
5149

@@ -740,7 +738,7 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
740738
return S_OK()
741739

742740
# Get their parameters
743-
if not (result := self.jobMonitoring.getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]:
741+
if not (result := getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]:
744742
self.log.error("Failed to get the list of taskIDs", result["Message"])
745743
return result
746744

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2121
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2222
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
23-
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
24-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
2523
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2624
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2725
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2826
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
27+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
28+
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
2929

3030

3131
class StalledJobAgent(AgentModule):
@@ -254,11 +254,11 @@ def _failStalledJobs(self, jobID):
254254

255255
def _getJobPilotStatus(self, jobID):
256256
"""Get the job pilot status."""
257-
result = JobMonitoringClient().getJobParameter(jobID, "Pilot_Reference")
257+
result = getJobParameters(jobID, "Pilot_Reference")
258258
if not result["OK"]:
259259
return result
260-
pilotReference = result["Value"].get("Pilot_Reference", "Unknown")
261-
if pilotReference == "Unknown":
260+
pilotReference = result["Value"].get("Pilot_Reference")
261+
if not pilotReference:
262262
# There is no pilot reference, hence its status is unknown
263263
return S_OK("NoPilot")
264264

@@ -389,7 +389,7 @@ def _sendAccounting(self, jobID):
389389
if lastHeartBeatTime is not None and lastHeartBeatTime > endTime:
390390
endTime = lastHeartBeatTime
391391

392-
result = JobMonitoringClient().getJobParameter(jobID, "CPUNormalizationFactor")
392+
result = getJobParameters(jobID, "CPUNormalizationFactor")
393393
if not result["OK"] or not result["Value"]:
394394
self.log.error(
395395
"Error getting Job Parameter CPUNormalizationFactor, setting 0",
@@ -518,8 +518,7 @@ def _checkLoggingInfo(self, jobID, jobDict):
518518
return startTime, endTime
519519

520520
def _kickStuckJobs(self):
521-
"""Reschedule jobs stuck in initialization status Rescheduled,
522-
Matched."""
521+
"""Reschedule jobs stuck in initialization status Rescheduled, Matched."""
523522

524523
message = ""
525524

@@ -530,17 +529,12 @@ def _kickStuckJobs(self):
530529
return result
531530

532531
jobIDs = result["Value"]
533-
jobManagerClient = JobManagerClient()
534532
if jobIDs:
535533
self.log.info(f"Rescheduling {len(jobIDs)} jobs stuck in {JobStatus.MATCHED} status")
536-
result = jobManagerClient.rescheduleJob(jobIDs)
534+
result = rescheduleJobs(jobIDs)
537535
if not result["OK"]:
538536
message = f"Failed to reschedule jobs stuck in {JobStatus.MATCHED} status"
539537
message += "\n" + result["Message"]
540-
if "InvalidJobIDs" in result:
541-
message += "\n" + "\tInvalid job IDs: " + str(result["InvalidJobIDs"])
542-
if "NonauthorizedJobIDs" in result:
543-
message += "\n" + "\tNon authorized job IDs: " + str(result["NonauthorizedJobIDs"])
544538

545539
checkTime = datetime.datetime.utcnow() - self.rescheduledTime * second
546540
result = self.jobDB.selectJobs({"Status": JobStatus.RESCHEDULED}, older=checkTime)
@@ -550,18 +544,14 @@ def _kickStuckJobs(self):
550544

551545
jobIDs = result["Value"]
552546
if jobIDs:
553-
self.log.info(f"Rescheduling {len(jobIDs)} jobs stuck in Rescheduled status")
554-
result = jobManagerClient.rescheduleJob(jobIDs)
547+
self.log.info(f"Rescheduling {len(jobIDs)} jobs stuck in {JobStatus.RESCHEDULED} status")
548+
result = rescheduleJobs(jobIDs)
555549
if not result["OK"]:
556550
message = f"Failed to reschedule jobs stuck in {JobStatus.RESCHEDULED} status"
557551
message += "\n" + result["Message"]
558-
if "InvalidJobIDs" in result:
559-
message += "\n" + "\tInvalid job IDs: " + str(result["InvalidJobIDs"])
560-
if "NonauthorizedJobIDs" in result:
561-
message += "\n" + "\tNon authorized job IDs: " + str(result["NonauthorizedJobIDs"])
562552

563553
if message:
564-
return S_ERROR(message)
554+
self.log.error(message)
565555
return S_OK()
566556

567557
def _failSubmittingJobs(self):

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ def sja(mocker):
2525
)
2626
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobDB")
2727
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobLoggingDB")
28-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobMonitoringClient", return_value=MagicMock())
29-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobManagerClient", return_value=MagicMock())
28+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.rescheduleJobs", return_value=MagicMock())
3029
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
30+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getJobParameters", return_value=MagicMock())
3131
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())
3232
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock())
3333

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,10 @@ def getJobsForPilot(self, pilotID):
447447
"""Get IDs of Jobs that were executed by a pilot"""
448448
cmd = "SELECT pilotID,JobID FROM JobToPilotMapping "
449449
if isinstance(pilotID, list):
450-
cmd = cmd + " WHERE pilotID IN (%s)" % ",".join(["%s" % x for x in pilotID])
450+
pilotIDs_string = ",".join(str(int(x)) for x in pilotID)
451+
cmd = f"{cmd} WHERE pilotID IN ({pilotIDs_string})"
451452
else:
452-
cmd = cmd + f" WHERE pilotID = {pilotID}"
453+
cmd = f"{cmd} WHERE pilotID = {pilotID}"
453454

454455
result = self._query(cmd)
455456
if not result["OK"]:

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
)
3535
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3636
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
37+
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
3738

3839
MAX_PARAMETRIC_JOBS = 20
3940

@@ -345,8 +346,7 @@ def __getJobList(jobInput):
345346
types_rescheduleJob = []
346347

347348
def export_rescheduleJob(self, jobIDs):
348-
"""Reschedule a single job. If the optional proxy parameter is given
349-
it will be used to refresh the proxy in the Proxy Repository
349+
"""Reschedule a list of jobs.
350350
351351
:param list jobIDs: list of job IDs
352352
@@ -360,22 +360,12 @@ def export_rescheduleJob(self, jobIDs):
360360
validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(
361361
jobList, RIGHT_RESCHEDULE
362362
)
363-
for jobID in validJobList:
364-
self.taskQueueDB.deleteJob(jobID)
365-
result = self.jobDB.rescheduleJob(jobID)
366-
self.log.debug(str(result))
367-
if not result["OK"]:
368-
return result
369-
self.jobLoggingDB.addLoggingRecord(
370-
result["JobID"],
371-
status=result["Status"],
372-
minorStatus=result["MinorStatus"],
373-
applicationStatus="Unknown",
374-
source="JobManager",
375-
)
363+
res = rescheduleJobs(validJobList, source="JobManager")
364+
if not res["OK"]:
365+
self.log.error(res["Message"])
376366

377367
if invalidJobList or nonauthJobList:
378-
result = S_ERROR("Some jobs failed reschedule")
368+
result = S_ERROR("Some jobs can not be rescheduled")
379369
if invalidJobList:
380370
result["InvalidJobIDs"] = invalidJobList
381371
if nonauthJobList:

src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from DIRAC.Core.Utilities.JEncode import strToIntDict
1212
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1313
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
14+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
1415

1516

1617
class JobMonitoringHandlerMixin:
@@ -270,27 +271,8 @@ def export_getJobParameters(self, jobIDs, parName=None):
270271
if not isinstance(jobIDs, list):
271272
jobIDs = [jobIDs]
272273
jobIDs = [int(jobID) for jobID in jobIDs]
273-
res = self.elasticJobParametersDB.getJobParameters(jobIDs, self.vo, parName)
274-
if not res["OK"]:
275-
return res
276-
parameters = res["Value"]
277274

278-
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
279-
res = self.jobDB.getJobParameters(jobIDs, parName)
280-
if not res["OK"]:
281-
return res
282-
parametersM = res["Value"]
283-
284-
# and now combine
285-
final = dict(parametersM)
286-
# if job in JobDB, update with parameters from ES if any
287-
for jobID in final:
288-
final[jobID].update(parameters.get(jobID, {}))
289-
# if job in ES and not in JobDB, take ES
290-
for jobID in parameters:
291-
if jobID not in final:
292-
final[jobID] = parameters[jobID]
293-
return S_OK(final)
275+
return getJobParameters(jobIDs, parName, self.vo or "")
294276

295277
##############################################################################
296278
types_getAtticJobParameters = [int]

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
"""
33
import multiprocessing
44

5-
from DIRAC import gConfig, gLogger
5+
from DIRAC import gConfig, gLogger, S_OK
66
from DIRAC.Core.Utilities.List import fromChar
7+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
8+
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
79

810

911
def getMemoryFromProc():
@@ -139,3 +141,62 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
139141
# 3) return 0
140142
gLogger.info("NumberOfGPUs could not be found in CS")
141143
return 0
144+
145+
146+
def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict:
147+
"""Utility to get a job parameter for a list of jobIDs pertaining to a VO.
148+
If the jobID is not in the JobParametersDB, it will be looked up in the JobDB.
149+
150+
Requires direct access to the JobParametersDB and JobDB.
151+
152+
:param jobIDs: list of jobIDs
153+
:param parName: name of the parameter to be retrieved
154+
:param vo: VO of the jobIDs
155+
:return: dictionary with jobID as key and the parameter as value
156+
:rtype: dict
157+
"""
158+
elasticJobParametersDB = JobParametersDB()
159+
jobDB = JobDB()
160+
161+
if vo: # a user is connecting, with a proxy
162+
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
163+
if not res["OK"]:
164+
return res
165+
parameters = res["Value"]
166+
else: # a service is connecting, no proxy, e.g. StalledJobAgent
167+
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
168+
res = jobDB._query(q)
169+
if not res["OK"]:
170+
return res
171+
if not res["Value"]:
172+
return S_OK({})
173+
# get the VO for each jobID
174+
voDict = {}
175+
for jobID, vo in res["Value"]:
176+
if vo not in voDict:
177+
voDict[vo] = []
178+
voDict[vo].append(jobID)
179+
# get the parameters for each VO
180+
parameters = {}
181+
for vo, jobIDs in voDict.items():
182+
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
183+
if not res["OK"]:
184+
return res
185+
parameters.update(res["Value"])
186+
187+
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
188+
res = jobDB.getJobParameters(jobIDs, parName)
189+
if not res["OK"]:
190+
return res
191+
parametersM = res["Value"]
192+
193+
# and now combine
194+
final = dict(parametersM)
195+
# if job in JobDB, update with parameters from ES if any
196+
for jobID in final:
197+
final[jobID].update(parameters.get(jobID, {}))
198+
# if job in ES and not in JobDB, take ES
199+
for jobID in parameters:
200+
if jobID not in final:
201+
final[jobID] = parameters[jobID]
202+
return S_OK(final)

src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
from DIRAC.Core.Utilities import TimeUtilities
1010
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1111
from DIRAC.WorkloadManagementSystem.Client import JobStatus
12-
13-
if TYPE_CHECKING:
14-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
15-
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
12+
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
13+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
14+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
1615

1716

1817
class JobStatusUtility:
@@ -243,3 +242,35 @@ def getNewStatus(
243242
minor = sDict.get("MinorStatus", minor)
244243
application = sDict.get("ApplicationStatus", application)
245244
return S_OK((status, minor, application))
245+
246+
247+
def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
248+
"""Utility to reschedule jobs (not atomic, nor bulk)
249+
Requires direct access to the JobDB and TaskQueueDB
250+
251+
:param jobIDs: list of jobIDs
252+
:param source: source of the reschedule
253+
:return: S_OK/S_ERROR
254+
:rtype: dict
255+
256+
"""
257+
258+
failedJobs = []
259+
260+
for jobID in jobIDs:
261+
result = JobDB().rescheduleJob(jobID)
262+
if not result["OK"]:
263+
failedJobs.append(jobID)
264+
continue
265+
TaskQueueDB().deleteJob(jobID)
266+
JobLoggingDB().addLoggingRecord(
267+
result["JobID"],
268+
status=result["Status"],
269+
minorStatus=result["MinorStatus"],
270+
applicationStatus="Unknown",
271+
source=source,
272+
)
273+
274+
if failedJobs:
275+
return S_ERROR(f"Failed to reschedule jobs {failedJobs}")
276+
return S_OK()

0 commit comments

Comments
 (0)