Skip to content

Commit daf87a2

Browse files
authored
Merge pull request #7032 from aldbr/rel-v8r0_FEAT_RemoteRunnerChecksum
[8.0] feat & fix: check the integrity of the outputs in RemoteRunner
2 parents 7ec0582 + a31e5d9 commit daf87a2

File tree

3 files changed

+140
-40
lines changed

3 files changed

+140
-40
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 13 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import os
2727
import json
2828
import requests
29+
import shutil
2930

3031
from DIRAC import S_OK, S_ERROR
3132
from DIRAC.Core.Security import Locations
@@ -144,7 +145,7 @@ def _urlJoin(self, command):
144145
"""
145146
return os.path.join(self.base_url, command)
146147

147-
def _request(self, method, query, params=None, data=None, headers=None, timeout=None):
148+
def _request(self, method, query, params=None, data=None, headers=None, timeout=None, stream=False):
148149
"""Perform a request and properly handle the results/exceptions.
149150
150151
:param str method: "post", "get", "put"
@@ -164,12 +165,7 @@ def _request(self, method, query, params=None, data=None, headers=None, timeout=
164165

165166
try:
166167
response = self.session.request(
167-
method,
168-
query,
169-
headers=headers,
170-
params=params,
171-
data=data,
172-
timeout=timeout,
168+
method, query, headers=headers, params=params, data=data, timeout=timeout, stream=stream
173169
)
174170
if not response.ok:
175171
return S_ERROR(f"Response: {response.status_code} - {response.reason}")
@@ -811,20 +807,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
811807
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
812808

813809
# Submit the GET request to retrieve outputs
814-
result = self._request("get", query)
810+
result = self._request("get", query, stream=True)
815811
if not result["OK"]:
816812
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
817813
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
818814
response = result["Value"]
819-
outputContent = response.text
815+
816+
localOutput = os.path.join(workingDirectory, remoteOutput)
817+
with open(localOutput, "wb") as f:
818+
shutil.copyfileobj(response.raw, f)
820819

821820
if remoteOutput == f"{stamp}.out":
822-
stdout = outputContent
823-
elif remoteOutput == f"{stamp}.err":
824-
stderr = outputContent
825-
else:
826-
localOutput = os.path.join(workingDirectory, remoteOutput)
827-
with open(localOutput, "w") as f:
828-
f.write(outputContent)
821+
with open(localOutput) as f:
822+
stdout = f.read()
823+
if remoteOutput == f"{stamp}.err":
824+
with open(localOutput) as f:
825+
stderr = f.read()
829826

830827
return S_OK((stdout, stderr))

src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py

Lines changed: 58 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
Mostly called by workflow modules, RemoteRunner is generally the last component to get through before
77
the script/application execution on a remote machine.
88
"""
9+
import hashlib
910
import os
1011
import shlex
1112
import time
@@ -22,6 +23,9 @@
2223
class RemoteRunner:
2324
def __init__(self, siteName=None, ceName=None, queueName=None):
2425
self.log = gLogger.getSubLogger("RemoteRunner")
26+
self.executable = "workloadExec.sh"
27+
self.checkSumOutput = "md5Checksum.txt"
28+
2529
self._workloadSite = siteName
2630
if not self._workloadSite:
2731
self.log.warn("You are expected to provide a siteName in parameters from v8.0")
@@ -61,44 +65,44 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
6165
self.log.verbose("Command to submit:", command)
6266

6367
# Check whether CE parameters are set
64-
result = self._checkParameters()
65-
if not result["OK"]:
68+
if not (result := self._checkParameters())["OK"]:
6669
result["Errno"] = DErrno.ESECTION
6770
return result
68-
self.log.verbose(
69-
"The command will be sent to",
71+
self.log.info(
72+
"Preparing and submitting the command to",
7073
f"site {self._workloadSite}, CE {self._workloadCE}, queue {self._workloadQueue}",
7174
)
7275

7376
# Set up Application Queue
74-
result = self._setUpWorkloadCE(numberOfProcessors)
75-
if not result["OK"]:
77+
if not (result := self._setUpWorkloadCE(numberOfProcessors))["OK"]:
7678
result["Errno"] = DErrno.ERESUNA
7779
return result
7880
workloadCE = result["Value"]
7981
self.log.debug("The CE interface has been set up")
8082

8183
# Add the command in an executable file
82-
executable = "workloadExec.sh"
83-
self._wrapCommand(command, workingDirectory, executable)
84+
self._wrapCommand(command, workingDirectory)
8485
self.log.debug("The command has been wrapped into an executable")
8586

8687
# Get inputs from the current working directory
8788
inputs = os.listdir(workingDirectory)
88-
inputs.remove(os.path.basename(executable))
89+
inputs.remove(os.path.basename(self.executable))
8990
self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs))
9091
# Request the whole directory as output
9192
outputs = ["/"]
9293

9394
# Submit the command as a job
94-
result = workloadCE.submitJob(executable, workloadCE.proxy, inputs=inputs, outputs=outputs)
95-
if not result["OK"]:
95+
if not (result := workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs))[
96+
"OK"
97+
]:
9698
result["Errno"] = DErrno.EWMSSUBM
9799
return result
98100
jobID = result["Value"][0]
99101
stamp = result["PilotStampDict"][jobID]
102+
self.log.info("The command has been wrapped in a job and sent. Remote JobID: ", jobID)
100103

101104
# Get status of the job
105+
self.log.info("Waiting for the end of the job...")
102106
jobStatus = PilotStatus.RUNNING
103107
while jobStatus not in PilotStatus.PILOT_FINAL_STATES:
104108
time.sleep(120)
@@ -107,20 +111,27 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
107111
result["Errno"] = DErrno.EWMSSTATUS
108112
return result
109113
jobStatus = result["Value"][jobID]
110-
self.log.verbose("The final status of the application/script is: ", jobStatus)
114+
self.log.info("The final status of the application/script is: ", jobStatus)
111115

112116
# Get job outputs
113-
result = workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath("."))
114-
if not result["OK"]:
117+
self.log.info("Getting the outputs of the command...")
118+
if not (result := workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath(".")))["OK"]:
115119
result["Errno"] = DErrno.EWMSJMAN
116120
return result
117121
output, error = result["Value"]
118122

123+
# Make sure the output is correct
124+
self.log.info("Checking the integrity of the outputs...")
125+
if not (result := self._checkOutputIntegrity("."))["OK"]:
126+
result["Errno"] = DErrno.EWMSJMAN
127+
return result
128+
self.log.info("The output has been retrieved and declared complete")
129+
119130
# Clean job in the remote resource
120131
if cleanRemoteJob:
121-
result = workloadCE.cleanJob(jobID)
122-
if not result["OK"]:
132+
if not (result := workloadCE.cleanJob(jobID))["OK"]:
123133
self.log.warn("Failed to clean the output remotely", result["Message"])
134+
self.log.info("The job has been remotely removed")
124135

125136
commandStatus = {"Done": 0, "Failed": -1, "Killed": -2}
126137
return S_OK((commandStatus[jobStatus], output, error))
@@ -190,12 +201,11 @@ def _setUpWorkloadCE(self, numberOfProcessorsPayload=1):
190201

191202
return S_OK(workloadCE)
192203

193-
def _wrapCommand(self, command, workingDirectory, executable):
204+
def _wrapCommand(self, command, workingDirectory):
194205
"""Wrap the command in a file
195206
196207
:param str command: command line to write in the executable
197208
:param str workingDirectory: directory containing the inputs required by the command
198-
:param str executable: path of the executable that should contain the command to submit
199209
:return: path of the executable
200210
"""
201211
# Check whether the command contains any absolute path: there would be no way to access them remotely
@@ -219,5 +229,34 @@ def _wrapCommand(self, command, workingDirectory, executable):
219229
argumentsProcessed.append(os.path.join(".", os.path.basename(argument)))
220230

221231
command = shlex.join(argumentsProcessed)
222-
with open(executable, "w") as f:
232+
with open(self.executable, "w") as f:
223233
f.write(command)
234+
# Post-processing: compute the checksum of the outputs
235+
f.write(f"\nmd5sum * > {self.checkSumOutput}")
236+
237+
def _checkOutputIntegrity(self, workingDirectory):
238+
"""Make sure that output files are not corrupted.
239+
240+
:param str workingDirectory: path of the outputs
241+
"""
242+
checkSumOutput = os.path.join(workingDirectory, self.checkSumOutput)
243+
if not os.path.exists(checkSumOutput):
244+
return S_ERROR(f"Cannot guarantee the integrity of the outputs: {checkSumOutput} unavailable")
245+
246+
with open(checkSumOutput) as f:
247+
# for each output file, compute the md5 checksum
248+
for line in f:
249+
checkSum, remoteOutput = list(filter(None, line.strip("\n").split(" ")))
250+
251+
hash = hashlib.md5()
252+
localOutput = os.path.join(workingDirectory, remoteOutput)
253+
if not os.path.exists(localOutput):
254+
return S_ERROR(f"{localOutput} was expected but not found")
255+
256+
with open(localOutput, "rb") as f:
257+
while chunk := f.read(128 * hash.block_size):
258+
hash.update(chunk)
259+
if checkSum != hash.hexdigest():
260+
return S_ERROR(f"{localOutput} is corrupted")
261+
262+
return S_OK()

src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_RemoteRunner.py

Lines changed: 69 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
from diraccfg import CFG
88

99
# DIRAC Components
10-
from DIRAC import gLogger, gConfig, S_OK
10+
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
1111
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
1212
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
1313
from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner
@@ -40,13 +40,16 @@ def test__wrapCommand(command, workingDirectory, expectedContent):
4040

4141
# Instantiate a RemoteRunner and wrap the command
4242
remoteRunner = RemoteRunner("Site1", "CE1", "queue1")
43-
remoteRunner._wrapCommand(command, workingDirectory, executable)
43+
remoteRunner._wrapCommand(command, workingDirectory)
4444

4545
# Test the results
46-
assert os.path.isfile(executable)
47-
with open(executable) as f:
46+
assert os.path.isfile(remoteRunner.executable)
47+
with open(remoteRunner.executable) as f:
4848
content = f.read()
49-
os.remove(executable)
49+
os.remove(remoteRunner.executable)
50+
51+
# This line is added at the end of the wrapper for any command
52+
expectedContent += f"\nmd5sum * > {remoteRunner.checkSumOutput}"
5053
assert content == expectedContent
5154

5255

@@ -105,3 +108,64 @@ def test__setUpWorkloadCE(
105108
assert workloadCE.ceParameters["NumberOfProcessors"] == expectedNumberOfProcessors
106109
else:
107110
assert result["Message"] == expectedNumberOfProcessors
111+
112+
113+
@pytest.mark.parametrize(
114+
"checkSumDict, expectedResult",
115+
[
116+
# Normal case
117+
({"file1.txt": "826e8142e6baabe8af779f5f490cf5f5", "file2.txt": "1c1c96fd2cf8330db0bfa936ce82f3b9"}, S_OK()),
118+
# Files are corrupted
119+
(
120+
{"file1.txt": "c12f72e7b198fdbfe5f70c66dc6082c8", "file2.txt": "5ec149e38f09fb716b1e0f4cf23af679"},
121+
S_ERROR("./file1.txt is corrupted"),
122+
),
123+
(
124+
{"file1.txt": "826e8142e6baabe8af779f5f490cf5f5", "file2.txt": "5ec149e38f09fb716b1e0f4cf23af679"},
125+
S_ERROR("./file2.txt is corrupted"),
126+
),
127+
# Files do not exist
128+
(
129+
{
130+
"file3.txt": "826e8142e6baabe8af779f5f490cf5f5",
131+
},
132+
S_ERROR("./file3.txt was expected but not found"),
133+
),
134+
# remoteRunner.checkSumOutput is empty
135+
({}, S_OK()),
136+
# remoteRunner.checkSumOutput does not exist
137+
(None, S_ERROR("Cannot guarantee the integrity of the outputs")),
138+
],
139+
)
140+
def test__checkOutputIntegrity(checkSumDict, expectedResult):
141+
"""Test RemoteRunner()._checkOutputIntegrity()"""
142+
# Instantiate a RemoteRunner
143+
remoteRunner = RemoteRunner("Site1", "CE1", "queue1")
144+
145+
# Create some files in workingDirectory
146+
workingDirectory = "."
147+
with open(os.path.join(workingDirectory, "file1.txt"), "w") as f:
148+
f.write("file1")
149+
with open(os.path.join(workingDirectory, "file2.txt"), "w") as f:
150+
f.write("file2")
151+
152+
# Create remoteRunner.checkSumOutput
153+
if checkSumDict is not None:
154+
with open(os.path.join(workingDirectory, remoteRunner.checkSumOutput), "w") as f:
155+
for file, checkSum in checkSumDict.items():
156+
f.write(f"{checkSum} {file}\n")
157+
158+
# Check the integrity of the output
159+
result = remoteRunner._checkOutputIntegrity(".")
160+
161+
# Test the results
162+
print(result)
163+
assert result["OK"] is expectedResult["OK"]
164+
if not expectedResult["OK"]:
165+
assert expectedResult["Message"] in result["Message"]
166+
167+
# Delete files
168+
os.remove(os.path.join(workingDirectory, "file1.txt"))
169+
os.remove(os.path.join(workingDirectory, "file2.txt"))
170+
if os.path.exists(os.path.join(workingDirectory, remoteRunner.checkSumOutput)):
171+
os.remove(remoteRunner.checkSumOutput)

0 commit comments

Comments
 (0)