Skip to content

Commit 1d10868

Browse files
authored
Merge pull request #5651 from DIRACGridBot/cherry-pick-2-5591751d1-integration
[sweep:integration] fix (DMS): directory-sync: better failure logging, later abort
2 parents 17e6d25 + f7cda27 commit 1d10868

File tree

1 file changed

+19
-57
lines changed

1 file changed

+19
-57
lines changed

src/DIRAC/DataManagementSystem/scripts/dirac_dms_directory_sync.py

Lines changed: 19 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ def main():
7070
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
7171
from DIRAC.Resources.Storage.StorageElement import StorageElement
7272

73+
from multiprocessing import Manager
74+
75+
listOfFailedFiles = Manager().list()
76+
7377
def getSetOfLocalDirectoriesAndFiles(path):
7478
"""Return a set of all directories and subdirectories and a set of
7579
files contained therein for a given local path
@@ -152,20 +156,6 @@ def getSetOfRemoteDirectoriesAndFiles(fc, path):
152156

153157
return S_OK(tree)
154158

155-
def isInFileCatalog(fc, path):
156-
"""
157-
Check if the file is in the File Catalog
158-
"""
159-
160-
result = fc.listDirectory(path)
161-
if result["OK"]:
162-
if result["Value"]["Successful"]:
163-
return S_OK()
164-
else:
165-
return S_ERROR()
166-
else:
167-
return S_ERROR()
168-
169159
def getContentToSync(upload, fc, source_dir, dest_dir):
170160
"""
171161
Return list of files and directories to be create and deleted
@@ -235,26 +225,6 @@ def removeRemoteFiles(dm, lfns):
235225
else:
236226
return S_OK()
237227

238-
def uploadLocalFile(dm, lfn, localfile, storage):
239-
"""
240-
Upload a local file to a storage element
241-
"""
242-
res = dm.putAndRegister(lfn, localfile, storage, None)
243-
if not res["OK"]:
244-
return S_ERROR("Error: failed to upload %s to %s" % (lfn, storage))
245-
else:
246-
return S_OK("Successfully uploaded file to %s" % storage)
247-
248-
def downloadRemoteFile(dm, lfn, destination):
249-
"""
250-
Download a file from the system
251-
"""
252-
res = dm.getFile(lfn, destination)
253-
if not res["OK"]:
254-
return S_ERROR("Error: failed to download %s " % lfn)
255-
else:
256-
return S_OK("Successfully uploaded file %s" % lfn)
257-
258228
def removeStorageDirectoryFromSE(directory, storageElement):
259229
"""
260230
Delete directory on selected storage element
@@ -292,19 +262,6 @@ def removeRemoteDirectory(fc, lfn):
292262

293263
return S_OK("Successfully removed directory")
294264

295-
def createRemoteDirectory(fc, newdir):
296-
"""
297-
Create directory in file catalog
298-
"""
299-
result = fc.createDirectory(newdir)
300-
if result["OK"]:
301-
if result["Value"]["Successful"] and newdir in result["Value"]["Successful"]:
302-
return S_OK("Successfully created directory:" + newdir)
303-
elif result["Value"]["Failed"] and newdir in result["Value"]["Failed"]:
304-
return S_ERROR("Failed to create directory: " + result["Value"]["Failed"][newdir])
305-
else:
306-
return S_ERROR("Failed to create directory:" + result["Message"])
307-
308265
def createLocalDirectory(directory):
309266
"""
310267
Create local directory
@@ -321,7 +278,7 @@ def removeLocalFile(path):
321278
try:
322279
os.remove(path)
323280
except OSError as e:
324-
return S_ERROR("Directory creation failed:" + e.strerror)
281+
return S_ERROR("File deletion failed:" + e.strerror)
325282

326283
if os.path.isfile(path):
327284
return S_ERROR("File deleting failed")
@@ -363,15 +320,15 @@ def doUpload(fc, dm, result, source_dir, dest_dir, storage, delete, nthreads):
363320
gLogger.notice("Deleting " + directoryname + " -> [DONE]")
364321

365322
for directoryname in result["Value"]["Create"]["Directories"]:
366-
res = createRemoteDirectory(fc, dest_dir + "/" + directoryname)
323+
res = returnSingleResult(fc.createDirectory(dest_dir + "/" + directoryname))
367324
if not res["OK"]:
368325
gLogger.fatal("Creation of directory: " + directoryname + " -X- [FAILED] " + res["Message"])
369326
DIRAC.exit(1)
370327
else:
371328
gLogger.notice("Creating " + directoryname + " -> [DONE]")
372329

373330
listOfFiles = result["Value"]["Create"]["Files"]
374-
# Chech that we do not have to many threads
331+
# Check that we do not have too many threads
375332
if nthreads > len(listOfFiles):
376333
nthreads = len(listOfFiles)
377334

@@ -394,10 +351,11 @@ def uploadListOfFiles(dm, source_dir, dest_dir, storage, listOfFiles, tID):
394351
log = gLogger.getLocalSubLogger("[Thread %s] " % tID)
395352
threadLine = "[Thread %s]" % tID
396353
for filename in listOfFiles:
397-
res = uploadLocalFile(dm, dest_dir + "/" + filename, source_dir + "/" + filename, storage)
354+
destLFN = os.path.join(dest_dir, filename)
355+
res = returnSingleResult(dm.putAndRegister(destLFN, source_dir + "/" + filename, storage, None))
398356
if not res["OK"]:
399357
log.fatal(threadLine + " Uploading " + filename + " -X- [FAILED] " + res["Message"])
400-
DIRAC.exit(1)
358+
listOfFailedFiles.append("%s: %s" % (destLFN, res["Message"]))
401359
else:
402360
log.notice(threadLine + " Uploading " + filename + " -> [DONE]")
403361

@@ -471,10 +429,11 @@ def downloadListOfFiles(dm, source_dir, dest_dir, listOfFiles, tID):
471429
log = gLogger.getLocalSubLogger("[Thread %s] " % tID)
472430
threadLine = "[Thread %s]" % tID
473431
for filename in listOfFiles:
474-
res = downloadRemoteFile(dm, source_dir + "/" + filename, dest_dir + ("/" + filename).rsplit("/", 1)[0])
432+
sourceLFN = os.path.join(source_dir, filename)
433+
res = returnSingleResult(dm.getFile(sourceLFN, dest_dir + ("/" + filename).rsplit("/", 1)[0]))
475434
if not res["OK"]:
476435
log.fatal(threadLine + " Downloading " + filename + " -X- [FAILED] " + res["Message"])
477-
DIRAC.exit(1)
436+
listOfFailedFiles.append("%s: %s" % (sourceLFN, res["Message"]))
478437
else:
479438
log.notice(threadLine + " Downloading " + filename + " -> [DONE]")
480439

@@ -493,9 +452,8 @@ def runInParallel(arguments, listOfLists, function):
493452
for process in processes:
494453
process.join()
495454

496-
for process in processes:
497-
if process.exitcode == 1:
498-
return S_ERROR()
455+
if any(process.exitcode == 1 for process in processes):
456+
return S_ERROR()
499457
return S_OK()
500458

501459
def syncDestinations(upload, source_dir, dest_dir, storage, delete, nthreads):
@@ -553,7 +511,11 @@ def run(parameters, delete, nthreads):
553511

554512
return S_OK("Successfully mirrored " + source_dir + " into " + dest_dir)
555513

514+
# This is the execution
556515
returnValue = run(args, sync, parallel)
516+
if listOfFailedFiles:
517+
gLogger.error("Some file operations failed:\n\t", "\n\t".join(listOfFailedFiles))
518+
DIRAC.exit(1)
557519
if not returnValue["OK"]:
558520
gLogger.fatal(returnValue["Message"])
559521
DIRAC.exit(1)

0 commit comments

Comments
 (0)