|
29 | 29 | from __future__ import print_function
|
30 | 30 |
|
31 | 31 | import os
|
32 |
| -from multiprocessing import Manager |
| 32 | +import multiprocessing |
| 33 | + |
| 34 | +import six |
33 | 35 |
|
34 | 36 | import DIRAC
|
35 | 37 | from DIRAC import S_OK, S_ERROR, gConfig, gLogger
|
@@ -426,19 +428,25 @@ def runInParallel(arguments, listOfLists, function):
|
426 | 428 | """
|
427 | 429 | Helper for execution of uploads and downloads in parallel
|
428 | 430 | """
|
429 |
| - from multiprocessing import Process |
430 |
| - |
431 |
| - processes = [] |
432 |
| - for tID, alist in enumerate(listOfLists): |
433 |
| - argums = arguments + [alist] + [tID] |
434 |
| - pro = Process(target=function, args=argums) |
435 |
| - pro.start() |
436 |
| - processes.append(pro) |
437 |
| - for process in processes: |
438 |
| - process.join() |
439 |
| - |
440 |
| - if any(process.exitcode == 1 for process in processes): |
441 |
| - return S_ERROR() |
| 431 | + if len(listOfLists) == 1: |
| 432 | + function(*(arguments + listOfLists + [0])) |
| 433 | + else: |
| 434 | + if six.PY3 and multiprocessing.get_start_method() != "fork": # pylint: disable=no-member |
| 435 | + # This assumes that the "fork" strategy is used for new subprocesses |
| 436 | + # so they inherit DIRAC's global state. This is the default on Linux |
| 437 | + # however macOS and Windows use "spawn". |
| 438 | + raise NotImplementedError("Parallel uploading/downloading is only possible on Linux") |
| 439 | + processes = [] |
| 440 | + for tID, alist in enumerate(listOfLists): |
| 441 | + argums = arguments + [alist] + [tID] |
| 442 | + pro = multiprocessing.Process(target=function, args=argums) |
| 443 | + pro.start() |
| 444 | + processes.append(pro) |
| 445 | + for process in processes: |
| 446 | + process.join() |
| 447 | + |
| 448 | + if any(process.exitcode == 1 for process in processes): |
| 449 | + return S_ERROR() |
442 | 450 | return S_OK()
|
443 | 451 |
|
444 | 452 |
|
@@ -521,7 +529,7 @@ def main():
|
521 | 529 | if switch[0].lower() == "j" or switch[0].lower() == "parallel":
|
522 | 530 | parallel = int(switch[1])
|
523 | 531 |
|
524 |
| - listOfFailedFiles = Manager().list() |
| 532 | + listOfFailedFiles = multiprocessing.Manager().list() |
525 | 533 |
|
526 | 534 | # This is the execution
|
527 | 535 | returnValue = run(args, sync, parallel)
|
|
0 commit comments