Skip to content

Commit 8da4861

Browse files
kcwuLUCI
authored andcommitted
subcmds: reduce multiprocessing serialization overhead
Follow the same approach as 39ffd99 to reduce serialization overhead. Below benchmarks are tested with 2.7k projects on my workstation (warm cache). git tracing is disabled for benchmark. (seconds) | v2.48 | v2.48 | this CL | this CL | | -j32 | | -j32 ----------------------------------------------------------- with clean tree state: branches (none) | 5.6 | 5.9 | 1.0 | 0.9 status (clean) | 21.3 | 9.4 | 19.4 | 4.7 diff (none) | 7.6 | 7.2 | 5.7 | 2.2 prune (none) | 5.7 | 6.1 | 1.3 | 1.2 abandon (none) | 19.4 | 18.6 | 0.9 | 0.8 upload (none) | 19.7 | 18.7 | 0.9 | 0.8 forall -c true | 7.5 | 7.6 | 0.6 | 0.6 forall -c "git log -1" | 11.3 | 11.1 | 0.6 | 0.6 with branches: start BRANCH --all | 21.9 | 20.3 | 13.6 | 2.6 checkout BRANCH | 29.1 | 27.8 | 1.1 | 1.0 branches (2) | 28.0 | 28.6 | 1.5 | 1.3 abandon BRANCH | 29.2 | 27.5 | 9.7 | 2.2 Bug: b/371638995 Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221 Reviewed-by: Josip Sokcevic <[email protected]> Tested-by: Kuang-che Wu <[email protected]> Commit-Queue: Kuang-che Wu <[email protected]>
1 parent 39ffd99 commit 8da4861

File tree

11 files changed

+228
-172
lines changed

11 files changed

+228
-172
lines changed

command.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,10 @@ def ParallelContext(cls):
268268
cls._parallel_context = None
269269

270270
@classmethod
271-
def _SetParallelContext(cls, context):
271+
def _InitParallelWorker(cls, context, initializer):
272272
cls._parallel_context = context
273+
if initializer:
274+
initializer()
273275

274276
@classmethod
275277
def ExecuteInParallel(
@@ -281,6 +283,7 @@ def ExecuteInParallel(
281283
output=None,
282284
ordered=False,
283285
chunksize=WORKER_BATCH_SIZE,
286+
initializer=None,
284287
):
285288
"""Helper for managing parallel execution boiler plate.
286289
@@ -307,6 +310,7 @@ def ExecuteInParallel(
307310
ordered: Whether the jobs should be processed in order.
308311
chunksize: The number of jobs processed in batch by parallel
309312
workers.
313+
initializer: Worker initializer.
310314
311315
Returns:
312316
The |callback| function's results are returned.
@@ -318,8 +322,8 @@ def ExecuteInParallel(
318322
else:
319323
with multiprocessing.Pool(
320324
jobs,
321-
initializer=cls._SetParallelContext,
322-
initargs=(cls._parallel_context,),
325+
initializer=cls._InitParallelWorker,
326+
initargs=(cls._parallel_context, initializer),
323327
) as pool:
324328
submit = pool.imap if ordered else pool.imap_unordered
325329
return callback(

subcmds/abandon.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ def ValidateOptions(self, opt, args):
7070
else:
7171
args.insert(0, "'All local branches'")
7272

73-
def _ExecuteOne(self, all_branches, nb, project):
73+
@classmethod
74+
def _ExecuteOne(cls, all_branches, nb, project_idx):
7475
"""Abandon one project."""
76+
project = cls.get_parallel_context()["projects"][project_idx]
7577
if all_branches:
7678
branches = project.GetBranches()
7779
else:
@@ -89,7 +91,7 @@ def _ExecuteOne(self, all_branches, nb, project):
8991
if status is not None:
9092
ret[name] = status
9193

92-
return (ret, project, errors)
94+
return (ret, project_idx, errors)
9395

9496
def Execute(self, opt, args):
9597
nb = args[0].split()
@@ -102,7 +104,8 @@ def Execute(self, opt, args):
102104
_RelPath = lambda p: p.RelPath(local=opt.this_manifest_only)
103105

104106
def _ProcessResults(_pool, pm, states):
105-
for results, project, errors in states:
107+
for results, project_idx, errors in states:
108+
project = all_projects[project_idx]
106109
for branch, status in results.items():
107110
if status:
108111
success[branch].append(project)
@@ -111,15 +114,18 @@ def _ProcessResults(_pool, pm, states):
111114
aggregate_errors.extend(errors)
112115
pm.update(msg="")
113116

114-
self.ExecuteInParallel(
115-
opt.jobs,
116-
functools.partial(self._ExecuteOne, opt.all, nb),
117-
all_projects,
118-
callback=_ProcessResults,
119-
output=Progress(
120-
f"Abandon {nb}", len(all_projects), quiet=opt.quiet
121-
),
122-
)
117+
with self.ParallelContext():
118+
self.get_parallel_context()["projects"] = all_projects
119+
self.ExecuteInParallel(
120+
opt.jobs,
121+
functools.partial(self._ExecuteOne, opt.all, nb),
122+
range(len(all_projects)),
123+
callback=_ProcessResults,
124+
output=Progress(
125+
f"Abandon {nb}", len(all_projects), quiet=opt.quiet
126+
),
127+
chunksize=1,
128+
)
123129

124130
width = max(
125131
itertools.chain(

subcmds/branches.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,22 @@ class Branches(Command):
9898
"""
9999
PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
100100

101+
@classmethod
102+
def _ExpandProjectToBranches(cls, project_idx):
103+
"""Expands a project into a list of branch names & associated info.
104+
105+
Args:
106+
project_idx: project.Project index
107+
108+
Returns:
109+
List[Tuple[str, git_config.Branch, int]]
110+
"""
111+
branches = []
112+
project = cls.get_parallel_context()["projects"][project_idx]
113+
for name, b in project.GetBranches().items():
114+
branches.append((name, b, project_idx))
115+
return branches
116+
101117
def Execute(self, opt, args):
102118
projects = self.GetProjects(
103119
args, all_manifests=not opt.this_manifest_only
@@ -107,17 +123,20 @@ def Execute(self, opt, args):
107123
project_cnt = len(projects)
108124

109125
def _ProcessResults(_pool, _output, results):
110-
for name, b in itertools.chain.from_iterable(results):
126+
for name, b, project_idx in itertools.chain.from_iterable(results):
127+
b.project = projects[project_idx]
111128
if name not in all_branches:
112129
all_branches[name] = BranchInfo(name)
113130
all_branches[name].add(b)
114131

115-
self.ExecuteInParallel(
116-
opt.jobs,
117-
expand_project_to_branches,
118-
projects,
119-
callback=_ProcessResults,
120-
)
132+
with self.ParallelContext():
133+
self.get_parallel_context()["projects"] = projects
134+
self.ExecuteInParallel(
135+
opt.jobs,
136+
self._ExpandProjectToBranches,
137+
range(len(projects)),
138+
callback=_ProcessResults,
139+
)
121140

122141
names = sorted(all_branches)
123142

@@ -191,19 +210,3 @@ def _ProcessResults(_pool, _output, results):
191210
else:
192211
out.write(" in all projects")
193212
out.nl()
194-
195-
196-
def expand_project_to_branches(project):
197-
"""Expands a project into a list of branch names & associated information.
198-
199-
Args:
200-
project: project.Project
201-
202-
Returns:
203-
List[Tuple[str, git_config.Branch]]
204-
"""
205-
branches = []
206-
for name, b in project.GetBranches().items():
207-
b.project = project
208-
branches.append((name, b))
209-
return branches

subcmds/checkout.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from error import GitError
2121
from error import RepoExitError
2222
from progress import Progress
23-
from project import Project
2423
from repo_logging import RepoLogger
2524

2625

@@ -30,7 +29,7 @@
3029
class CheckoutBranchResult(NamedTuple):
3130
# Whether the Project is on the branch (i.e. branch exists and no errors)
3231
result: bool
33-
project: Project
32+
project_idx: int
3433
error: Exception
3534

3635

@@ -62,15 +61,17 @@ def ValidateOptions(self, opt, args):
6261
if not args:
6362
self.Usage()
6463

65-
def _ExecuteOne(self, nb, project):
64+
@classmethod
65+
def _ExecuteOne(cls, nb, project_idx):
6666
"""Checkout one project."""
6767
error = None
6868
result = None
69+
project = cls.get_parallel_context()["projects"][project_idx]
6970
try:
7071
result = project.CheckoutBranch(nb)
7172
except GitError as e:
7273
error = e
73-
return CheckoutBranchResult(result, project, error)
74+
return CheckoutBranchResult(result, project_idx, error)
7475

7576
def Execute(self, opt, args):
7677
nb = args[0]
@@ -83,22 +84,25 @@ def Execute(self, opt, args):
8384

8485
def _ProcessResults(_pool, pm, results):
8586
for result in results:
87+
project = all_projects[result.project_idx]
8688
if result.error is not None:
8789
err.append(result.error)
88-
err_projects.append(result.project)
90+
err_projects.append(project)
8991
elif result.result:
90-
success.append(result.project)
92+
success.append(project)
9193
pm.update(msg="")
9294

93-
self.ExecuteInParallel(
94-
opt.jobs,
95-
functools.partial(self._ExecuteOne, nb),
96-
all_projects,
97-
callback=_ProcessResults,
98-
output=Progress(
99-
f"Checkout {nb}", len(all_projects), quiet=opt.quiet
100-
),
101-
)
95+
with self.ParallelContext():
96+
self.get_parallel_context()["projects"] = all_projects
97+
self.ExecuteInParallel(
98+
opt.jobs,
99+
functools.partial(self._ExecuteOne, nb),
100+
range(len(all_projects)),
101+
callback=_ProcessResults,
102+
output=Progress(
103+
f"Checkout {nb}", len(all_projects), quiet=opt.quiet
104+
),
105+
)
102106

103107
if err_projects:
104108
for p in err_projects:

subcmds/diff.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,22 @@ def _Options(self, p):
4040
help="paths are relative to the repository root",
4141
)
4242

43-
def _ExecuteOne(self, absolute, local, project):
43+
@classmethod
44+
def _ExecuteOne(cls, absolute, local, project_idx):
4445
"""Obtains the diff for a specific project.
4546
4647
Args:
4748
absolute: Paths are relative to the root.
4849
local: a boolean, if True, the path is relative to the local
4950
(sub)manifest. If false, the path is relative to the outermost
5051
manifest.
51-
project: Project to get status of.
52+
project_idx: Project index to get status of.
5253
5354
Returns:
5455
The status of the project.
5556
"""
5657
buf = io.StringIO()
58+
project = cls.get_parallel_context()["projects"][project_idx]
5759
ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local)
5860
return (ret, buf.getvalue())
5961

@@ -71,12 +73,15 @@ def _ProcessResults(_pool, _output, results):
7173
ret = 1
7274
return ret
7375

74-
return self.ExecuteInParallel(
75-
opt.jobs,
76-
functools.partial(
77-
self._ExecuteOne, opt.absolute, opt.this_manifest_only
78-
),
79-
all_projects,
80-
callback=_ProcessResults,
81-
ordered=True,
82-
)
76+
with self.ParallelContext():
77+
self.get_parallel_context()["projects"] = all_projects
78+
return self.ExecuteInParallel(
79+
opt.jobs,
80+
functools.partial(
81+
self._ExecuteOne, opt.absolute, opt.this_manifest_only
82+
),
83+
range(len(all_projects)),
84+
callback=_ProcessResults,
85+
ordered=True,
86+
chunksize=1,
87+
)

0 commit comments

Comments
 (0)