@@ -63,6 +63,10 @@ def main():
63
63
from DIRAC .DataManagementSystem .Client .DataManager import DataManager
64
64
from DIRAC .Resources .Storage .StorageElement import StorageElement
65
65
66
+ from multiprocessing import Manager
67
+
68
+ listOfFailedFiles = Manager ().list ()
69
+
66
70
def getSetOfLocalDirectoriesAndFiles (path ):
67
71
"""Return a set of all directories and subdirectories and a set of
68
72
files contained therein for a given local path
@@ -232,9 +236,9 @@ def uploadLocalFile(dm, lfn, localfile, storage):
232
236
"""
233
237
Upload a local file to a storage element
234
238
"""
235
- res = dm .putAndRegister (lfn , localfile , storage , None )
239
+ res = returnSingleResult ( dm .putAndRegister (lfn , localfile , storage , None ) )
236
240
if not res ["OK" ]:
237
- return S_ERROR ("Error: failed to upload %s to %s" % (lfn , storage ))
241
+ return S_ERROR ("Error: failed to upload %s to %s: %s " % (lfn , storage , res [ "Message" ] ))
238
242
else :
239
243
return S_OK ("Successfully uploaded file to %s" % storage )
240
244
@@ -364,7 +368,7 @@ def doUpload(fc, dm, result, source_dir, dest_dir, storage, delete, nthreads):
364
368
gLogger .notice ("Creating " + directoryname + " -> [DONE]" )
365
369
366
370
listOfFiles = result ["Value" ]["Create" ]["Files" ]
367
- # Chech that we do not have to many threads
371
+ # Check that we do not have too many threads
368
372
if nthreads > len (listOfFiles ):
369
373
nthreads = len (listOfFiles )
370
374
@@ -390,7 +394,7 @@ def uploadListOfFiles(dm, source_dir, dest_dir, storage, listOfFiles, tID):
390
394
res = uploadLocalFile (dm , dest_dir + "/" + filename , source_dir + "/" + filename , storage )
391
395
if not res ["OK" ]:
392
396
log .fatal (threadLine + " Uploading " + filename + " -X- [FAILED] " + res ["Message" ])
393
- DIRAC . exit ( 1 )
397
+ listOfFailedFiles . append ( "%s: %s" % ( filename , res [ "Message" ]) )
394
398
else :
395
399
log .notice (threadLine + " Uploading " + filename + " -> [DONE]" )
396
400
@@ -467,7 +471,7 @@ def downloadListOfFiles(dm, source_dir, dest_dir, listOfFiles, tID):
467
471
res = downloadRemoteFile (dm , source_dir + "/" + filename , dest_dir + ("/" + filename ).rsplit ("/" , 1 )[0 ])
468
472
if not res ["OK" ]:
469
473
log .fatal (threadLine + " Downloading " + filename + " -X- [FAILED] " + res ["Message" ])
470
- DIRAC . exit ( 1 )
474
+ listOfFailedFiles . append ( "%s: %s" % ( filename , res [ "Message" ]) )
471
475
else :
472
476
log .notice (threadLine + " Downloading " + filename + " -> [DONE]" )
473
477
@@ -486,9 +490,8 @@ def runInParallel(arguments, listOfLists, function):
486
490
for process in processes :
487
491
process .join ()
488
492
489
- for process in processes :
490
- if process .exitcode == 1 :
491
- return S_ERROR ()
493
+ if any (process .exitcode == 1 for process in processes ):
494
+ return S_ERROR ()
492
495
return S_OK ()
493
496
494
497
def syncDestinations (upload , source_dir , dest_dir , storage , delete , nthreads ):
@@ -546,7 +549,11 @@ def run(parameters, delete, nthreads):
546
549
547
550
return S_OK ("Successfully mirrored " + source_dir + " into " + dest_dir )
548
551
552
+ # This is the execution
549
553
returnValue = run (args , sync , parallel )
554
+ if listOfFailedFiles :
555
+ gLogger .error ("Some file operations failed:\n \t " , "\n \t " .join (listOfFailedFiles ))
556
+ DIRAC .exit (1 )
550
557
if not returnValue ["OK" ]:
551
558
gLogger .fatal (returnValue ["Message" ])
552
559
DIRAC .exit (1 )
0 commit comments