1+ package org .microg .gms .wearable ;
2+
3+ import android .database .Cursor ;
4+ import android .os .Handler ;
5+ import android .util .Log ;
6+
7+ import com .google .android .gms .wearable .Asset ;
8+
9+ import org .microg .gms .wearable .channel .ChannelManager ;
10+ import org .microg .gms .wearable .proto .FetchAsset ;
11+ import org .microg .gms .wearable .proto .RootMessage ;
12+
13+ import java .io .IOException ;
14+ import java .util .ArrayList ;
15+ import java .util .Collections ;
16+ import java .util .List ;
17+ import java .util .Map ;
18+ import java .util .Set ;
19+ import java .util .concurrent .ConcurrentHashMap ;
20+
21+ public class AssetFetcher {
22+ private static final String TAG = "GmsWearAssetFetch" ;
23+
24+ private final NodeDatabaseHelper nodeDatabase ;
25+ private final Handler networkHandler ;
26+
27+ private final Set <String > fetchingAssets = Collections .newSetFromMap (new ConcurrentHashMap <String , Boolean >());
28+
29+ private final Map <String , AssetFetchAttempt > failedAssets = new ConcurrentHashMap <>();
30+
31+ private static final int ASSET_BATCH_SIZE = 10 ;
32+ private static final int MAX_RETRY_COUNT = 3 ;
33+ private static final long RETRY_COOLDOWN_MS = 5000 ; // 5 seconds before retry
34+ private static final long FAILED_ASSET_EXPIRY_MS = 300000 ; // 5 minutes
35+
36+ public AssetFetcher (NodeDatabaseHelper nodeDatabase , Handler networkHandler ) {
37+ this .nodeDatabase = nodeDatabase ;
38+ this .networkHandler = networkHandler ;
39+ }
40+
41+ public void fetchMissingAssets (String nodeId , WearableConnection connection ,
42+ Map <String , WearableConnection > activeConnections ,
43+ ChannelManager channelManager ) {
44+ if (connection == null ) {
45+ Log .d (TAG , "Connection no longer active for node: " + nodeId );
46+ return ;
47+ }
48+
49+ cleanupExpiredFailures ();
50+
51+ Cursor cursor = nodeDatabase .listMissingAssets ();
52+ if (cursor == null ) {
53+ return ;
54+ }
55+
56+ try {
57+ int fetchCount = 0 ;
58+ int skippedCount = 0 ;
59+ int alreadyFetchingCount = 0 ;
60+
61+ while (cursor .moveToNext ()) {
62+ if (!activeConnections .containsKey (nodeId )) {
63+ Log .d (TAG , "Connection closed during asset fetch, stopping (fetched="
64+ + fetchCount + ", skipped=" + skippedCount + ")" );
65+ break ;
66+ }
67+
68+ String assetDigest = cursor .getString (13 );
69+ String assetName = cursor .getString (12 );
70+ String packageName = cursor .getString (1 );
71+ String signatureDigest = cursor .getString (2 );
72+
73+ if (fetchingAssets .contains (assetDigest )) {
74+ alreadyFetchingCount ++;
75+ continue ;
76+ }
77+
78+ AssetFetchAttempt attempt = failedAssets .get (assetDigest );
79+ if (attempt != null ) {
80+ if (attempt .retryCount >= MAX_RETRY_COUNT ) {
81+ skippedCount ++;
82+ continue ;
83+ }
84+
85+ long timeSinceLastAttempt = System .currentTimeMillis () - attempt .lastAttemptTime ;
86+ if (timeSinceLastAttempt < RETRY_COOLDOWN_MS ) {
87+ skippedCount ++;
88+ continue ;
89+ }
90+ }
91+
92+ try {
93+ fetchingAssets .add (assetDigest );
94+
95+ connection .writeMessage (new RootMessage .Builder ()
96+ .fetchAsset (new FetchAsset .Builder ()
97+ .assetName (assetName )
98+ .packageName (packageName )
99+ .signatureDigest (signatureDigest )
100+ .build ())
101+ .build ());
102+
103+ fetchCount ++;
104+
105+ failedAssets .remove (assetDigest );
106+
107+ if (fetchCount % ASSET_BATCH_SIZE == 0 ) {
108+ try {
109+ Thread .sleep (100 );
110+ } catch (InterruptedException e ) {
111+ Log .d (TAG , "Asset fetch interrupted" );
112+ break ;
113+ }
114+ }
115+
116+ } catch (IOException e ) {
117+ Log .w (TAG , "Error fetching asset " + assetDigest +
118+ " (fetched " + fetchCount + " so far): " + e .getMessage ());
119+
120+ recordFailure (assetDigest );
121+
122+ fetchingAssets .remove (assetDigest );
123+
124+ if (isConnectionError (e )) {
125+ break ;
126+ }
127+ }
128+ }
129+
130+ if (fetchCount > 0 || skippedCount > 0 || alreadyFetchingCount > 0 ) {
131+ Log .d (TAG , "Asset fetch summary: fetched=" + fetchCount +
132+ ", skipped=" + skippedCount +
133+ ", alreadyFetching=" + alreadyFetchingCount );
134+ }
135+
136+ if (fetchCount > 100 && channelManager != null ) {
137+ Log .d (TAG , "Large asset batch (" + fetchCount + "), applying cooldown" );
138+ channelManager .setOperationCooldown (1000 );
139+ }
140+
141+ } finally {
142+ cursor .close ();
143+ }
144+ }
145+
146+ public void fetchMissingAssetsForRecord (WearableConnection connection ,
147+ DataItemRecord record ,
148+ List <Asset > missingAssets ) {
149+ int successCount = 0 ;
150+ int skipCount = 0 ;
151+
152+ for (Asset asset : missingAssets ) {
153+ String digest = asset .getDigest ();
154+
155+ if (fetchingAssets .contains (digest )) {
156+ skipCount ++;
157+ continue ;
158+ }
159+
160+ AssetFetchAttempt attempt = failedAssets .get (digest );
161+ if (attempt != null ) {
162+ if (attempt .retryCount >= MAX_RETRY_COUNT ) {
163+ Log .d (TAG , "Asset " + digest + " failed too many times, skipping" );
164+ skipCount ++;
165+ continue ;
166+ }
167+
168+ long timeSinceLastAttempt = System .currentTimeMillis () - attempt .lastAttemptTime ;
169+ if (timeSinceLastAttempt < RETRY_COOLDOWN_MS ) {
170+ skipCount ++;
171+ continue ;
172+ }
173+ }
174+
175+ try {
176+ Log .d (TAG , "Fetching missing asset for record: " + digest );
177+
178+ fetchingAssets .add (digest );
179+
180+ FetchAsset fetchAsset = new FetchAsset .Builder ()
181+ .assetName (digest )
182+ .packageName (record .packageName )
183+ .signatureDigest (record .signatureDigest )
184+ .permission (false )
185+ .build ();
186+
187+ connection .writeMessage (new RootMessage .Builder ()
188+ .fetchAsset (fetchAsset )
189+ .build ());
190+
191+ successCount ++;
192+
193+ failedAssets .remove (digest );
194+
195+ } catch (IOException e ) {
196+ Log .w (TAG , "Error fetching asset " + digest + " for record" , e );
197+
198+ recordFailure (digest );
199+ fetchingAssets .remove (digest );
200+ }
201+ }
202+
203+ if (successCount > 0 || skipCount > 0 ) {
204+ Log .d (TAG , "Record asset fetch: success=" + successCount + ", skipped=" + skipCount );
205+ }
206+ }
207+
208+ public void onAssetReceived (String digest ) {
209+ fetchingAssets .remove (digest );
210+ failedAssets .remove (digest );
211+ Log .v (TAG , "Asset received and tracked: " + digest );
212+ }
213+
214+ public void onAssetFetchFailed (String digest ) {
215+ fetchingAssets .remove (digest );
216+ recordFailure (digest );
217+ Log .d (TAG , "Asset fetch failed: " + digest );
218+ }
219+
220+ private void recordFailure (String digest ) {
221+ AssetFetchAttempt attempt = failedAssets .get (digest );
222+ if (attempt == null ) {
223+ attempt = new AssetFetchAttempt (digest );
224+ failedAssets .put (digest , attempt );
225+ }
226+ attempt .recordFailure ();
227+ }
228+
229+ private boolean isConnectionError (IOException e ) {
230+ String message = e .getMessage ();
231+ if (message == null ) return false ;
232+
233+ return message .contains ("Connection" ) ||
234+ message .contains ("Broken pipe" ) ||
235+ message .contains ("Socket closed" ) ||
236+ message .contains ("Connection reset" );
237+ }
238+
239+ private void cleanupExpiredFailures () {
240+ long now = System .currentTimeMillis ();
241+ List <String > toRemove = new ArrayList <>();
242+
243+ for (Map .Entry <String , AssetFetchAttempt > entry : failedAssets .entrySet ()) {
244+ if (now - entry .getValue ().firstAttemptTime > FAILED_ASSET_EXPIRY_MS ) {
245+ toRemove .add (entry .getKey ());
246+ }
247+ }
248+
249+ for (String digest : toRemove ) {
250+ failedAssets .remove (digest );
251+ }
252+
253+ if (!toRemove .isEmpty ()) {
254+ Log .d (TAG , "Cleaned up " + toRemove .size () + " expired failed asset records" );
255+ }
256+ }
257+
258+ public AssetFetchStats getStats () {
259+ int failedCount = 0 ;
260+ int retryingCount = 0 ;
261+
262+ for (AssetFetchAttempt attempt : failedAssets .values ()) {
263+ if (attempt .retryCount >= MAX_RETRY_COUNT ) {
264+ failedCount ++;
265+ } else {
266+ retryingCount ++;
267+ }
268+ }
269+
270+ return new AssetFetchStats (
271+ fetchingAssets .size (),
272+ retryingCount ,
273+ failedCount
274+ );
275+ }
276+
277+ public void resetTracking () {
278+ fetchingAssets .clear ();
279+ failedAssets .clear ();
280+ Log .d (TAG , "Asset fetch tracking reset" );
281+ }
282+
283+ private static class AssetFetchAttempt {
284+ final String digest ;
285+ final long firstAttemptTime ;
286+ long lastAttemptTime ;
287+ int retryCount ;
288+
289+ AssetFetchAttempt (String digest ) {
290+ this .digest = digest ;
291+ this .firstAttemptTime = System .currentTimeMillis ();
292+ this .lastAttemptTime = firstAttemptTime ;
293+ this .retryCount = 0 ;
294+ }
295+
296+ void recordFailure () {
297+ this .lastAttemptTime = System .currentTimeMillis ();
298+ this .retryCount ++;
299+ }
300+ }
301+
302+ public static class AssetFetchStats {
303+ public final int currentlyFetching ;
304+ public final int retrying ;
305+ public final int failed ;
306+
307+ AssetFetchStats (int currentlyFetching , int retrying , int failed ) {
308+ this .currentlyFetching = currentlyFetching ;
309+ this .retrying = retrying ;
310+ this .failed = failed ;
311+ }
312+
313+ @ Override
314+ public String toString () {
315+ return "AssetFetchStats" +
316+ "{fetching=" +currentlyFetching +", " +
317+ "retrying=" +retrying +", " +
318+ "failed=" +failed +"}" ;
319+ }
320+ }
321+ }
0 commit comments