|
6 | 6 |
|
7 | 7 | import time
|
8 | 8 | import threading
|
| 9 | +from concurrent.futures import ThreadPoolExecutor, as_completed |
9 | 10 | from DIRAC import gLogger, S_OK
|
10 | 11 |
|
11 | 12 | from DIRAC.ConfigurationSystem.private.ServiceInterfaceBase import ServiceInterfaceBase
|
12 | 13 | from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
|
13 |
| -from DIRAC.Core.Utilities.ThreadPool import ThreadPool |
14 | 14 |
|
15 | 15 | __RCSID__ = "$Id$"
|
16 | 16 |
|
@@ -41,19 +41,20 @@ def run(self):
|
41 | 41 |
|
42 | 42 | def _updateServiceConfiguration(self, urlSet, fromMaster=False):
|
43 | 43 | """
|
44 |
| - Update configuration in a set of service in parallel |
| 44 | + Update configuration of a set of slave services in parallel |
45 | 45 |
|
46 | 46 | :param set urlSet: a set of service URLs
|
47 | 47 | :param fromMaster: flag to force updating from the master CS
|
48 | 48 | :return: Nothing
|
49 | 49 | """
|
50 |
| - pool = ThreadPool(len(urlSet)) |
51 |
| - for url in urlSet: |
52 |
| - pool.generateJobAndQueueIt( |
53 |
| - self._forceServiceUpdate, args=[url, fromMaster], kwargs={}, oCallback=self.__processResults |
54 |
| - ) |
55 |
| - pool.processAllResults() |
56 |
| - |
57 |
| - def __processResults(self, _id, result): |
58 |
| - if not result["OK"]: |
59 |
| - gLogger.warn("Failed to update configuration on", result["URL"] + ":" + result["Message"]) |
| 50 | + if not urlSet: |
| 51 | + return |
| 52 | + with ThreadPoolExecutor(max_workers=len(urlSet)) as executor: |
| 53 | + futureUpdate = {executor.submit(self._forceServiceUpdate, url, fromMaster): url for url in urlSet} |
| 54 | + for future in as_completed(futureUpdate): |
| 55 | + url = futureUpdate[future] |
| 56 | + result = future.result() |
| 57 | + if result["OK"]: |
| 58 | + gLogger.info("Successfully updated slave configuration", url) |
| 59 | + else: |
| 60 | + gLogger.error("Failed to update slave configuration", url) |
0 commit comments