Skip to content

Commit 1228f15

Browse files
authored
Merge pull request #6973 from simon-mazenoux/feat-validating-jdl-using-pydantic
[8.1] Validating the JDL format using pydantic
2 parents 1b36402 + 7e81a06 commit 1228f15

File tree

14 files changed

+824
-143
lines changed

14 files changed

+824
-143
lines changed

environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencies:
3030
- psutil >=4.2.0
3131
- pyasn1 >0.4.1
3232
- pyasn1-modules
33+
- pydantic
3334
- python-json-logger >=0.1.8
3435
- pytz >=2015.7
3536
- pyyaml

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ ignored-modules = ["MySQLdb", "numpy"]
6969
# avoid hangs.
7070
jobs = 0
7171

72+
# Prevent pylint from reporting the following false positive:
73+
# No name 'BaseModel' in module 'pydantic' (no-name-in-module)
74+
# Related issue: https://github.com/pydantic/pydantic/issues/1961
75+
extension-pkg-whitelist = ["pydantic"]
76+
7277
[tool.pylint.typecheck]
7378
# List of decorators that change the signature of a decorated function.
7479
signature-mutators = []

src/DIRAC/Core/Utilities/ClassAd/ClassAdLight.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ def getAttributeFloat(self, name):
287287
value = None
288288
return value
289289

290-
def getAttributes(self):
290+
def getAttributes(self) -> list[str]:
291291
"""Get the list of all the attribute names
292292
293293
:return: list of names as strings

src/DIRAC/Core/Utilities/Graphs/Palette.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
JobMinorStatus.INPUT_NOT_AVAILABLE: "#2822A6",
3232
JobMinorStatus.INPUT_DATA_RESOLUTION: "#FFBE94",
3333
JobMinorStatus.DOWNLOADING_INPUT_SANDBOX: "#586CFF",
34-
JobMinorStatus.INPUT_CONTAINS_SLASHES: "#AB7800",
35-
JobMinorStatus.INPUT_INCORRECT: "#6812D6",
3634
JobMinorStatus.JOB_WRAPPER_INITIALIZATION: "#FFFFCC",
3735
JobMinorStatus.JOB_EXCEEDED_WALL_CLOCK: "#FF33CC",
3836
JobMinorStatus.JOB_INSUFFICIENT_DISK: "#33FFCC",

src/DIRAC/Core/Utilities/JDL.py

Lines changed: 201 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,46 @@
1+
"""Transformation classes around the JDL format."""
2+
13
from diraccfg import CFG
4+
from pydantic import ValidationError
5+
26
from DIRAC import S_OK, S_ERROR
37
from DIRAC.Core.Utilities import List
8+
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
9+
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import BaseJobDescriptionModel
10+
11+
ARGUMENTS = "Arguments"
12+
BANNED_SITES = "BannedSites"
13+
CPU_TIME = "CPUTime"
14+
EXECUTABLE = "Executable"
15+
EXECUTION_ENVIRONMENT = "ExecutionEnvironment"
16+
GRID_CE = "GridCE"
17+
INPUT_DATA = "InputData"
18+
INPUT_DATA_POLICY = "InputDataPolicy"
19+
INPUT_SANDBOX = "InputSandbox"
20+
JOB_CONFIG_ARGS = "JobConfigArgs"
21+
JOB_TYPE = "JobType"
22+
JOB_GROUP = "JobGroup"
23+
LOG_LEVEL = "LogLevel"
24+
NUMBER_OF_PROCESSORS = "NumberOfProcessors"
25+
MAX_NUMBER_OF_PROCESSORS = "MaxNumberOfProcessors"
26+
MIN_NUMBER_OF_PROCESSORS = "MinNumberOfProcessors"
27+
OUTPUT_DATA = "OutputData"
28+
OUTPUT_PATH = "OutputPath"
29+
OUTPUT_SE = "OutputSE"
30+
PLATFORM = "Platform"
31+
PRIORITY = "Priority"
32+
STD_ERROR = "StdError"
33+
STD_OUTPUT = "StdOutput"
34+
OUTPUT_SANDBOX = "OutputSandbox"
35+
JOB_NAME = "JobName"
36+
SITE = "Site"
37+
TAGS = "Tags"
38+
39+
OWNER = "Owner"
40+
OWNER_GROUP = "OwnerGroup"
41+
VO = "VirtualOrganization"
42+
43+
CREDENTIALS_FIELDS = {OWNER, OWNER_GROUP, VO}
444

545

646
def loadJDLAsCFG(jdl):
@@ -138,7 +178,7 @@ def dumpCFGAsJDL(cfg, level=1, tab=" "):
138178
else:
139179
val = List.fromChar(cfg[key])
140180
# Some attributes are never lists
141-
if len(val) < 2 or key in ["Arguments", "Executable", "StdOutput", "StdError"]:
181+
if len(val) < 2 or key in [ARGUMENTS, EXECUTABLE, STD_OUTPUT, STD_ERROR]:
142182
value = cfg[key]
143183
try:
144184
try_value = float(value)
@@ -157,3 +197,163 @@ def dumpCFGAsJDL(cfg, level=1, tab=" "):
157197
contents.append("%s};" % indent)
158198
contents.append(f"{tab * (level - 1)}]")
159199
return "\n".join(contents)
200+
201+
202+
def _castToNumber(value):
    """Return ``value`` as an int or a float when its text is numeric, otherwise unchanged.

    Strings made only of digits become ints; anything else accepted by ``float()``
    becomes a float; every other string is returned as is.
    """
    if value.isdigit():
        return int(value)
    try:
        return float(value)
    except ValueError:
        return value


def _moveAttribute(classAd, jobDescription, attribute, field, getter, convert=None):
    """Copy ``attribute`` (when present) from the ClassAd into ``jobDescription.<field>``, then delete it.

    :param getter: bound ClassAd accessor used to read the attribute
    :param convert: optional callable applied to the value before it is stored
    """
    if classAd.lookupAttribute(attribute):
        value = getter(attribute)
        setattr(jobDescription, field, convert(value) if convert else value)
        classAd.deleteAttribute(attribute)


def jdlToBaseJobDescriptionModel(classAd: ClassAd):
    """Build a BaseJobDescriptionModel from the attributes of a JDL ClassAd.

    This method allows compatibility with older Client versions that used the _toJDL method.

    Known JDL attributes are copied to the matching model field. ``NumberOfProcessors``,
    when present, sets both the min and max number of processors and overrides
    ``MinNumberOfProcessors``/``MaxNumberOfProcessors``. The credentials attributes
    (``Owner``, ``OwnerGroup``, ``VirtualOrganization``) and the legacy ``DIRACSetup``
    and ``OwnerDN`` attributes are discarded. Every attribute left over is stored in
    ``extraFields``, converted to int or float when its text is numeric.

    .. warning:: ``classAd`` is modified in place: every attribute handled here is deleted from it.

    :param classAd: the parsed JDL
    :return: S_OK(BaseJobDescriptionModel), or S_ERROR if the model validation fails
             or if an ExecutionEnvironment entry is not of the form KEY=VALUE
    """
    getString = classAd.getAttributeString
    getInt = classAd.getAttributeInt
    getList = classAd.getListFromExpression

    try:
        # The executable is the only field required to instantiate the model
        jobDescription = BaseJobDescriptionModel(executable=getString(EXECUTABLE))

        _moveAttribute(classAd, jobDescription, ARGUMENTS, "arguments", getString)
        _moveAttribute(classAd, jobDescription, BANNED_SITES, "bannedSites", getList)
        _moveAttribute(classAd, jobDescription, CPU_TIME, "cpuTime", getInt)
        # Already set by the constructor: done again so that the attribute gets
        # removed from the ClassAd and does not end up in the extra fields
        _moveAttribute(classAd, jobDescription, EXECUTABLE, "executable", getString)

        if classAd.lookupAttribute(EXECUTION_ENVIRONMENT):
            executionEnvironment = getList(EXECUTION_ENVIRONMENT)
            if executionEnvironment:
                jobDescription.executionEnvironment = {}
                for element in executionEnvironment:
                    # Split on the first "=" only: the value itself may contain "="
                    key, separator, value = element.partition("=")
                    if not separator:
                        return S_ERROR(
                            f"Invalid JDL: {EXECUTION_ENVIRONMENT} entry {element!r} is not of the form KEY=VALUE"
                        )
                    jobDescription.executionEnvironment[key] = _castToNumber(value)
            classAd.deleteAttribute(EXECUTION_ENVIRONMENT)

        _moveAttribute(classAd, jobDescription, GRID_CE, "gridCE", getString)
        _moveAttribute(classAd, jobDescription, INPUT_DATA, "inputData", getList)
        _moveAttribute(classAd, jobDescription, INPUT_DATA_POLICY, "inputDataPolicy", getString)
        _moveAttribute(classAd, jobDescription, INPUT_SANDBOX, "inputSandbox", getList)
        _moveAttribute(classAd, jobDescription, JOB_CONFIG_ARGS, "jobConfigArgs", getString)
        _moveAttribute(classAd, jobDescription, JOB_GROUP, "jobGroup", getString)
        _moveAttribute(classAd, jobDescription, JOB_NAME, "jobName", getString)
        _moveAttribute(classAd, jobDescription, JOB_TYPE, "jobType", getString)
        _moveAttribute(classAd, jobDescription, LOG_LEVEL, "logLevel", getString)

        if classAd.lookupAttribute(NUMBER_OF_PROCESSORS):
            # NumberOfProcessors fixes both bounds and supersedes Min/Max
            numberOfProcessors = getInt(NUMBER_OF_PROCESSORS)
            jobDescription.maxNumberOfProcessors = numberOfProcessors
            jobDescription.minNumberOfProcessors = numberOfProcessors
            for attribute in (NUMBER_OF_PROCESSORS, MAX_NUMBER_OF_PROCESSORS, MIN_NUMBER_OF_PROCESSORS):
                classAd.deleteAttribute(attribute)
        else:
            _moveAttribute(classAd, jobDescription, MAX_NUMBER_OF_PROCESSORS, "maxNumberOfProcessors", getInt)
            _moveAttribute(classAd, jobDescription, MIN_NUMBER_OF_PROCESSORS, "minNumberOfProcessors", getInt)

        _moveAttribute(classAd, jobDescription, OUTPUT_DATA, "outputData", getList, convert=set)
        _moveAttribute(classAd, jobDescription, OUTPUT_SANDBOX, "outputSandbox", getList, convert=set)
        _moveAttribute(classAd, jobDescription, OUTPUT_PATH, "outputPath", getString)
        _moveAttribute(classAd, jobDescription, OUTPUT_SE, "outputSE", getString)
        _moveAttribute(classAd, jobDescription, SITE, "sites", getList)
        _moveAttribute(classAd, jobDescription, PLATFORM, "platform", getString)
        _moveAttribute(classAd, jobDescription, PRIORITY, "priority", getInt)
        _moveAttribute(classAd, jobDescription, STD_OUTPUT, "stdout", getString)
        _moveAttribute(classAd, jobDescription, STD_ERROR, "stderr", getString)
        _moveAttribute(classAd, jobDescription, TAGS, "tags", getList)

        # Remove credentials
        for attribute in CREDENTIALS_FIELDS:
            classAd.deleteAttribute(attribute)

        # Remove legacy attributes
        for attribute in {"DIRACSetup", "OwnerDN"}:
            classAd.deleteAttribute(attribute)

        # Whatever is still in the ClassAd is not part of the model: keep it as an extra field
        for attribute in classAd.getAttributes():
            if not jobDescription.extraFields:
                jobDescription.extraFields = {}
            jobDescription.extraFields[attribute] = _castToNumber(getString(attribute))

    except ValidationError as e:
        return S_ERROR(f"Invalid JDL: {e}")

    return S_OK(jobDescription)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
""" Unit tests for JDL module"""
2+
3+
# pylint: disable=protected-access, invalid-name
4+
5+
from io import StringIO
6+
from unittest.mock import patch
7+
8+
import pytest
9+
10+
from DIRAC import S_OK
11+
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
12+
from DIRAC.Core.Utilities.JDL import jdlToBaseJobDescriptionModel
13+
from DIRAC.Interfaces.API.Job import Job
14+
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
15+
16+
17+
def test_jdlToBaseJobDescriptionModel_valid():
18+
"""This test makes sure that a job object can be parsed by the jdlToBaseJobDescriptionModel method"""
19+
# Arrange
20+
with patch("DIRAC.Core.Base.API.getSites", return_value=S_OK(["LCG.IN2P3.fr"])):
21+
job = Job()
22+
job.setConfigArgs("configArgs")
23+
job.setCPUTime(3600)
24+
job.setExecutable("/bin/echo", arguments="arguments", logFile="logFile")
25+
job.setName("JobName")
26+
with patch(
27+
"DIRAC.ConfigurationSystem.Client.Helpers.Operations.Operations.getValue",
28+
return_value="DIRAC.WorkloadManagementSystem.Client.DownloadInputData",
29+
):
30+
job.setInputDataPolicy("download")
31+
job.setInputSandbox(["inputfile.opts"])
32+
job.setOutputSandbox(["inputfile.opts"])
33+
job.setInputData(["/lhcb/production/DC04/v2/DST/00000742_00003493_10.dst"])
34+
job.setParameterSequence("IntSequence", [1, 2, 3])
35+
job.setParameterSequence("StrSequence", ["a", "b", "c"])
36+
job.setParameterSequence("FloatSequence", [1.0, 2.0, 3.0])
37+
38+
job.setOutputData(["outputfile.root"], outputSE="IN2P3-disk", outputPath="/myjobs/1234")
39+
with patch("DIRAC.Interfaces.API.Job.getDIRACPlatforms", return_value=S_OK("x86_64-slc6-gcc49-opt")):
40+
job.setPlatform("x86_64-slc6-gcc49-opt")
41+
job.setPriority(10)
42+
43+
job.setDestination("LCG.IN2P3.fr")
44+
job.setNumberOfProcessors(3)
45+
with patch("DIRAC.Interfaces.API.Job.getCESiteMapping", return_value=S_OK({"some.ce.IN2P3.fr": "LCG.IN2P3.fr"})):
46+
job.setDestinationCE("some.ce.IN2P3.fr")
47+
job.setType("Test")
48+
job.setTag(["WholeNode", "8GBMemory"])
49+
job.setJobGroup("1234abcd")
50+
job.setLogLevel("DEBUG")
51+
job.setConfigArgs("configArgs")
52+
job.setExecutionEnv({"INTVAR": 1, "STRVAR": "a"})
53+
job._addJDLParameter("ExtraInt", 1)
54+
job._addJDLParameter("ExtraFloat", 1.0)
55+
job._addJDLParameter("ExtraString", "test")
56+
# The 3 lines below are not a use case given that workflow parameters
57+
# must be strings, ints, floats or booleans
58+
# job._addJDLParameter("ExtraIntList", ";".join(["1", "2", "3"]))
59+
# job._addJDLParameter("ExtraFloatList", ";".join(["1.0", "2.0", "3.0"]))
60+
# job._addJDLParameter("ExtraStringList",";".join(["a", "b", "c"]))
61+
62+
# We make sure that the job above is valid
63+
assert not job.errorDict
64+
65+
# Act
66+
xml = job._toXML()
67+
jdl = f"[{job._toJDL(jobDescriptionObject=StringIO(xml))}]"
68+
69+
# Assert
70+
res = jdlToBaseJobDescriptionModel(ClassAd(jdl))
71+
assert res["OK"], res["Message"]
72+
73+
data = res["Value"].dict()
74+
with patch(
75+
"DIRAC.WorkloadManagementSystem.Utilities.JobModel.getDIRACPlatforms",
76+
return_value=S_OK(["x86_64-slc6-gcc49-opt"]),
77+
):
78+
with patch(
79+
"DIRAC.WorkloadManagementSystem.Utilities.JobModel.getSites",
80+
return_value=S_OK(["LCG.IN2P3.fr"]),
81+
):
82+
assert JobDescriptionModel(owner="owner", ownerGroup="ownerGroup", vo="lhcb", **data)
83+
84+
85+
@pytest.mark.parametrize(
    "jdl",
    [
        "[]",  # no Executable attribute at all
        '[Executable="";]',  # Executable attribute present but empty
        'Executable="executable";',  # attributes not enclosed in brackets
    ],
)
def test_jdlToBaseJobDescriptionModel_invalid(jdl):
    """Check that a JDL with a missing/empty executable or without brackets yields an error result"""
    # Arrange: wrap the raw JDL text in a ClassAd
    parsedJDL = ClassAd(jdl)

    # Act
    outcome = jdlToBaseJobDescriptionModel(parsedJDL)

    # Assert: the conversion must be refused; show the unexpected model otherwise
    assert not outcome["OK"], outcome["Value"]

src/DIRAC/Interfaces/API/Job.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
4141
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
4242
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
43-
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping
43+
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getDIRACPlatforms
4444
from DIRAC.Interfaces.API.Dirac import Dirac
4545
from DIRAC.Workflow.Utilities.Utils import getStepDefinition, addStepToWorkflow
4646

@@ -82,12 +82,6 @@ def __init__(self, script=None, stdout="std.out", stderr="std.err"):
8282
self.wfArguments = {}
8383
self.parametricWFArguments = {}
8484

85-
# loading the function that will be used to determine the platform (it can be VO specific)
86-
res = ObjectLoader().loadObject("ConfigurationSystem.Client.Helpers.Resources", "getDIRACPlatforms")
87-
if not res["OK"]:
88-
self.log.fatal(res["Message"])
89-
self.getDIRACPlatforms = res["Value"]
90-
9185
self.script = script
9286
if not script:
9387
self.workflow = Workflow()
@@ -451,7 +445,7 @@ def setPlatform(self, platform):
451445
return self._reportError("Expected string for platform", **kwargs)
452446

453447
if not platform.lower() == "any":
454-
availablePlatforms = self.getDIRACPlatforms()
448+
availablePlatforms = getDIRACPlatforms()
455449
if not availablePlatforms["OK"]:
456450
return self._reportError("Can't check for platform", **kwargs)
457451
if platform in availablePlatforms["Value"]:

src/DIRAC/WorkloadManagementSystem/Client/JobMinorStatus.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,4 @@
7676
#
7777
INPUT_NOT_AVAILABLE = "Input Data Not Available"
7878
#
79-
INPUT_CONTAINS_SLASHES = "Input data contains //"
80-
#
81-
INPUT_INCORRECT = "Input data not correctly specified"
82-
#
8379
NO_CANDIDATE_SITE_FOUND = "No candidate sites available"

0 commit comments

Comments
 (0)