Skip to content

Commit b50ca8d

Browse files
committed
feat: check the integrity of the RemoteRunner outputs
1 parent 806491d commit b50ca8d

File tree

2 files changed

+123
-15
lines changed

2 files changed

+123
-15
lines changed

src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
Mostly called by workflow modules, RemoteRunner is generally the last component to get through before
77
the script/application execution on a remote machine.
88
"""
9+
import hashlib
910
import os
1011
import shlex
1112
import time
@@ -22,6 +23,9 @@
2223
class RemoteRunner:
2324
def __init__(self, siteName=None, ceName=None, queueName=None):
2425
self.log = gLogger.getSubLogger("RemoteRunner")
26+
self.executable = "workloadExec.sh"
27+
self.checkSumOutput = "md5Checksum.txt"
28+
2529
self._workloadSite = siteName
2630
if not self._workloadSite:
2731
self.log.warn("You are expected to provide a siteName in parameters from v8.0")
@@ -65,8 +69,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
6569
if not result["OK"]:
6670
result["Errno"] = DErrno.ESECTION
6771
return result
68-
self.log.verbose(
69-
"The command will be sent to",
72+
self.log.info(
73+
"Preparing and submitting the command to",
7074
f"site {self._workloadSite}, CE {self._workloadCE}, queue {self._workloadQueue}",
7175
)
7276

@@ -79,26 +83,27 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
7983
self.log.debug("The CE interface has been set up")
8084

8185
# Add the command in an executable file
82-
executable = "workloadExec.sh"
83-
self._wrapCommand(command, workingDirectory, executable)
86+
self._wrapCommand(command, workingDirectory)
8487
self.log.debug("The command has been wrapped into an executable")
8588

8689
# Get inputs from the current working directory
8790
inputs = os.listdir(workingDirectory)
88-
inputs.remove(os.path.basename(executable))
91+
inputs.remove(os.path.basename(self.executable))
8992
self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs))
9093
# Request the whole directory as output
9194
outputs = ["/"]
9295

9396
# Submit the command as a job
94-
result = workloadCE.submitJob(executable, workloadCE.proxy, inputs=inputs, outputs=outputs)
97+
result = workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs)
9598
if not result["OK"]:
9699
result["Errno"] = DErrno.EWMSSUBM
97100
return result
98101
jobID = result["Value"][0]
99102
stamp = result["PilotStampDict"][jobID]
103+
self.log.info("The command has been wrapped in a job and sent. Remote JobID: ", jobID)
100104

101105
# Get status of the job
106+
self.log.info("Waiting for the end of the job...")
102107
jobStatus = PilotStatus.RUNNING
103108
while jobStatus not in PilotStatus.PILOT_FINAL_STATES:
104109
time.sleep(120)
@@ -107,20 +112,30 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
107112
result["Errno"] = DErrno.EWMSSTATUS
108113
return result
109114
jobStatus = result["Value"][jobID]
110-
self.log.verbose("The final status of the application/script is: ", jobStatus)
115+
self.log.info("The final status of the application/script is: ", jobStatus)
111116

112117
# Get job outputs
118+
self.log.info("Getting the outputs of the command...")
113119
result = workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath("."))
114120
if not result["OK"]:
115121
result["Errno"] = DErrno.EWMSJMAN
116122
return result
117123
output, error = result["Value"]
118124

125+
# Make sure the output is correct
126+
self.log.info("Checking the integrity of the outputs...")
127+
result = self._checkOutputIntegrity(".")
128+
if not result["OK"]:
129+
result["Errno"] = DErrno.EWMSJMAN
130+
return result
131+
self.log.info("The output has been retrieved and declared complete")
132+
119133
# Clean job in the remote resource
120134
if cleanRemoteJob:
121135
result = workloadCE.cleanJob(jobID)
122136
if not result["OK"]:
123137
self.log.warn("Failed to clean the output remotely", result["Message"])
138+
self.log.info("The job has been remotely removed")
124139

125140
commandStatus = {"Done": 0, "Failed": -1, "Killed": -2}
126141
return S_OK((commandStatus[jobStatus], output, error))
@@ -190,12 +205,11 @@ def _setUpWorkloadCE(self, numberOfProcessorsPayload=1):
190205

191206
return S_OK(workloadCE)
192207

193-
def _wrapCommand(self, command, workingDirectory, executable):
208+
def _wrapCommand(self, command, workingDirectory):
194209
"""Wrap the command in a file
195210
196211
:param str command: command line to write in the executable
197212
:param str workingDirectory: directory containing the inputs required by the command
198-
:param str executable: path of the executable that should contain the command to submit
199213
:return: path of the executable
200214
"""
201215
# Check whether the command contains any absolute path: there would be no way to access them remotely
@@ -219,5 +233,35 @@ def _wrapCommand(self, command, workingDirectory, executable):
219233
argumentsProcessed.append(os.path.join(".", os.path.basename(argument)))
220234

221235
command = shlex.join(argumentsProcessed)
222-
with open(executable, "w") as f:
236+
with open(self.executable, "w") as f:
223237
f.write(command)
238+
# Post-processing: compute the checksum of the outputs
239+
f.write(f"\nmd5sum * > {self.checkSumOutput}")
240+
241+
def _checkOutputIntegrity(self, workingDirectory):
    """Make sure that output files are not corrupted.

    Reads the checksum file named ``self.checkSumOutput`` from ``workingDirectory`` and, for every
    ``<md5 checksum> <file name>`` entry it contains, recomputes the md5 checksum of the local copy
    and compares it with the recorded one. Blank lines are ignored.

    :param str workingDirectory: path of the outputs
    :return: S_OK() if every listed file is present and matches its checksum, S_ERROR if the
             checksum file is missing, an entry is malformed, a listed file is missing or a
             checksum differs
    """
    checkSumOutput = os.path.join(workingDirectory, self.checkSumOutput)
    if not os.path.exists(checkSumOutput):
        return S_ERROR(f"Cannot guarantee the integrity of the outputs: {checkSumOutput} unavailable")

    with open(checkSumOutput) as checkSumFile:
        for line in checkSumFile:
            entry = line.rstrip("\n")
            if not entry.strip():
                continue

            # Split on the first space only, so that file names containing spaces stay in one piece
            checkSum, separator, remoteOutput = entry.partition(" ")
            # md5sum puts a one-character mode flag between the separator and the name:
            # " " (text) or "*" (binary). A bare "<checksum> <name>" entry is accepted too.
            if remoteOutput[:1] in (" ", "*"):
                remoteOutput = remoteOutput[1:]
            if not separator or not remoteOutput:
                return S_ERROR(f"Cannot guarantee the integrity of the outputs: malformed entry {entry!r}")

            # The wrapper redirects "md5sum *" into the checksum file: if a stale copy of that file
            # was shipped with the inputs, it lists itself with a checksum taken while it was being
            # rewritten, which can never match the final content.
            if remoteOutput == self.checkSumOutput:
                continue

            localOutput = os.path.join(workingDirectory, remoteOutput)
            if not os.path.exists(localOutput):
                return S_ERROR(f"{localOutput} was expected but not found")

            # Hash the file chunk by chunk to keep memory usage bounded on large outputs
            digest = hashlib.md5()
            with open(localOutput, "rb") as outputFile:
                while chunk := outputFile.read(128 * digest.block_size):
                    digest.update(chunk)

            localCheckSum = digest.hexdigest()
            if checkSum != localCheckSum:
                self.log.debug(
                    "Checksum mismatch:",
                    f"{localOutput} expected {checkSum}, computed {localCheckSum}",
                )
                return S_ERROR(f"{localOutput} is corrupted")

    return S_OK()

src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_RemoteRunner.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from diraccfg import CFG
88

99
# DIRAC Components
10-
from DIRAC import gLogger, gConfig, S_OK
10+
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
1111
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
1212
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
1313
from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner
@@ -40,13 +40,16 @@ def test__wrapCommand(command, workingDirectory, expectedContent):
4040

4141
# Instantiate a RemoteRunner and wrap the command
4242
remoteRunner = RemoteRunner("Site1", "CE1", "queue1")
43-
remoteRunner._wrapCommand(command, workingDirectory, executable)
43+
remoteRunner._wrapCommand(command, workingDirectory)
4444

4545
# Test the results
46-
assert os.path.isfile(executable)
47-
with open(executable) as f:
46+
assert os.path.isfile(remoteRunner.executable)
47+
with open(remoteRunner.executable) as f:
4848
content = f.read()
49-
os.remove(executable)
49+
os.remove(remoteRunner.executable)
50+
51+
# This line is added at the end of the wrapper for any command
52+
expectedContent += f"\nmd5sum * > {remoteRunner.checkSumOutput}"
5053
assert content == expectedContent
5154

5255

@@ -105,3 +108,64 @@ def test__setUpWorkloadCE(
105108
assert workloadCE.ceParameters["NumberOfProcessors"] == expectedNumberOfProcessors
106109
else:
107110
assert result["Message"] == expectedNumberOfProcessors
111+
112+
113+
@pytest.mark.parametrize(
    "checkSumDict, expectedResult",
    [
        # Normal case
        ({"file1.txt": "826e8142e6baabe8af779f5f490cf5f5", "file2.txt": "1c1c96fd2cf8330db0bfa936ce82f3b9"}, S_OK()),
        # Files are corrupted
        (
            {"file1.txt": "c12f72e7b198fdbfe5f70c66dc6082c8", "file2.txt": "5ec149e38f09fb716b1e0f4cf23af679"},
            S_ERROR("./file1.txt is corrupted"),
        ),
        (
            {"file1.txt": "826e8142e6baabe8af779f5f490cf5f5", "file2.txt": "5ec149e38f09fb716b1e0f4cf23af679"},
            S_ERROR("./file2.txt is corrupted"),
        ),
        # Files do not exist
        (
            {
                "file3.txt": "826e8142e6baabe8af779f5f490cf5f5",
            },
            S_ERROR("./file3.txt was expected but not found"),
        ),
        # remoteRunner.checkSumOutput is empty
        ({}, S_OK()),
        # remoteRunner.checkSumOutput does not exist
        (None, S_ERROR("Cannot guarantee the integrity of the outputs")),
    ],
)
def test__checkOutputIntegrity(checkSumDict, expectedResult):
    """Test RemoteRunner()._checkOutputIntegrity()

    :param dict checkSumDict: file name -> checksum entries to write in the checksum file,
                              or None to leave the checksum file absent
    :param dict expectedResult: S_OK/S_ERROR structure the call is compared against
    """
    # Instantiate a RemoteRunner
    remoteRunner = RemoteRunner("Site1", "CE1", "queue1")

    workingDirectory = "."
    outputFiles = {"file1.txt": "file1", "file2.txt": "file2"}
    checkSumPath = os.path.join(workingDirectory, remoteRunner.checkSumOutput)

    try:
        # Create some files in workingDirectory
        for fileName, content in outputFiles.items():
            with open(os.path.join(workingDirectory, fileName), "w") as f:
                f.write(content)

        # Create remoteRunner.checkSumOutput
        if checkSumDict is not None:
            with open(checkSumPath, "w") as f:
                for fileName, checkSum in checkSumDict.items():
                    f.write(f"{checkSum} {fileName}\n")

        # Check the integrity of the output
        result = remoteRunner._checkOutputIntegrity(workingDirectory)

        # Test the results
        assert result["OK"] is expectedResult["OK"]
        if not expectedResult["OK"]:
            assert expectedResult["Message"] in result["Message"]
    finally:
        # Delete files even when an assertion fails: a leftover checksum file would otherwise
        # leak into the next parametrized case, including the "does not exist" one
        leftovers = [os.path.join(workingDirectory, fileName) for fileName in outputFiles]
        leftovers.append(checkSumPath)
        for path in leftovers:
            if os.path.exists(path):
                os.remove(path)

0 commit comments

Comments
 (0)