Skip to content

Commit a76b7e5

Browse files
[update-checkout] reduce the script's default verbosity
1 parent 3ee222f commit a76b7e5

File tree

3 files changed

+169
-82
lines changed

3 files changed

+169
-82
lines changed
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
from .update_checkout import main
32

43
__all__ = ["main"]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from multiprocessing.managers import ListProxy, ValueProxy
2+
import sys
3+
from multiprocessing import Pool, cpu_count, Manager
4+
import time
5+
from typing import Callable, List, Any
6+
from threading import Thread, Event, Lock
7+
import shutil
8+
9+
class MonitoredFunction:
10+
def __init__(self, fn: Callable, running_tasks: ListProxy, updated_repos: ValueProxy, lock: Lock):
11+
self.fn = fn
12+
self.running_tasks = running_tasks
13+
self.updated_repos = updated_repos
14+
self._lock = lock
15+
16+
def __call__(self, *args):
17+
task_name = args[0][2]
18+
self.running_tasks.append(task_name)
19+
try:
20+
return self.fn(*args)
21+
finally:
22+
self._lock.acquire()
23+
self.running_tasks.remove(task_name)
24+
self.updated_repos.set(self.updated_repos.get() + 1)
25+
self._lock.release()
26+
27+
28+
class ParallelRunner:
29+
def __init__(self, fn: Callable, pool_args: List[List[Any]], n_processes: int = 0):
30+
self._monitor_polling_period = 0.1
31+
if n_processes == 0:
32+
n_processes = cpu_count() * 2
33+
self._terminal_width = shutil.get_terminal_size().columns
34+
self._n_processes = n_processes
35+
self._pool_args = pool_args
36+
self._fn = fn
37+
self._lock = Manager().Lock()
38+
self._pool = Pool(
39+
processes=self._n_processes, initializer=self._child_init, initargs=(self._lock,)
40+
)
41+
self._verbose = pool_args[0][len(pool_args[0]) - 1]
42+
self._nb_repos = len(pool_args)
43+
self._stop_event = Event()
44+
self._running_tasks = Manager().list()
45+
self._updated_repos = Manager().Value('i', 0)
46+
self._monitored_fn = MonitoredFunction(self._fn, self._running_tasks, self._updated_repos, self._lock)
47+
48+
def run(self) -> List[Any]:
49+
print(
50+
"Running ``%s`` with up to %d processes."
51+
% (self._fn.__name__, self._n_processes)
52+
)
53+
54+
if self._verbose:
55+
results = self._pool.map_async(
56+
func=self._fn, iterable=self._pool_args
57+
).get()
58+
self._pool.close()
59+
self._pool.join()
60+
else:
61+
monitor_thread = Thread(target=self._monitor, daemon=True)
62+
monitor_thread.start()
63+
results = self._pool.map_async(
64+
func=self._monitored_fn, iterable=self._pool_args
65+
).get()
66+
self._pool.close()
67+
self._pool.join()
68+
self._stop_event.set()
69+
monitor_thread.join()
70+
return results
71+
72+
def _monitor(self):
73+
last_output = ""
74+
while not self._stop_event.is_set():
75+
current = list(self._running_tasks)
76+
current_line = ", ".join(current)
77+
78+
if current_line != last_output:
79+
truncated = f"Updating [{self._updated_repos.get()}/{self._nb_repos}] ({current_line})"
80+
if len(truncated) > self._terminal_width:
81+
ellipsis_marker = " ..."
82+
truncated = (
83+
truncated[: self._terminal_width - len(ellipsis_marker)]
84+
+ ellipsis_marker
85+
)
86+
sys.stdout.write("\r" + truncated.ljust(self._terminal_width))
87+
sys.stdout.flush()
88+
last_output = current_line
89+
90+
time.sleep(self._monitor_polling_period)
91+
92+
sys.stdout.write("\r" + " " * len(last_output) + "\r")
93+
sys.stdout.flush()
94+
95+
@staticmethod
96+
def _clear_lines(n):
97+
for _ in range(n):
98+
sys.stdout.write("\x1b[1A")
99+
sys.stdout.write("\x1b[2K")
100+
101+
@staticmethod
102+
def check_results(results, op):
103+
"""Function used to check the results of ParallelRunner.
104+
105+
NOTE: This function was originally located in the shell module of
106+
swift_build_support and should eventually be replaced with a better
107+
parallel implementation.
108+
"""
109+
110+
fail_count = 0
111+
if results is None:
112+
return 0
113+
for r in results:
114+
if r is not None:
115+
if fail_count == 0:
116+
print("======%s FAILURES======" % op)
117+
fail_count += 1
118+
if isinstance(r, str):
119+
print(r)
120+
continue
121+
print("%s failed (ret=%d): %s" % (r.repo_path, r.ret, r))
122+
if r.stderr:
123+
print(r.stderr.decode())
124+
return fail_count
125+
126+
@staticmethod
127+
def _child_init(lck):
128+
global lock
129+
lock = lck

0 commit comments

Comments
 (0)