Skip to content

Commit 7ff88ef

Browse files
author
Andrei Tsaregorodtsev
authored
Merge pull request #5588 from andresailer/v7r2_fixDirSync
[v7r2] fix (DMS): directory-sync: better failure logging, later abort
2 parents aa6c6fd + f7f849c commit 7ff88ef

File tree

1 file changed

+19
-57
lines changed

1 file changed

+19
-57
lines changed

src/DIRAC/DataManagementSystem/scripts/dirac_dms_directory_sync.py

Lines changed: 19 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ def main():
6363
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
6464
from DIRAC.Resources.Storage.StorageElement import StorageElement
6565

66+
from multiprocessing import Manager
67+
68+
listOfFailedFiles = Manager().list()
69+
6670
def getSetOfLocalDirectoriesAndFiles(path):
6771
"""Return a set of all directories and subdirectories and a set of
6872
files contained therein for a given local path
@@ -145,20 +149,6 @@ def getSetOfRemoteDirectoriesAndFiles(fc, path):
145149

146150
return S_OK(tree)
147151

148-
def isInFileCatalog(fc, path):
149-
"""
150-
Check if the file is in the File Catalog
151-
"""
152-
153-
result = fc.listDirectory(path)
154-
if result["OK"]:
155-
if result["Value"]["Successful"]:
156-
return S_OK()
157-
else:
158-
return S_ERROR()
159-
else:
160-
return S_ERROR()
161-
162152
def getContentToSync(upload, fc, source_dir, dest_dir):
163153
"""
164154
Return list of files and directories to be create and deleted
@@ -228,26 +218,6 @@ def removeRemoteFiles(dm, lfns):
228218
else:
229219
return S_OK()
230220

231-
def uploadLocalFile(dm, lfn, localfile, storage):
232-
"""
233-
Upload a local file to a storage element
234-
"""
235-
res = dm.putAndRegister(lfn, localfile, storage, None)
236-
if not res["OK"]:
237-
return S_ERROR("Error: failed to upload %s to %s" % (lfn, storage))
238-
else:
239-
return S_OK("Successfully uploaded file to %s" % storage)
240-
241-
def downloadRemoteFile(dm, lfn, destination):
242-
"""
243-
Download a file from the system
244-
"""
245-
res = dm.getFile(lfn, destination)
246-
if not res["OK"]:
247-
return S_ERROR("Error: failed to download %s " % lfn)
248-
else:
249-
return S_OK("Successfully uploaded file %s" % lfn)
250-
251221
def removeStorageDirectoryFromSE(directory, storageElement):
252222
"""
253223
Delete directory on selected storage element
@@ -285,19 +255,6 @@ def removeRemoteDirectory(fc, lfn):
285255

286256
return S_OK("Successfully removed directory")
287257

288-
def createRemoteDirectory(fc, newdir):
289-
"""
290-
Create directory in file catalog
291-
"""
292-
result = fc.createDirectory(newdir)
293-
if result["OK"]:
294-
if result["Value"]["Successful"] and newdir in result["Value"]["Successful"]:
295-
return S_OK("Successfully created directory:" + newdir)
296-
elif result["Value"]["Failed"] and newdir in result["Value"]["Failed"]:
297-
return S_ERROR("Failed to create directory: " + result["Value"]["Failed"][newdir])
298-
else:
299-
return S_ERROR("Failed to create directory:" + result["Message"])
300-
301258
def createLocalDirectory(directory):
302259
"""
303260
Create local directory
@@ -314,7 +271,7 @@ def removeLocalFile(path):
314271
try:
315272
os.remove(path)
316273
except OSError as e:
317-
return S_ERROR("Directory creation failed:" + e.strerror)
274+
return S_ERROR("File deletion failed:" + e.strerror)
318275

319276
if os.path.isfile(path):
320277
return S_ERROR("File deleting failed")
@@ -356,15 +313,15 @@ def doUpload(fc, dm, result, source_dir, dest_dir, storage, delete, nthreads):
356313
gLogger.notice("Deleting " + directoryname + " -> [DONE]")
357314

358315
for directoryname in result["Value"]["Create"]["Directories"]:
359-
res = createRemoteDirectory(fc, dest_dir + "/" + directoryname)
316+
res = returnSingleResult(fc.createDirectory(dest_dir + "/" + directoryname))
360317
if not res["OK"]:
361318
gLogger.fatal("Creation of directory: " + directoryname + " -X- [FAILED] " + res["Message"])
362319
DIRAC.exit(1)
363320
else:
364321
gLogger.notice("Creating " + directoryname + " -> [DONE]")
365322

366323
listOfFiles = result["Value"]["Create"]["Files"]
367-
# Chech that we do not have to many threads
324+
# Check that we do not have too many threads
368325
if nthreads > len(listOfFiles):
369326
nthreads = len(listOfFiles)
370327

@@ -387,10 +344,11 @@ def uploadListOfFiles(dm, source_dir, dest_dir, storage, listOfFiles, tID):
387344
log = gLogger.getLocalSubLogger("[Thread %s] " % tID)
388345
threadLine = "[Thread %s]" % tID
389346
for filename in listOfFiles:
390-
res = uploadLocalFile(dm, dest_dir + "/" + filename, source_dir + "/" + filename, storage)
347+
destLFN = os.path.join(dest_dir, filename)
348+
res = returnSingleResult(dm.putAndRegister(destLFN, source_dir + "/" + filename, storage, None))
391349
if not res["OK"]:
392350
log.fatal(threadLine + " Uploading " + filename + " -X- [FAILED] " + res["Message"])
393-
DIRAC.exit(1)
351+
listOfFailedFiles.append("%s: %s" % (destLFN, res["Message"]))
394352
else:
395353
log.notice(threadLine + " Uploading " + filename + " -> [DONE]")
396354

@@ -464,10 +422,11 @@ def downloadListOfFiles(dm, source_dir, dest_dir, listOfFiles, tID):
464422
log = gLogger.getLocalSubLogger("[Thread %s] " % tID)
465423
threadLine = "[Thread %s]" % tID
466424
for filename in listOfFiles:
467-
res = downloadRemoteFile(dm, source_dir + "/" + filename, dest_dir + ("/" + filename).rsplit("/", 1)[0])
425+
sourceLFN = os.path.join(source_dir, filename)
426+
res = returnSingleResult(dm.getFile(sourceLFN, dest_dir + ("/" + filename).rsplit("/", 1)[0]))
468427
if not res["OK"]:
469428
log.fatal(threadLine + " Downloading " + filename + " -X- [FAILED] " + res["Message"])
470-
DIRAC.exit(1)
429+
listOfFailedFiles.append("%s: %s" % (sourceLFN, res["Message"]))
471430
else:
472431
log.notice(threadLine + " Downloading " + filename + " -> [DONE]")
473432

@@ -486,9 +445,8 @@ def runInParallel(arguments, listOfLists, function):
486445
for process in processes:
487446
process.join()
488447

489-
for process in processes:
490-
if process.exitcode == 1:
491-
return S_ERROR()
448+
if any(process.exitcode == 1 for process in processes):
449+
return S_ERROR()
492450
return S_OK()
493451

494452
def syncDestinations(upload, source_dir, dest_dir, storage, delete, nthreads):
@@ -546,7 +504,11 @@ def run(parameters, delete, nthreads):
546504

547505
return S_OK("Successfully mirrored " + source_dir + " into " + dest_dir)
548506

507+
# This is the execution
549508
returnValue = run(args, sync, parallel)
509+
if listOfFailedFiles:
510+
gLogger.error("Some file operations failed:\n\t", "\n\t".join(listOfFailedFiles))
511+
DIRAC.exit(1)
550512
if not returnValue["OK"]:
551513
gLogger.fatal(returnValue["Message"])
552514
DIRAC.exit(1)

0 commit comments

Comments
 (0)