Skip to content

Commit 9391e01

Browse files
committed
refactor: extracted getJobParameters in utility, using it in agents
1 parent 33c01bc commit 9391e01

File tree

5 files changed

+78
-58
lines changed

5 files changed

+78
-58
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@
1212
import hashlib
1313
import json
1414
import os
15-
from pathlib import Path
1615
import random
1716
import shutil
1817
import sys
19-
from collections import defaultdict
2018
import time
19+
from collections import defaultdict
20+
from pathlib import Path
2121

22-
from diraccfg import CFG
23-
24-
from DIRAC import gConfig, S_OK, S_ERROR
22+
from DIRAC import S_ERROR, S_OK, gConfig
2523
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2624
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
2725
from DIRAC.Core.Utilities import DErrno
@@ -30,6 +28,7 @@
3028
from DIRAC.Core.Utilities.Version import getVersion
3129
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
3230
from DIRAC.Resources.Computing import ComputingElement
31+
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
3332
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus
3433
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3534
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
@@ -41,11 +40,11 @@
4140
resolveInputData,
4241
transferInputSandbox,
4342
)
44-
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
45-
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
46-
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
4743
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
4844
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
45+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
46+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
47+
4948

5049
MAX_JOBS_MANAGED = 100
5150

@@ -740,7 +739,7 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
740739
return S_OK()
741740

742741
# Get their parameters
743-
if not (result := self.jobMonitoring.getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]:
742+
if not (result := getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]:
744743
self.log.error("Failed to get the list of taskIDs", result["Message"])
745744
return result
746745

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2222
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
2323
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
24-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
2524
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2625
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2726
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2827
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
28+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
2929

3030

3131
class StalledJobAgent(AgentModule):
@@ -254,11 +254,11 @@ def _failStalledJobs(self, jobID):
254254

255255
def _getJobPilotStatus(self, jobID):
256256
"""Get the job pilot status."""
257-
result = JobMonitoringClient().getJobParameter(jobID, "Pilot_Reference")
257+
result = getJobParameters(jobID, "Pilot_Reference")
258258
if not result["OK"]:
259259
return result
260-
pilotReference = result["Value"].get("Pilot_Reference", "Unknown")
261-
if pilotReference == "Unknown":
260+
pilotReference = result["Value"].get("Pilot_Reference")
261+
if not pilotReference:
262262
# There is no pilot reference, hence its status is unknown
263263
return S_OK("NoPilot")
264264

@@ -389,7 +389,7 @@ def _sendAccounting(self, jobID):
389389
if lastHeartBeatTime is not None and lastHeartBeatTime > endTime:
390390
endTime = lastHeartBeatTime
391391

392-
result = JobMonitoringClient().getJobParameter(jobID, "CPUNormalizationFactor")
392+
result = getJobParameters(jobID, "CPUNormalizationFactor")
393393
if not result["OK"] or not result["Value"]:
394394
self.log.error(
395395
"Error getting Job Parameter CPUNormalizationFactor, setting 0",

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ def sja(mocker):
2525
)
2626
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobDB")
2727
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobLoggingDB")
28-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobMonitoringClient", return_value=MagicMock())
2928
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobManagerClient", return_value=MagicMock())
3029
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
30+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getJobParameters", return_value=MagicMock())
3131
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())
3232
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock())
3333

src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from DIRAC.Core.Utilities.JEncode import strToIntDict
1212
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1313
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
14+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
1415

1516

1617
class JobMonitoringHandlerMixin:
@@ -270,48 +271,8 @@ def export_getJobParameters(self, jobIDs, parName=None):
270271
if not isinstance(jobIDs, list):
271272
jobIDs = [jobIDs]
272273
jobIDs = [int(jobID) for jobID in jobIDs]
273-
if self.vo: # a user is connecting, with a proxy
274-
res = self.elasticJobParametersDB.getJobParameters(jobIDs, self.vo, parName)
275-
if not res["OK"]:
276-
return res
277-
parameters = res["Value"]
278-
else: # a service is connecting, no proxy, e.g. StalledJobAgent
279-
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
280-
res = self.jobDB._query(q)
281-
if not res["OK"]:
282-
return res
283-
if not res["Value"]:
284-
return S_OK({})
285-
# get the VO for each jobID
286-
voDict = {}
287-
for jobID, vo in res["Value"]:
288-
if vo not in voDict:
289-
voDict[vo] = []
290-
voDict[vo].append(jobID)
291-
# get the parameters for each VO
292-
parameters = {}
293-
for vo, jobIDs in voDict.items():
294-
res = self.elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
295-
if not res["OK"]:
296-
return res
297-
parameters.update(res["Value"])
298-
299-
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
300-
res = self.jobDB.getJobParameters(jobIDs, parName)
301-
if not res["OK"]:
302-
return res
303-
parametersM = res["Value"]
304-
305-
# and now combine
306-
final = dict(parametersM)
307-
# if job in JobDB, update with parameters from ES if any
308-
for jobID in final:
309-
final[jobID].update(parameters.get(jobID, {}))
310-
# if job in ES and not in JobDB, take ES
311-
for jobID in parameters:
312-
if jobID not in final:
313-
final[jobID] = parameters[jobID]
314-
return S_OK(final)
274+
275+
return getJobParameters(jobIDs, parName, self.vo or "")
315276

316277
##############################################################################
317278
types_getAtticJobParameters = [int]

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
"""
33
import multiprocessing
44

5-
from DIRAC import gConfig, gLogger
5+
from DIRAC import gConfig, gLogger, S_OK
66
from DIRAC.Core.Utilities.List import fromChar
7+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
8+
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
79

810

911
def getMemoryFromProc():
@@ -139,3 +141,61 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
139141
# 3) return 0
140142
gLogger.info("NumberOfGPUs could not be found in CS")
141143
return 0
144+
145+
146+
def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict:
147+
"""Utility to get a job parameter for a list of jobIDs pertaining to a VO.
148+
If the jobID is not in the JobParametersDB, it will be looked up in the JobDB.
149+
150+
Requires direct access to the JobParametersDB and JobDB.
151+
:param jobIDs: list of jobIDs
152+
:param parName: name of the parameter to be retrieved
153+
:param vo: VO of the jobIDs
154+
:return: dictionary with jobID as key and the parameter as value
155+
:rtype: dict
156+
"""
157+
elasticJobParametersDB = JobParametersDB()
158+
jobDB = JobDB()
159+
160+
if vo: # a user is connecting, with a proxy
161+
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
162+
if not res["OK"]:
163+
return res
164+
parameters = res["Value"]
165+
else: # a service is connecting, no proxy, e.g. StalledJobAgent
166+
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
167+
res = jobDB._query(q)
168+
if not res["OK"]:
169+
return res
170+
if not res["Value"]:
171+
return S_OK({})
172+
# get the VO for each jobID
173+
voDict = {}
174+
for jobID, vo in res["Value"]:
175+
if vo not in voDict:
176+
voDict[vo] = []
177+
voDict[vo].append(jobID)
178+
# get the parameters for each VO
179+
parameters = {}
180+
for vo, jobIDs in voDict.items():
181+
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
182+
if not res["OK"]:
183+
return res
184+
parameters.update(res["Value"])
185+
186+
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
187+
res = jobDB.getJobParameters(jobIDs, parName)
188+
if not res["OK"]:
189+
return res
190+
parametersM = res["Value"]
191+
192+
# and now combine
193+
final = dict(parametersM)
194+
# if job in JobDB, update with parameters from ES if any
195+
for jobID in final:
196+
final[jobID].update(parameters.get(jobID, {}))
197+
# if job in ES and not in JobDB, take ES
198+
for jobID in parameters:
199+
if jobID not in final:
200+
final[jobID] = parameters[jobID]
201+
return S_OK(final)

0 commit comments

Comments
 (0)