Skip to content

Commit 854214a

Browse files
authored
Merge pull request #8031 from fstagni/cherry-pick-2-54bbdddf7-integration
[sweep:integration] fix: make the setting of inputDataBulk extendable
2 parents 1458dd6 + 479663f commit 854214a

File tree

2 files changed

+53
-18
lines changed

2 files changed

+53
-18
lines changed

src/DIRAC/TransformationSystem/Client/WorkflowTasks.py

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
88
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
99
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
10-
from DIRAC.Core.Utilities.DErrno import ETSDATA, ETSUKN
10+
from DIRAC.Core.Utilities.DErrno import ETSUKN
1111
from DIRAC.Core.Utilities.List import fromChar
1212
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1313
from DIRAC.Interfaces.API.Job import Job
@@ -83,6 +83,8 @@ def __init__(
8383
self.outputDataModule_o = None
8484
self.objectLoader = ObjectLoader()
8585

86+
self.parametricSequencedKeys = ["JOB_ID", "PRODUCTION_ID", "InputData"]
87+
8688
def prepareTransformationTasks(self, transBody, taskDict, owner="", ownerGroup="", bulkSubmissionFlag=False):
8789
"""Prepare tasks, given a taskDict, that is created (with some manipulation) by the DB
8890
jobClass is by default "DIRAC.Interfaces.API.Job.Job". An extension of it also works.
@@ -191,22 +193,7 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup):
191193
method=method,
192194
)
193195

194-
# Handle Input Data
195-
inputData = paramsDict.get("InputData")
196-
if inputData:
197-
if isinstance(inputData, str):
198-
inputData = inputData.replace(" ", "").split(";")
199-
self._logVerbose(f"Setting input data to {inputData}", transID=transID, method=method)
200-
seqDict["InputData"] = inputData
201-
elif paramSeqDict.get("InputData") is not None:
202-
self._logError("Invalid mixture of jobs with and without input data")
203-
return S_ERROR(ETSDATA, "Invalid mixture of jobs with and without input data")
204-
205-
for paramName, paramValue in paramsDict.items():
206-
if paramName not in ("InputData", "Site", "TargetSE"):
207-
if paramValue:
208-
self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method)
209-
seqDict[paramName] = paramValue
196+
inputData = self._handleInputsBulk(seqDict, paramsDict, transID)
210197

211198
outputParameterList = []
212199
if self.outputDataModule:
@@ -235,7 +222,7 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup):
235222
paramSeqDict.setdefault(pName, []).append(seq)
236223

237224
for paramName, paramSeq in paramSeqDict.items():
238-
if paramName in ["JOB_ID", "PRODUCTION_ID", "InputData"] + outputParameterList:
225+
if paramName in self.parametricSequencedKeys + outputParameterList:
239226
res = oJob.setParameterSequence(paramName, paramSeq, addToWorkflow=paramName)
240227
else:
241228
res = oJob.setParameterSequence(paramName, paramSeq)
@@ -399,6 +386,28 @@ def _handleInputs(self, oJob, paramsDict):
399386
if not res["OK"]:
400387
self._logError(f"Could not set the inputs: {res['Message']}", transID=transID, method="_handleInputs")
401388

389+
def _handleInputsBulk(self, seqDict, paramsDict, transID):
390+
"""set job inputs (+ metadata)"""
391+
method = "_handleInputsBulk"
392+
if seqDict:
393+
self._logVerbose(f"Setting job input data to {seqDict}", transID=transID, method=method)
394+
395+
# Handle Input Data
396+
inputData = paramsDict.get("InputData")
397+
if inputData:
398+
if isinstance(inputData, str):
399+
inputData = inputData.replace(" ", "").split(";")
400+
self._logVerbose(f"Setting input data {inputData} to {seqDict}", transID=transID, method=method)
401+
seqDict["InputData"] = inputData
402+
403+
for paramName, paramValue in paramsDict.items():
404+
if paramName not in ("InputData", "Site", "TargetSE"):
405+
if paramValue:
406+
self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method)
407+
seqDict[paramName] = paramValue
408+
409+
return inputData
410+
402411
def _handleRest(self, oJob, paramsDict):
403412
"""add as JDL parameters all the other parameters that are not for inputs or destination"""
404413
transID = paramsDict["TransformationID"]

src/DIRAC/TransformationSystem/Client/test/Test_Client_WorkflowTasks.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# pylint: disable=protected-access,missing-docstring,invalid-name
44

55
from unittest.mock import MagicMock
6+
67
import pytest
78

89
from DIRAC import gLogger, S_OK
@@ -136,3 +137,28 @@ def test__handleDestination(mocker, paramsDict, expected):
136137
mocker.patch("DIRAC.TransformationSystem.Client.TaskManagerPlugin.getSitesForSE", side_effect=ourgetSitesForSE)
137138
res = wfTasks._handleDestination(paramsDict)
138139
assert sorted(res) == sorted(expected)
140+
141+
142+
@pytest.mark.parametrize(
143+
"seqDict, paramsDict, expected",
144+
[
145+
({}, {}, None),
146+
({"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"}, {}, None),
147+
(
148+
{"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"},
149+
{"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1},
150+
None,
151+
),
152+
(
153+
{"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"},
154+
{"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1, "InputData": ["a1", "a2"]},
155+
["a1", "a2"],
156+
),
157+
# ({"a1": "aa1", "a2": "aa2", "a3": "aa3"}, {"b1": "bb1", "b2": "bb2", "b3": "bb3"}, {"b1": "bb1", "b2": "bb2"}, ["a1", "a2"]),
158+
],
159+
)
160+
def test__handleInputsBulk(mocker, seqDict, paramsDict, expected):
161+
"""Test the _handleInputsBulk method WorkflowTasks"""
162+
mocker.patch("DIRAC.TransformationSystem.Client.TaskManagerPlugin.getSitesForSE", side_effect=ourgetSitesForSE)
163+
res = wfTasks._handleInputsBulk(seqDict, paramsDict, transID=1)
164+
assert res == expected

0 commit comments

Comments
 (0)