Skip to content

Commit d2b508e

Browse files
committed
feat (FTS3): support failure for the first leg of a multihop
1 parent e1eac7c commit d2b508e

File tree

1 file changed

+42
-9
lines changed

1 file changed

+42
-9
lines changed

src/DIRAC/DataManagementSystem/Client/FTS3Job.py

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
202202
203203
204204
"""
205+
isMultiHop = False
205206

206207
if not self.ftsGUID:
207208
return S_ERROR("FTSGUID not set, FTS job not submitted?")
@@ -221,6 +222,10 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
221222
except FTS3ClientException as e:
222223
return S_ERROR(f"Error getting the job status {e}")
223224

225+
job_metadata = jobStatusDict["job_metadata"]
226+
if isinstance(job_metadata, dict):
227+
isMultiHop = job_metadata.get("isMultiHop", False)
228+
224229
now = datetime.datetime.utcnow().replace(microsecond=0)
225230
self.lastMonitor = now
226231

@@ -230,19 +235,21 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
230235
self.lastUpdate = now
231236
self.error = jobStatusDict["reason"]
232237

233-
if newStatus in self.FINAL_STATES:
234-
self._fillAccountingDicts(jobStatusDict)
238+
# if newStatus in self.FINAL_STATES:
239+
# self._fillAccountingDicts(jobStatusDict)
235240

236241
filesInfoList = jobStatusDict["files"]
242+
# Make a copy, since we are potentially
243+
# deleting objects or editing
244+
orig_filesInfoList = list(filesInfoList) # Make a copy
237245
filesStatus = {}
238246
statusSummary = {}
239247

240-
# Make a copy, since we are potentially
241-
242-
# deleting objects
243-
for fileDict in list(filesInfoList):
248+
for fileDict in orig_filesInfoList:
244249
file_state = fileDict["file_state"].capitalize()
245250
file_metadata = fileDict["file_metadata"]
251+
file_error = fileDict["reason"]
252+
is_recoverable = fileDict["recoverable"]
246253

247254
# previous version of the code did not have dictionary as
248255
# file_metadata
@@ -260,19 +267,40 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
260267
filesInfoList.remove(fileDict)
261268
continue
262269

263-
file_error = fileDict["reason"]
270+
# We know this is a multihop transfer, and we have a file_id,
271+
# this means that this is the final hop, and that there are only
272+
# 2 transfers in this job.
273+
# If the FTS server has the "CancelUnusedMultihopFiles" settings enabled (default False)
274+
# all NOT_USED files in a failed multihop job are canceled by FTS.
275+
# In this case, we will never enter the following if statement.
276+
# Otherwise, a multihop job that failed the first leg needs.
277+
278+
if isMultiHop and file_state == "Not_used":
279+
# First, make sure that we only have 2 transfers, and the first one is failed
280+
if len(orig_filesInfoList) != 2 or orig_filesInfoList[0]["file_state"].capitalize() != "Failed":
281+
return S_ERROR(
282+
errno.EDEADLK,
283+
f"Multihop job {self.ftsGUID} {self.status}has unexpected file states: {[f['file_state'] for f in orig_filesInfoList]}",
284+
)
285+
# Next, get the file_state, file_error and recoverable state from the first leg, as it is not propagated to the second one
286+
# Note that this will only change the values for the local loop, but not the values passed to _fillAccountingDicts.
287+
# That's okay though in this specific case, because we know we are in a failed status, and we will effectively only count
288+
# one failed transfer.
289+
file_state = orig_filesInfoList[0]["file_state"].capitalize()
290+
file_error = orig_filesInfoList[0]["reason"]
291+
is_recoverable = orig_filesInfoList[0].get("recoverable", True)
292+
264293
filesStatus[file_id] = {"status": file_state, "error": file_error}
265294

266295
# If the state of the file is final for FTS, set ftsGUID of the file to None,
267296
# such that it is "released" from this job and not updated anymore in future
268297
# monitoring calls
269298
if file_state in FTS3File.FTS_FINAL_STATES:
270299
filesStatus[file_id]["ftsGUID"] = None
271-
# TODO: update status to defunct if not recoverable here ?
272300

273301
# If the file is failed, check if it is recoverable
274302
if file_state in FTS3File.FTS_FAILED_STATES:
275-
if not fileDict.get("Recoverable", True):
303+
if not is_recoverable:
276304
filesStatus[file_id]["status"] = "Defunct"
277305

278306
# If the file is not in a final state, but the job is, we return an error
@@ -643,6 +671,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
643671
"sourceSE": self.sourceSE,
644672
"targetSE": self.targetSE,
645673
"useTokens": useTokens, # Store the information here to propagate it to submission
674+
"isMultiHop": isMultiHop,
646675
}
647676

648677
if self.activity:
@@ -994,6 +1023,10 @@ def _fillAccountingDicts(self, jobStatusDict):
9941023
if file_state in FTS3File.FTS_SUCCESS_STATES:
9951024
successfulFiles.append(fileDict)
9961025
else:
1026+
# Not that this will also catch the Not_used status
1027+
# in case the job is a multihop that failed on the
1028+
# first leg, but that's ok, because we just count
1029+
# the failed failes, we do not use their metadata
9971030
failedFiles.append(fileDict)
9981031

9991032
job_metadata = jobStatusDict["job_metadata"]

0 commit comments

Comments
 (0)