Skip to content

Commit 954dd29

Browse files
committed
up & down in parallel
1 parent e431e7a commit 954dd29

File tree

3 files changed

+171
-59
lines changed

3 files changed

+171
-59
lines changed

images/utils/launcher/node/__init__.py

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from docker.errors import NotFound
1414
from docker.types import IPAMConfig, IPAMPool
1515

16-
from launcher.utils import ArgumentParser, yes_or_no
16+
from launcher.utils import ArgumentParser, yes_or_no, parallel
1717
from .DockerTemplate import DockerTemplate
1818
from .arby import Arby
1919
from .base import Node, ContainerNotFound, NoWaiting
@@ -69,7 +69,8 @@ def execute(self, *args) -> None:
6969
args = self.parser.parse_args(args)
7070
name = args.service
7171
service = self.get_service(name)
72-
for line in service.logs(tail=args.tail, since=args.since, until=args.until, follow=args.follow, timestamps=args.timestamps):
72+
for line in service.logs(tail=args.tail, since=args.since, until=args.until, follow=args.follow,
73+
timestamps=args.timestamps):
7374
print(line)
7475

7576

@@ -151,6 +152,10 @@ class ServiceNotFound(Exception):
151152
pass
152153

153154

155+
class NetworkNotFound(Exception):
156+
pass
157+
158+
154159
class NodeManager:
155160
config: Config
156161
client: DockerClient
@@ -165,7 +170,7 @@ def __init__(self, config: Config):
165170
ctx = Context(self.config, self.client, self.image_manager, self)
166171

167172
self.nodes = {name: globals()[name.capitalize()](name, ctx) for name in self.config.nodes}
168-
self.docker_network = self.create_docker_network()
173+
# self.docker_network = self.create_docker_network()
169174

170175
self.cmd_logs = LogsCommand(self.get_service)
171176
self.cmd_start = StartCommand(self.get_service)
@@ -197,17 +202,23 @@ def get_network_ipam_pool(self):
197202
elif self.network == "mainnet":
198203
return IPAMPool(subnet='10.0.3.0/24', gateway='10.0.3.1')
199204

200-
def create_docker_network(self):
201-
name = self.network_name
202-
try:
203-
network = self.client.networks.get(name)
204-
return network
205-
except NotFound:
206-
pass
205+
def _create_docker_network(self) -> None:
207206
ipam_pool = self.get_network_ipam_pool()
208207
ipam_config = IPAMConfig(pool_configs=[ipam_pool])
209-
network = self.client.networks.create(name, driver="bridge", ipam=ipam_config)
210-
return network
208+
network = self.client.networks.create(self.network_name, driver="bridge", ipam=ipam_config)
209+
logger.info("Created network: %r", network)
210+
211+
def _remove_docker_network(self) -> None:
212+
network = self.docker_network
213+
network.remove()
214+
logger.info("Removed network: %r", network)
215+
216+
@property
217+
def docker_network(self):
218+
try:
219+
return self.client.networks.get(self.network_name)
220+
except docker.errors.NotFound as e:
221+
raise NetworkNotFound(self.network_name) from e
211222

212223
def get_service(self, name: str) -> Node:
213224
try:
@@ -216,46 +227,46 @@ def get_service(self, name: str) -> Node:
216227
raise ServiceNotFound(name) from e
217228

218229
@property
219-
def valid_nodes(self):
230+
def valid_nodes(self) -> Dict[str, Node]:
220231
return {name: node for name, node in self.nodes.items() if node.mode == "native" and not node.disabled}
221232

222-
@property
223-
def enabled_nodes(self):
224-
return {name: node for name, node in self.nodes.items() if not node.disabled}
225-
226233
def up(self):
227-
self.docker_network = self.create_docker_network()
234+
nodes = self.valid_nodes
228235

229-
nodes = self.valid_nodes.values()
236+
logger.info("Up services: %s", ", ".join(nodes))
230237

231-
executor = self.config.executor
232-
futs = {executor.submit(node.start): node for node in nodes}
233-
done, not_done = wait(futs, 60)
234-
if len(not_done) > 0:
235-
# TODO retry failed tasks
236-
raise RuntimeError("Failed to up")
238+
try:
239+
_ = self.docker_network
240+
except NetworkNotFound:
241+
self._create_docker_network()
242+
243+
def linehead(node):
244+
return "starting %s" % node.container_name
245+
246+
def start(node, stop):
247+
node.start()
248+
249+
nodes = [node for node in nodes.values() if not node.is_running]
250+
251+
parallel(self.config.executor, nodes, linehead, start)
237252

238253
def down(self):
239254
nodes = self.valid_nodes
240255

241-
for name, container in nodes.items():
242-
print(f"Stopping {name}...")
243-
container.stop()
244-
for name, container in nodes.items():
245-
print(f"Removing {name}...")
246-
container.remove()
256+
logger.info("Down services: %s", ", ".join(nodes))
257+
258+
running_nodes = [node for node in nodes.values() if node.is_running]
259+
260+
parallel(self.config.executor, running_nodes,
261+
lambda node: "stopping %s" % node.container_name,
262+
lambda node, stop: node.stop())
263+
264+
parallel(self.config.executor, list(nodes.values()),
265+
lambda node: "removing %s" % node.container_name,
266+
lambda node, stop: node.remove())
267+
247268
print(f"Removing network {self.network_name}")
248-
self.docker_network.remove()
249-
250-
def _display_container_status_text(self, status):
251-
if status == "missing":
252-
return "create"
253-
elif status == "outdated":
254-
return "recreate"
255-
elif status == "external_with_container":
256-
return "remove"
257-
elif status == "disabled_with_container":
258-
return "remove"
269+
self._remove_docker_network()
259270

260271
def check_for_updates(self) -> Dict[Node, str]:
261272
logger.info("Checking for container updates")
@@ -305,24 +316,35 @@ def update(self) -> None:
305316
if b1 and b2:
306317
print("All up-to-date.")
307318

308-
for container, action in containers.items():
319+
def linehead(node):
320+
action = containers[node]
309321
if action == "CREATE":
310-
print("Creating %s..." % container.container_name)
311-
container.create()
322+
return "creating %s" % node.container_name
312323
elif action == "RECREATE":
313-
if container.is_running:
314-
print("Stopping %s..." % container.container_name)
315-
container.stop()
316-
print("Removing %s..." % container.container_name)
317-
container.remove()
318-
print("Creating %s..." % container.container_name)
319-
container.create()
324+
return "recreating %s" % node.container_name
320325
elif action == "REMOVE":
321-
if container.is_running:
322-
print("Stopping %s..." % container.container_name)
323-
container.stop()
324-
print("Removing %s..." % container.container_name)
325-
container.remove()
326+
return "removing %s" % node.container_name
327+
328+
def update(node, stop):
329+
action = containers[node]
330+
if action == "CREATE":
331+
node.create()
332+
elif action == "RECREATE":
333+
if node.is_running:
334+
node.stop()
335+
node.remove()
336+
node.create()
337+
elif action == "REMOVE":
338+
if node.is_running:
339+
node.stop()
340+
node.remove()
341+
342+
items = []
343+
for container, action in containers.items():
344+
if action != "NONE":
345+
items.append(container)
346+
347+
parallel(self.config.executor, items, linehead, update)
326348

327349
def _get_status_nodes(self):
328350
optional_nodes = ["arby", "boltz", "webui", "proxy"]

images/utils/launcher/node/xud.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,19 @@ def getinfo(self):
3434
raise XudApiError(e.output)
3535

3636

37+
class InvalidPassword(Exception):
38+
pass
39+
40+
3741
class PasswordNotMatch(Exception):
3842
pass
3943

4044

41-
class MnemonicNot24Words(Exception):
45+
class WrongPassword(Exception):
4246
pass
4347

4448

45-
class InvalidPassword(Exception):
49+
class MnemonicNot24Words(Exception):
4650
pass
4751

4852

@@ -168,6 +172,8 @@ def extract_exception(self, cmd: str, output: str):
168172
elif cmd.startswith("unlock"):
169173
if "xud was unlocked succesfully" in output:
170174
return
175+
elif "password is incorrect" in output:
176+
raise WrongPassword
171177
else:
172178
print("^C")
173179
raise KeyboardInterrupt()

images/utils/launcher/utils.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import argparse
22
import logging
33
import os
4+
import random
5+
import threading
6+
import time
7+
from concurrent.futures import ThreadPoolExecutor
8+
from typing import List, Callable, TypeVar
49

510
logger = logging.getLogger(__name__)
611

@@ -79,3 +84,82 @@ def get_percentage(current, total):
7984
else:
8085
p = 0
8186
return "%.2f%% (%d/%d)" % (p, current, total)
87+
88+
89+
def color(text: str) -> str:
90+
if text == "done":
91+
return "\033[32mdone\033[0m"
92+
elif text == "error":
93+
return "\033[31mfailed\033[0m"
94+
else:
95+
return text
96+
97+
98+
T = TypeVar('T')
99+
100+
101+
def parallel(
102+
executor: ThreadPoolExecutor,
103+
items: List[T],
104+
linehead: Callable[[T], str],
105+
run: Callable[[T, threading.Event], None]
106+
):
107+
result = {item: None for item in items}
108+
stop = threading.Event()
109+
110+
def animate():
111+
nonlocal result
112+
nonlocal stop
113+
114+
lines = []
115+
width = 0
116+
for item in items:
117+
line = linehead(item)
118+
line = line.capitalize()
119+
if len(line) > width:
120+
width = len(line)
121+
lines.append(line)
122+
fmt = "%-{}s ...".format(width)
123+
lines = [fmt % line for line in lines]
124+
print("\n".join(lines))
125+
126+
i = 0
127+
while not stop.is_set():
128+
print("\033[%dA" % len(items), end="", flush=True)
129+
finish = 0
130+
for item in items:
131+
r = result[item]
132+
if r:
133+
suffix = "... " + color(r)
134+
suffix_len = 4 + len(r)
135+
finish += 1
136+
else:
137+
suffix = "%-3s" % ("." * abs(3 - i % 6))
138+
suffix_len = 3
139+
print("\033[%dC" % (width + 1), end="", flush=True)
140+
print(suffix, end="", flush=True)
141+
print("\033[%dD\033[1B" % (width + 1 + suffix_len), end="", flush=True)
142+
print("\033[K", end="", flush=True)
143+
if finish == len(items):
144+
break
145+
stop.wait(0.5)
146+
i += 1
147+
148+
def wrapper(item):
149+
nonlocal result
150+
nonlocal stop
151+
try:
152+
run(item, stop)
153+
# time.sleep(random.randint(3, 10))
154+
result[item] = "done"
155+
except:
156+
logger.exception("Failed to %s", linehead(item))
157+
result[item] = "error"
158+
159+
f = executor.submit(animate)
160+
try:
161+
for item in items:
162+
executor.submit(wrapper, item)
163+
f.result()
164+
finally:
165+
stop.set()

0 commit comments

Comments
 (0)