
Commit 1afe96a

gavinmak authored and LUCI committed
sync: fix saving of fetch times and local state
Interleaved sync didn't save _fetch_times and _local_sync_state to disk.
Phased sync saved them, but incorrectly applied moving average smoothing
repeatedly when fetching submodules, and discarded historical data during
partial syncs.

Move .Save() calls to the end of main sync loops to ensure they run once.
Update _FetchTimes.Save() to merge new data with existing history,
preventing data loss.

Change-Id: I174f98a62ac86859f1eeea1daba65eb35c227852
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/519821
Commit-Queue: Gavin Mak <[email protected]>
Reviewed-by: Scott Lee <[email protected]>
Tested-by: Gavin Mak <[email protected]>
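The smoothing half of the bug is easiest to see with numbers. Below is a minimal Python sketch of the moving average used by _FetchTimes.Save(), assuming a smoothing factor of 0.5 as a stand-in for the class's _ALPHA constant:

# Illustration only, not repo code; ALPHA stands in for _FetchTimes._ALPHA.
ALPHA = 0.5

def smooth(new, old):
    # One moving-average step, as in _FetchTimes.Save().
    return (ALPHA * new) + ((1 - ALPHA) * old)

old_time, new_time = 10.0, 20.0

# Saving once per sync yields the intended blend of history and new data.
print(smooth(new_time, old_time))  # 15.0

# The old code smoothed _seen in place, so a second Save() in the same run
# (e.g. after a submodule fetch pass) re-applied the formula and dragged
# the stored time back toward the stale value.
print(smooth(smooth(new_time, old_time), old_time))  # 12.5

Since the fetch loop sorts projects by these times to start the slowest fetches first, skewed values also degrade fetch scheduling over time.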
1 parent 2719a8e commit 1afe96a

2 files changed: +167 −147 lines

subcmds/sync.py

Lines changed: 164 additions & 147 deletions
@@ -975,9 +975,6 @@ def _ProcessResults(pool, pm, results_sets):
             sync_event.set()
             sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)

@@ -1003,53 +1000,58 @@ def _FetchMain(
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
             success = result.success
-            new_fetched = result.projects
+            fetched = result.projects
             if not success:
                 err_event.set()
-            fetched.update(new_fetched)
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
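The saved times feed straight back into scheduling: the hunk above keeps the existing to_fetch.sort(key=self._fetch_times.Get, reverse=True), which starts the slowest fetches first. A toy illustration of that ordering, with invented project names and times:

# Invented data; self._fetch_times.Get plays this role in the real code.
fetch_times = {"platform/big": 120.0, "tools/small": 3.0, "docs": 1.5}

to_fetch = ["docs", "platform/big", "tools/small"]
to_fetch.sort(key=lambda name: fetch_times.get(name, 0.0), reverse=True)
print(to_fetch)  # ['platform/big', 'tools/small', 'docs']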

@@ -2491,107 +2493,120 @@ def _SyncInterleaved(
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
 
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
-
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
+
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
                                 )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
                                 err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
+
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
+
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
 
-                        if err_event.is_set() and opt.fail_fast:
-                            raise SyncFailFastError(aggregate_errors=errors)
-
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                                if err_event.is_set() and opt.fail_fast:
+                                    raise SyncFailFastError(
+                                        aggregate_errors=errors
+                                    )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()

@@ -2695,17 +2710,19 @@ def _Load(self):
                 self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
            return
 
+        self._Load()
+
        for name, t in self._seen.items():
            # Keep a moving average across the previous/current sync runs.
            old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
        try:
            with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
        except (OSError, TypeError):
            platform_utils.remove(self._path, missing_ok=True)
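Taken together, the fixed Save() loads the on-disk history first, folds this run's observations into it, and writes the merged dict back. A standalone sketch of that behavior under assumed names (the real logic lives on _FetchTimes and goes through _Load() and platform_utils):

import json

ALPHA = 0.5  # assumed smoothing factor, standing in for _FetchTimes._ALPHA

def save_fetch_times(path, seen):
    # Load the existing history first, so a partial sync that fetched only
    # a few projects no longer discards everyone else's recorded times.
    try:
        with open(path) as f:
            saved = json.load(f)
    except (OSError, ValueError):
        saved = {}
    # Fold this run's observations into the history, writing the smoothed
    # value into the history dict instead of mutating `seen` in place.
    for name, t in seen.items():
        old = saved.get(name, t)
        saved[name] = (ALPHA * t) + ((1 - ALPHA) * old)
    with open(path, "w") as f:
        json.dump(saved, f, indent=2)

Dumping the merged dict rather than only this run's _seen is what keeps entries alive for projects untouched by a partial sync.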

tests/test_subcmds_sync.py

Lines changed: 3 additions & 0 deletions
@@ -681,6 +681,9 @@ def setUp(self):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
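The two mock objects added to setUp() keep the worker tests from writing real state files now that the Save() calls run unconditionally in finally blocks, and they make call counts checkable. A self-contained sketch of that idea (FakeSync is hypothetical, not part of the test suite):

from unittest import mock

class FakeSync:
    """Hypothetical stand-in for the Sync command object under test."""

    def __init__(self):
        self._fetch_times = mock.Mock()
        self._local_sync_state = mock.Mock()

    def fetch(self):
        try:
            pass  # ...one or more fetch passes, including submodules...
        finally:
            # Mirrors the commit: state is saved once, at the very end.
            self._fetch_times.Save()
            self._local_sync_state.Save()

cmd = FakeSync()
cmd.fetch()
cmd._fetch_times.Save.assert_called_once()
cmd._local_sync_state.Save.assert_called_once()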
