|
1 | 1 | import base64 |
| 2 | +import concurrent |
2 | 3 | import errno |
3 | 4 | import http.client |
4 | 5 | import logging |
@@ -3407,14 +3408,37 @@ def stop_zookeeper_nodes(self, zk_nodes): |
3407 | 3408 |
|
3408 | 3409 | # Faster than waiting for clean stop |
3409 | 3410 | def kill_zookeeper_nodes(self, zk_nodes): |
3410 | | - for n in zk_nodes: |
3411 | | - logging.info("Killing zookeeper node: %s", n) |
3412 | | - subprocess_check_call(self.base_zookeeper_cmd + ["kill", n]) |
| 3411 | + |
| 3412 | + def kill_keeper(node): |
| 3413 | + logging.info("Killing zookeeper node: %s", node) |
| 3414 | + subprocess_check_call(self.base_zookeeper_cmd + ["kill", node]) |
| 3415 | + logging.info("Killed zookeeper node: %s", node) |
| 3416 | + |
| 3417 | + with concurrent.futures.ThreadPoolExecutor( |
| 3418 | + max_workers=len(zk_nodes) |
| 3419 | + ) as executor: |
| 3420 | + futures = [] |
| 3421 | + for n in zk_nodes: |
| 3422 | + futures += [executor.submit(kill_keeper, n)] |
| 3423 | + |
| 3424 | + for future in concurrent.futures.as_completed(futures): |
| 3425 | + future.result() |
3413 | 3426 |
|
3414 | 3427 | def start_zookeeper_nodes(self, zk_nodes): |
3415 | | - for n in zk_nodes: |
3416 | | - logging.info("Starting zookeeper node: %s", n) |
3417 | | - subprocess_check_call(self.base_zookeeper_cmd + ["start", n]) |
| 3428 | + def start_keeper(node): |
| 3429 | + logging.info("Starting zookeeper node: %s", node) |
| 3430 | + subprocess_check_call(self.base_zookeeper_cmd + ["start", node]) |
| 3431 | + logging.info("Started zookeeper node: %s", node) |
| 3432 | + |
| 3433 | + with concurrent.futures.ThreadPoolExecutor( |
| 3434 | + max_workers=len(zk_nodes) |
| 3435 | + ) as executor: |
| 3436 | + futures = [] |
| 3437 | + for n in zk_nodes: |
| 3438 | + futures += [executor.submit(start_keeper, n)] |
| 3439 | + |
| 3440 | + for future in concurrent.futures.as_completed(futures): |
| 3441 | + future.result() |
3418 | 3442 |
|
3419 | 3443 | def query_all_nodes(self, sql, *args, **kwargs): |
3420 | 3444 | return { |
|
0 commit comments