Skip to content

Commit a965b05

Browse files
authored
Merge pull request #8183 from chrisburr/bulk-job-input
[9.0] Support bulk calls to JobMonitoring.getInputData
2 parents d47dd14 + 2648feb commit a965b05

File tree

4 files changed

+48
-13
lines changed

4 files changed

+48
-13
lines changed

src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,9 @@ def getJobsStates(self, jobIDs):
7979
if res["OK"]:
8080
res["Value"] = strToIntDict(res["Value"])
8181
return res
82+
83+
def getInputData(self, jobIDs):
84+
res = self._getRPC().getInputData(jobIDs)
85+
if res["OK"] and isinstance(res["Value"], dict):
86+
res["Value"] = strToIntDict(res["Value"])
87+
return res

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,26 @@
1111
* *CompressJDLs*: Enable compression of JDLs when they are stored in the database, default *False*.
1212
1313
"""
14+
from __future__ import annotations
15+
1416
import datetime
1517
import operator
18+
from typing import overload
1619

1720
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
1821
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSiteTier
1922
from DIRAC.Core.Base.DB import DB
2023
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2124
from DIRAC.Core.Utilities.Decorators import deprecated
2225
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
23-
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException
26+
from DIRAC.Core.Utilities.ReturnValues import (
27+
S_ERROR,
28+
S_OK,
29+
convertToReturnValue,
30+
returnValueOrRaise,
31+
SErrorException,
32+
DReturnType,
33+
)
2434
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
2535
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
2636
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
@@ -320,23 +330,42 @@ def getJobOptParameters(self, jobID, paramList=None):
320330

321331
#############################################################################
322332

323-
def getInputData(self, jobID):
333+
@overload
334+
def getInputData(self, jobID: int | str) -> DReturnType[list[str]]:
335+
...
336+
337+
@overload
338+
def getInputData(self, jobID: list[int | str]) -> DReturnType[dict[int, list[str]]]:
339+
...
340+
341+
def getInputData(self, jobID: int | str | list[int | str]) -> DReturnType[list[str] | dict[int, list[str]]]:
324342
"""Get input data for the given job"""
325-
ret = self._escapeString(jobID)
326-
if not ret["OK"]:
327-
return ret
328-
jobID = ret["Value"]
329-
cmd = f"SELECT LFN FROM InputData WHERE JobID={jobID}"
343+
if isinstance(jobID, (int, str)):
344+
ret = self._escapeString(jobID)
345+
if not ret["OK"]:
346+
return ret
347+
jobID = ret["Value"]
348+
query = f"JobID={jobID}"
349+
result = []
350+
else:
351+
job_ids = {int(i) for i in jobID}
352+
query = f"JobID IN ({','.join(map(str, job_ids))})"
353+
result = {i: [] for i in job_ids}
354+
cmd = f"SELECT JobID, LFN FROM InputData WHERE {query}"
330355
res = self._query(cmd)
331356
if not res["OK"]:
332357
return res
333358

334-
inputData = [i[0] for i in res["Value"] if i[0].strip()]
335-
for index, lfn in enumerate(inputData):
359+
for jid, lfn in res["Value"]:
360+
lfn = lfn.strip()
336361
if lfn.lower().startswith("lfn:"):
337-
inputData[index] = lfn[4:]
362+
lfn = lfn[4:]
363+
if isinstance(result, list):
364+
result.append(lfn)
365+
else:
366+
result[jid].append(lfn)
338367

339-
return S_OK(inputData)
368+
return S_OK(result)
340369

341370
#############################################################################
342371
def setInputData(self, jobID, inputData):

src/DIRAC/WorkloadManagementSystem/DB/tests/Test_JobDB.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def test_getInputData(jobDB: JobDB):
2828
"""Test the getInputData method from JobDB"""
2929
# Arrange
3030
jobDB._escapeString = MagicMock(return_value=S_OK())
31-
jobDB._query = MagicMock(return_value=S_OK((("/vo/user/lfn1",), ("LFN:/vo/user/lfn2",))))
31+
jobDB._query = MagicMock(return_value=S_OK([(1234, "/vo/user/lfn1"), (1234, "LFN:/vo/user/lfn2")]))
3232

3333
# Act
3434
res = jobDB.getInputData(1234)

src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ def export_getJobHeartBeatData(cls, jobID):
514514
return cls.jobDB.getHeartBeatData(jobID)
515515

516516
##############################################################################
517-
types_getInputData = [int]
517+
types_getInputData = [(int, list)]
518518

519519
@classmethod
520520
def export_getInputData(cls, jobID):

0 commit comments

Comments
 (0)