Skip to content

Commit 684ce9e

Browse files
committed
Retry storage list operations using retried pagination.
1 parent c4bea03 commit 684ce9e

File tree

6 files changed

+63
-31
lines changed

6 files changed

+63
-31
lines changed

app/lib/package/api_export/exported_api.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ final class ExportedApi {
149149
Future<void> _gcOldPrefixes() async {
150150
// List all top-level prefixes, and delete the ones we don't need
151151
final topLevelprefixes = await _pool.withResource(
152-
() async => await _bucket.list(prefix: '', delimiter: '/').toList(),
152+
() async =>
153+
await _bucket.listAllItemsWithRetry(prefix: '', delimiter: '/'),
153154
);
154155
await Future.wait(topLevelprefixes.map((entry) async {
155156
if (entry.isObject) {

app/lib/package/tarball_storage.dart

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,10 @@ class TarballStorage {
6767
String package,
6868
) async {
6969
final prefix = _tarballObjectNamePackagePrefix(package);
70-
final items = await _canonicalBucket
71-
.list(
72-
prefix: prefix,
73-
delimiter: '',
74-
)
75-
.toList();
70+
final items = await _canonicalBucket.listAllItemsWithRetry(
71+
prefix: prefix,
72+
delimiter: '',
73+
);
7674
return Map.fromEntries(items.whereType<BucketObjectEntry>().map((item) {
7775
final version = item.name.without(prefix: prefix, suffix: '.tar.gz');
7876
return MapEntry(
@@ -255,32 +253,33 @@ class TarballStorage {
255253
final filterForNamePrefix = package == null
256254
? 'packages/'
257255
: _tarballObjectNamePackagePrefix(package);
258-
await for (final entry in _publicBucket.list(prefix: filterForNamePrefix)) {
256+
await _publicBucket.listWithRetry(prefix: filterForNamePrefix,
257+
(entry) async {
259258
// Skip non-objects.
260259
if (!entry.isObject) {
261-
continue;
260+
return;
262261
}
263262
// Skip objects that were matched in the previous step.
264263
if (objectNamesInPublicBucket.contains(entry.name)) {
265-
continue;
264+
return;
266265
}
267266
if (deleteObjects.contains(entry.name)) {
268-
continue;
267+
return;
269268
}
270269

271270
final publicInfo = await _publicBucket.tryInfo(entry.name);
272271
if (publicInfo == null) {
273272
_logger.warning(
274273
'Failed to get info for public bucket object "${entry.name}".');
275-
continue;
274+
return;
276275
}
277276

278277
await updateContentDispositionToAttachment(publicInfo, _publicBucket);
279278

280279
// Skip recently updated objects.
281280
if (publicInfo.age < ageCheckThreshold) {
282281
// Ignore recent files.
283-
continue;
282+
return;
284283
}
285284

286285
final canonicalInfo = await _canonicalBucket.tryInfo(entry.name);
@@ -289,11 +288,11 @@ class TarballStorage {
289288
// but it wasn't matched through the [PackageVersion] query above.
290289
if (canonicalInfo.age < ageCheckThreshold) {
291290
// Ignore recent files.
292-
continue;
291+
return;
293292
}
294293
_logger.severe(
295294
'Object without matching PackageVersion in canonical and public buckets: "${entry.name}".');
296-
continue;
295+
return;
297296
} else {
298297
// The object in the public bucket has no matching file in the canonical bucket.
299298
// We can assume it is stale and can delete it.
@@ -305,7 +304,7 @@ class TarballStorage {
305304
deleteObjects.add(entry.name);
306305
}
307306
}
308-
}
307+
});
309308

310309
for (final objectName in deleteObjects) {
311310
_logger.shout('Deleting object from public bucket: "$objectName".');

app/lib/service/download_counts/sync_download_counts.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ Future<Set<String>> processDownloadCounts(DateTime date) async {
6464

6565
final failedFiles = <String>{};
6666

67-
final bucketEntries = await bucket.list(prefix: fileNamePrefix).toList();
67+
final bucketEntries =
68+
await bucket.listAllItemsWithRetry(prefix: fileNamePrefix);
6869

6970
if (bucketEntries.isEmpty) {
7071
_logger.info('Failed to read any files with prefix "$fileNamePrefix"./n');

app/lib/shared/storage.dart

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,36 @@ extension BucketExt on Bucket {
188188
return await _retry(() async => fn(read(objectName)));
189189
}
190190

191+
/// List objects in the bucket with default retry with pagination.
192+
Future<void> listWithRetry(
193+
FutureOr<void> Function(BucketEntry input) fn, {
194+
String? prefix,
195+
String? delimiter,
196+
}) async {
197+
for (;;) {
198+
var p = await pageWithRetry(prefix: prefix, delimiter: delimiter);
199+
for (final item in p.items) {
200+
await fn(item);
201+
}
202+
if (p.isLast) break;
203+
p = await p.nextWithRetry();
204+
}
205+
}
206+
207+
/// Lists all entries with default retry pagination, returns them as List.
208+
Future<List<BucketEntry>> listAllItemsWithRetry({
209+
String? prefix,
210+
String? delimiter,
211+
}) async {
212+
final entries = <BucketEntry>[];
213+
await listWithRetry(
214+
prefix: prefix,
215+
delimiter: delimiter,
216+
entries.add,
217+
);
218+
return entries;
219+
}
220+
191221
/// The HTTP URL of a publicly accessable GCS object.
192222
String objectUrl(String objectName) {
193223
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
@@ -324,8 +354,9 @@ Future<int> deleteBucketFolderRecursively(
324354
page = await retry(
325355
() async {
326356
return page == null
327-
? await bucket.page(prefix: folder, delimiter: '', pageSize: 100)
328-
: await page.next(pageSize: 100);
357+
? await bucket.pageWithRetry(
358+
prefix: folder, delimiter: '', pageSize: 100)
359+
: await page.nextWithRetry(pageSize: 100);
329360
},
330361
delayFactor: Duration(seconds: 10),
331362
maxAttempts: 3,
@@ -430,8 +461,7 @@ class VersionedJsonStorage {
430461
}
431462
// fallback to earlier runtimes
432463
final currentPath = _objectName();
433-
final list = await _bucket
434-
.list(prefix: _prefix)
464+
final list = (await _bucket.listAllItemsWithRetry(prefix: _prefix))
435465
.map((entry) => entry.name)
436466
.where((name) => name.endsWith(_extension))
437467
.where((name) => name.compareTo(currentPath) <= 0)
@@ -456,19 +486,19 @@ class VersionedJsonStorage {
456486
Future<DeleteCounts> deleteOldData({Duration? minAgeThreshold}) async {
457487
var found = 0;
458488
var deleted = 0;
459-
await for (final entry in _bucket.list(prefix: _prefix)) {
489+
await _bucket.listWithRetry(prefix: _prefix, (entry) async {
460490
if (entry.isDirectory) {
461-
continue;
491+
return;
462492
}
463493
final name = p.basename(entry.name);
464494
if (!name.endsWith(_extension)) {
465-
continue;
495+
return;
466496
}
467497
final version = name.substring(0, name.length - _extension.length);
468498
final matchesPattern = version.length == 10 &&
469499
versions.runtimeVersionPattern.hasMatch(version);
470500
if (!matchesPattern) {
471-
continue;
501+
return;
472502
}
473503
found++;
474504
if (versions.shouldGCVersion(version)) {
@@ -479,7 +509,7 @@ class VersionedJsonStorage {
479509
await deleteFromBucket(_bucket, entry.name);
480510
}
481511
}
482-
}
512+
});
483513
return DeleteCounts(found, deleted);
484514
}
485515

app/lib/task/backend.dart

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -516,10 +516,10 @@ class TaskBackend {
516516
// Objects in the bucket are stored under the following pattern:
517517
// `<runtimeVersion>/<package>/<version>/...`
518518
// Thus, we list with `/` as delimiter and get a list of runtimeVersions
519-
await for (final d in _bucket.list(prefix: '', delimiter: '/')) {
519+
await _bucket.listWithRetry(prefix: '', delimiter: '/', (d) async {
520520
if (!d.isDirectory) {
521521
_log.warning('bucket should not contain any top-level object');
522-
continue;
522+
return;
523523
}
524524

525525
// Remove trailing slash from object prefix, to get a runtimeVersion
@@ -529,7 +529,7 @@ class TaskBackend {
529529
// Check if the runtimeVersion should be GC'ed
530530
if (shouldGCVersion(rtVersion)) {
531531
// List all objects under the `<rtVersion>/`
532-
await for (final obj in _bucket.list(prefix: d.name, delimiter: '')) {
532+
await _bucket.listWithRetry(prefix: d.name, delimiter: '', (obj) async {
533533
// Limit concurrency
534534
final r = await pool.request();
535535

@@ -545,9 +545,9 @@ class TaskBackend {
545545
r.release(); // always release to avoid deadlock
546546
}
547547
});
548-
}
548+
});
549549
}
550-
}
550+
});
551551

552552
// Close the pool, and wait for all pending deletion request to complete.
553553
await pool.close();

pkg/fake_gcloud/lib/retry_enforcer_storage.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ class _RetryEnforcerBucket implements Bucket {
162162
@override
163163
Stream<BucketEntry> list({String? prefix, String? delimiter}) {
164164
// TODO: verify retry wrapper here
165+
_verifyRetryOnStack();
165166
return _bucket.list(
166167
prefix: prefix,
167168
delimiter: delimiter,

0 commit comments

Comments
 (0)