2525import sys
2626import tempfile
2727import time
28- from typing import List , NamedTuple , Set , Union
28+ from typing import List , NamedTuple , Optional , Set , Union
2929import urllib .error
3030import urllib .parse
3131import urllib .request
@@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple):
194194 finish : float
195195
196196
197+ class _SyncResult (NamedTuple ):
198+ """Individual project sync result for interleaved mode.
199+
200+ Attributes:
201+ relpath (str): The project's relative path from the repo client top.
202+ fetch_success (bool): True if the fetch operation was successful.
203+ checkout_success (bool): True if the checkout operation was
204+ successful.
205+ fetch_error (Optional[Exception]): The Exception from a failed fetch,
206+ or None.
207+ checkout_error (Optional[Exception]): The Exception from a failed
208+ checkout, or None.
209+ fetch_start (Optional[float]): The time.time() when fetch started.
210+ fetch_finish (Optional[float]): The time.time() when fetch finished.
211+ checkout_start (Optional[float]): The time.time() when checkout
212+ started.
213+ checkout_finish (Optional[float]): The time.time() when checkout
214+ finished.
215+ """
216+
217+ relpath : str
218+ fetch_success : bool
219+ checkout_success : bool
220+ fetch_error : Optional [Exception ]
221+ checkout_error : Optional [Exception ]
222+
223+ fetch_start : Optional [float ]
224+ fetch_finish : Optional [float ]
225+ checkout_start : Optional [float ]
226+ checkout_finish : Optional [float ]
227+
228+
229+ class _InterleavedSyncResult (NamedTuple ):
230+ """Result of an interleaved sync.
231+
232+ Attributes:
233+ results (List[_SyncResult]): A list of results, one for each project
234+ processed. Empty if the worker failed before creating results.
235+ """
236+
237+ results : List [_SyncResult ]
238+
239+
197240class SuperprojectError (SyncError ):
198241 """Superproject sync repo."""
199242
@@ -837,15 +880,7 @@ def _Fetch(self, projects, opt, err_event, ssh_proxy, errors):
837880 )
838881
839882 sync_event = _threading .Event ()
840-
841- def _MonitorSyncLoop ():
842- while True :
843- pm .update (inc = 0 , msg = self ._GetSyncProgressMessage ())
844- if sync_event .wait (timeout = 1 ):
845- return
846-
847- sync_progress_thread = _threading .Thread (target = _MonitorSyncLoop )
848- sync_progress_thread .daemon = True
883+ sync_progress_thread = self ._CreateSyncProgressThread (pm , sync_event )
849884
850885 def _ProcessResults (pool , pm , results_sets ):
851886 ret = True
@@ -1828,6 +1863,16 @@ def _ExecuteHelper(self, opt, args, errors):
18281863 all_manifests = not opt .this_manifest_only ,
18291864 )
18301865
1866+ # Log the repo projects by existing and new.
1867+ existing = [x for x in all_projects if x .Exists ]
1868+ mp .config .SetString ("repo.existingprojectcount" , str (len (existing )))
1869+ mp .config .SetString (
1870+ "repo.newprojectcount" , str (len (all_projects ) - len (existing ))
1871+ )
1872+
1873+ self ._fetch_times = _FetchTimes (manifest )
1874+ self ._local_sync_state = LocalSyncState (manifest )
1875+
18311876 if opt .interleaved :
18321877 sync_method = self ._SyncInterleaved
18331878 else :
@@ -1864,6 +1909,34 @@ def _ExecuteHelper(self, opt, args, errors):
18641909 if not opt .quiet :
18651910 print ("repo sync has finished successfully." )
18661911
1912+ def _CreateSyncProgressThread (
1913+ self , pm : Progress , stop_event : _threading .Event
1914+ ) -> _threading .Thread :
1915+ """Creates and returns a daemon thread to update a Progress object.
1916+
1917+ The returned thread is not yet started. The thread will periodically
1918+ update the progress bar with information from _GetSyncProgressMessage
1919+ until the stop_event is set.
1920+
1921+ Args:
1922+ pm: The Progress object to update.
1923+ stop_event: The threading.Event to signal the monitor to stop.
1924+
1925+ Returns:
1926+ The configured _threading.Thread object.
1927+ """
1928+
1929+ def _monitor_loop ():
1930+ """The target function for the monitor thread."""
1931+ while True :
1932+ # Update the progress bar with the current status message.
1933+ pm .update (inc = 0 , msg = self ._GetSyncProgressMessage ())
1934+ # Wait for 1 second or until the stop_event is set.
1935+ if stop_event .wait (timeout = 1 ):
1936+ return
1937+
1938+ return _threading .Thread (target = _monitor_loop , daemon = True )
1939+
18671940 def _SyncPhased (
18681941 self ,
18691942 opt ,
@@ -1890,15 +1963,6 @@ def _SyncPhased(
18901963 err_update_projects = False
18911964 err_update_linkfiles = False
18921965
1893- # Log the repo projects by existing and new.
1894- existing = [x for x in all_projects if x .Exists ]
1895- mp .config .SetString ("repo.existingprojectcount" , str (len (existing )))
1896- mp .config .SetString (
1897- "repo.newprojectcount" , str (len (all_projects ) - len (existing ))
1898- )
1899-
1900- self ._fetch_times = _FetchTimes (manifest )
1901- self ._local_sync_state = LocalSyncState (manifest )
19021966 if not opt .local_only :
19031967 with multiprocessing .Manager () as manager :
19041968 with ssh .ProxyManager (manager ) as ssh_proxy :
@@ -2003,6 +2067,88 @@ def print_and_log(err_msg):
20032067 )
20042068 raise SyncError (aggregate_errors = errors )
20052069
2070+ @classmethod
2071+ def _SyncProjectList (cls , opt , project_indices ) -> _InterleavedSyncResult :
2072+ """Worker for interleaved sync.
2073+
2074+ This function is responsible for syncing a group of projects that share
2075+ a git object directory.
2076+
2077+ Args:
2078+ opt: Program options returned from optparse. See _Options().
2079+ project_indices: A list of indices into the projects list stored in
2080+ the parallel context.
2081+
2082+ Returns:
2083+ An `_InterleavedSyncResult` containing the results for each project.
2084+ """
2085+ results = []
2086+ context = cls .get_parallel_context ()
2087+ projects = context ["projects" ]
2088+ sync_dict = context ["sync_dict" ]
2089+
2090+ assert project_indices , "_SyncProjectList called with no indices."
2091+
2092+ # Use the first project as the representative for the progress bar.
2093+ first_project = projects [project_indices [0 ]]
2094+ key = f"{ first_project .name } @ { first_project .relpath } "
2095+ start_time = time .time ()
2096+ sync_dict [key ] = start_time
2097+
2098+ try :
2099+ for idx in project_indices :
2100+ project = projects [idx ]
2101+ # For now, simulate a successful sync.
2102+ # TODO(b/421935613): Perform the actual git fetch and checkout.
2103+ results .append (
2104+ _SyncResult (
2105+ relpath = project .relpath ,
2106+ fetch_success = True ,
2107+ checkout_success = True ,
2108+ fetch_error = None ,
2109+ checkout_error = None ,
2110+ fetch_start = None ,
2111+ fetch_finish = None ,
2112+ checkout_start = None ,
2113+ checkout_finish = None ,
2114+ )
2115+ )
2116+ finally :
2117+ del sync_dict [key ]
2118+
2119+ return _InterleavedSyncResult (results = results )
2120+
2121+ def _ProcessSyncInterleavedResults (
2122+ self ,
2123+ synced_relpaths : Set [str ],
2124+ err_event : _threading .Event ,
2125+ errors : List [Exception ],
2126+ opt : optparse .Values ,
2127+ pool : Optional [multiprocessing .Pool ],
2128+ pm : Progress ,
2129+ results_sets : List [_InterleavedSyncResult ],
2130+ ):
2131+ """Callback to process results from interleaved sync workers."""
2132+ ret = True
2133+ for result_group in results_sets :
2134+ for result in result_group .results :
2135+ pm .update ()
2136+ if result .fetch_success and result .checkout_success :
2137+ synced_relpaths .add (result .relpath )
2138+ else :
2139+ ret = False
2140+ err_event .set ()
2141+ if result .fetch_error :
2142+ errors .append (result .fetch_error )
2143+ if result .checkout_error :
2144+ errors .append (result .checkout_error )
2145+
2146+ if not ret and opt .fail_fast :
2147+ if pool :
2148+ pool .close ()
2149+ break
2150+ return ret
2151+
20062152 def _SyncInterleaved (
20072153 self ,
20082154 opt ,
@@ -2026,7 +2172,116 @@ def _SyncInterleaved(
20262172 2. Projects that share git objects are processed serially to prevent
20272173 race conditions.
20282174 """
2029- raise NotImplementedError ("Interleaved sync is not implemented yet." )
2175+ err_event = multiprocessing .Event ()
2176+ synced_relpaths = set ()
2177+ project_list = list (all_projects )
2178+ pm = Progress (
2179+ "Syncing" ,
2180+ len (project_list ),
2181+ delay = False ,
2182+ quiet = opt .quiet ,
2183+ show_elapsed = True ,
2184+ elide = True ,
2185+ )
2186+ previously_pending_relpaths = set ()
2187+
2188+ sync_event = _threading .Event ()
2189+ sync_progress_thread = self ._CreateSyncProgressThread (pm , sync_event )
2190+
2191+ with self .ParallelContext ():
2192+ # TODO(gavinmak): Use multprocessing.Queue instead of dict.
2193+ self .get_parallel_context ()[
2194+ "sync_dict"
2195+ ] = multiprocessing .Manager ().dict ()
2196+ sync_progress_thread .start ()
2197+
2198+ try :
2199+ # Outer loop for dynamic project discovery (e.g., submodules).
2200+ # It continues until no unsynced projects remain.
2201+ while True :
2202+ projects_to_sync = [
2203+ p
2204+ for p in project_list
2205+ if p .relpath not in synced_relpaths
2206+ ]
2207+ if not projects_to_sync :
2208+ break
2209+
2210+ pending_relpaths = {p .relpath for p in projects_to_sync }
2211+ if previously_pending_relpaths == pending_relpaths :
2212+ logger .error (
2213+ "Stall detected in interleaved sync, not all "
2214+ "projects could be synced."
2215+ )
2216+ err_event .set ()
2217+ break
2218+ previously_pending_relpaths = pending_relpaths
2219+
2220+ # Update the projects list for workers in the current pass.
2221+ self .get_parallel_context ()["projects" ] = projects_to_sync
2222+ project_index_map = {
2223+ p : i for i , p in enumerate (projects_to_sync )
2224+ }
2225+
2226+ # Inner loop to process projects in a hierarchical order.
2227+ # This iterates through levels of project dependencies (e.g.
2228+ # 'foo' then 'foo/bar'). All projects in one level can be
2229+ # processed in parallel, but we must wait for a level to
2230+ # complete before starting the next.
2231+ for level_projects in _SafeCheckoutOrder (projects_to_sync ):
2232+ if not level_projects :
2233+ continue
2234+
2235+ objdir_project_map = collections .defaultdict (list )
2236+ for p in level_projects :
2237+ objdir_project_map [p .objdir ].append (
2238+ project_index_map [p ]
2239+ )
2240+
2241+ work_items = list (objdir_project_map .values ())
2242+ if not work_items :
2243+ continue
2244+
2245+ jobs = max (1 , min (opt .jobs , len (work_items )))
2246+ callback = functools .partial (
2247+ self ._ProcessSyncInterleavedResults ,
2248+ synced_relpaths ,
2249+ err_event ,
2250+ errors ,
2251+ opt ,
2252+ )
2253+ if not self .ExecuteInParallel (
2254+ jobs ,
2255+ functools .partial (self ._SyncProjectList , opt ),
2256+ work_items ,
2257+ callback = callback ,
2258+ output = pm ,
2259+ chunksize = 1 ,
2260+ ):
2261+ err_event .set ()
2262+
2263+ if err_event .is_set () and opt .fail_fast :
2264+ raise SyncFailFastError (aggregate_errors = errors )
2265+
2266+ self ._ReloadManifest (None , manifest )
2267+ project_list = self .GetProjects (
2268+ args ,
2269+ missing_ok = True ,
2270+ submodules_ok = opt .fetch_submodules ,
2271+ manifest = manifest ,
2272+ all_manifests = not opt .this_manifest_only ,
2273+ )
2274+ finally :
2275+ sync_event .set ()
2276+ sync_progress_thread .join ()
2277+
2278+ pm .end ()
2279+
2280+ if err_event .is_set ():
2281+ logger .error (
2282+ "error: Unable to fully sync the tree in interleaved mode."
2283+ )
2284+ raise SyncError (aggregate_errors = errors )
20302285
20312286
20322287def _PostRepoUpgrade (manifest , quiet = False ):
0 commit comments