
Commit 39ffd99

kcwu authored and LUCI committed
sync: reduce multiprocessing serialization overhead
Background:
- The Manifest object is large (for projects like Android) in terms of
  serialization cost and size (more than 1 MB).
- Lots of Project objects usually share only a few Manifest objects.

Before this CL, Project objects were passed to workers via function
parameters. Function parameters are pickled separately (in chunks); in
other words, the manifests were serialized again and again. The major
serialization overhead of repo sync was
  O(manifest_size * projects / chunksize)

This CL uses the following tricks to reduce serialization overhead:
- All projects are pickled in one invocation. Because Project objects
  share manifests, the pickle library remembers which objects it has
  already seen and avoids the repeated serialization cost.
- Pass the Project objects to workers at worker initialization time,
  and pass the project index as the function parameter instead. The
  number of workers is much smaller than the number of projects.
- Worker init state is shared on Linux (fork based), so it requires
  zero serialization for Project objects.

On Linux (fork based), the serialization overhead is
  O(projects) --- one int per project
On Windows (spawn based), the serialization overhead is
  O(manifest_size * min(workers, projects))

Moreover, use chunksize=1 to avoid the chance that some workers sit
idle while other workers still have more than one job in their chunk
queue.

Using 2.7k projects as the baseline, a no-op "repo sync" originally
took 31s for fetch and 25s for checkout on my Linux workstation. With
this CL, it takes 12s for fetch and 1s for checkout.

Bug: b/371638995
Change-Id: Ifa22072ea54eacb4a5c525c050d84de371e87caa
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/439921
Tested-by: Kuang-che Wu <[email protected]>
Reviewed-by: Josip Sokcevic <[email protected]>
Commit-Queue: Kuang-che Wu <[email protected]>
1 parent 584863f commit 39ffd99
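
The pattern the commit message describes can be illustrated with a minimal,
standalone Python sketch (illustration only, not the repo code; the names
_init_worker, _work, and run_parallel are made up): the heavy shared objects
are handed to each worker once via the Pool initializer, the per-task
arguments shrink to plain integer indices, and chunksize=1 keeps workers from
sitting on queued jobs while others are idle.

import multiprocessing


# Standalone illustration of the technique described above (not the repo
# code): the heavy shared list is installed once per worker via the Pool
# initializer, so each task only needs to pickle a small integer index.
_shared_projects = None


def _init_worker(projects):
    # On fork-based platforms (Linux) this state is inherited for free; on
    # spawn-based platforms (Windows) it is pickled once per worker rather
    # than once per task.
    global _shared_projects
    _shared_projects = projects


def _work(index):
    # The task argument is a single int; the heavy object is looked up in
    # worker-local state.
    project = _shared_projects[index]
    return project["name"]  # stand-in for the real per-project work


def run_parallel(projects, jobs=4):
    with multiprocessing.Pool(
        jobs, initializer=_init_worker, initargs=(projects,)
    ) as pool:
        # chunksize=1 avoids leaving some workers idle while others still
        # hold queued jobs in their chunk.
        return list(
            pool.imap_unordered(_work, range(len(projects)), chunksize=1)
        )


if __name__ == "__main__":
    sample = [{"name": f"project{i}"} for i in range(8)]
    print(run_parallel(sample, jobs=2))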

File tree: 2 files changed (+134, -87 lines)

command.py

Lines changed: 46 additions & 4 deletions
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
 import multiprocessing
 import optparse
 import os
@@ -70,6 +71,14 @@ class Command:
     # migrated subcommands can set it to False.
     MULTI_MANIFEST_SUPPORT = True
 
+    # Shared data across parallel execution workers.
+    _parallel_context = None
+
+    @classmethod
+    def get_parallel_context(cls):
+        assert cls._parallel_context is not None
+        return cls._parallel_context
+
     def __init__(
         self,
         repodir=None,
@@ -242,9 +251,36 @@ def Execute(self, opt, args):
         """Perform the action, after option parsing is complete."""
         raise NotImplementedError
 
-    @staticmethod
+    @classmethod
+    @contextlib.contextmanager
+    def ParallelContext(cls):
+        """Obtains the context, which is shared to ExecuteInParallel workers.
+
+        Callers can store data in the context dict before invocation of
+        ExecuteInParallel. The dict will then be shared to child workers of
+        ExecuteInParallel.
+        """
+        assert cls._parallel_context is None
+        cls._parallel_context = {}
+        try:
+            yield
+        finally:
+            cls._parallel_context = None
+
+    @classmethod
+    def _SetParallelContext(cls, context):
+        cls._parallel_context = context
+
+    @classmethod
     def ExecuteInParallel(
-        jobs, func, inputs, callback, output=None, ordered=False
+        cls,
+        jobs,
+        func,
+        inputs,
+        callback,
+        output=None,
+        ordered=False,
+        chunksize=WORKER_BATCH_SIZE,
     ):
         """Helper for managing parallel execution boiler plate.
 
@@ -269,6 +305,8 @@ def ExecuteInParallel(
             output: An output manager. May be progress.Progess or
                 color.Coloring.
             ordered: Whether the jobs should be processed in order.
+            chunksize: The number of jobs processed in batch by parallel
+                workers.
 
         Returns:
             The |callback| function's results are returned.
@@ -278,12 +316,16 @@ def ExecuteInParallel(
             if len(inputs) == 1 or jobs == 1:
                 return callback(None, output, (func(x) for x in inputs))
             else:
-                with multiprocessing.Pool(jobs) as pool:
+                with multiprocessing.Pool(
+                    jobs,
+                    initializer=cls._SetParallelContext,
+                    initargs=(cls._parallel_context,),
+                ) as pool:
                     submit = pool.imap if ordered else pool.imap_unordered
                     return callback(
                         pool,
                         output,
-                        submit(func, inputs, chunksize=WORKER_BATCH_SIZE),
+                        submit(func, inputs, chunksize=chunksize),
                     )
         finally:
             if isinstance(output, progress.Progress):
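
A hypothetical caller of the new API might look like the sketch below. Only
ParallelContext, get_parallel_context, ExecuteInParallel, and the chunksize
parameter come from this diff; the subcommand name MySync, _CheckoutOne,
_GetProjects, opt.jobs, and the per-project work are assumptions made for
illustration.

from command import Command


class MySync(Command):
    """Hypothetical subcommand showing the intended usage of the new API."""

    @classmethod
    def _CheckoutOne(cls, idx):
        # Workers receive only an index; the Project object is looked up in
        # the context installed at worker initialization time.
        project = cls.get_parallel_context()["projects"][idx]
        return (idx, project is not None)  # stand-in for real per-project work

    def Execute(self, opt, args):
        projects = self._GetProjects(args)  # hypothetical helper
        with self.ParallelContext():
            # Anything stored here before ExecuteInParallel is shared with
            # the child workers (zero extra pickling on fork-based platforms).
            self.get_parallel_context()["projects"] = projects
            return self.ExecuteInParallel(
                opt.jobs,
                self._CheckoutOne,
                range(len(projects)),
                callback=lambda pool, output, results: list(results),
                chunksize=1,
            )

Storing the projects in the shared context before ExecuteInParallel means the
workers receive them through the Pool initializer (or for free via fork)
rather than through pickled per-task arguments, which is exactly the overhead
the commit message is eliminating.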
