Skip to content

Commit 48704ee

Browse files
authored
Retry streaming storage reads with callback method. (#8423)
1 parent edfb133 commit 48704ee

File tree

5 files changed

+44
-24
lines changed

5 files changed

+44
-24
lines changed

app/lib/package/backend.dart

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -902,8 +902,8 @@ class PackageBackend {
902902
throw PackageRejectedException.archiveTooLarge(
903903
UploadSignerService.maxUploadSize);
904904
}
905-
await _saveTarballToFS(
906-
_incomingBucket.read(tmpObjectName(guid)), filename);
905+
await _incomingBucket.readWithRetry(
906+
tmpObjectName(guid), (input) => _saveTarballToFS(input, filename));
907907
_logger.info('Examining tarball content ($guid).');
908908
final sw = Stopwatch()..start();
909909
final file = File(filename);
@@ -1756,6 +1756,11 @@ class InviteStatus {
17561756
Future _saveTarballToFS(Stream<List<int>> data, String filename) async {
17571757
final sw = Stopwatch()..start();
17581758
final targetFile = File(filename);
1759+
1760+
// cleanup the leftover if previous attempt failed
1761+
if (await targetFile.exists()) {
1762+
await targetFile.delete();
1763+
}
17591764
try {
17601765
int receivedBytes = 0;
17611766
final stream = data.transform<List<int>>(

app/lib/package/tarball_storage.dart

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,20 @@ class TarballStorage {
120120
final raf = await file.open();
121121
var remainingLength = info.length;
122122
try {
123-
await for (final chunk in _canonicalBucket.read(objectName)) {
124-
if (chunk.isEmpty) continue;
125-
remainingLength -= chunk.length;
126-
if (remainingLength < 0) {
127-
return ContentMatchStatus.different;
123+
await _canonicalBucket.readWithRetry(objectName, (input) async {
124+
await for (final chunk in input) {
125+
if (chunk.isEmpty) continue;
126+
remainingLength -= chunk.length;
127+
if (remainingLength < 0) {
128+
return ContentMatchStatus.different;
129+
}
130+
// TODO: consider rewriting to fixed-length chunk comparison
131+
final fileChunk = await raf.read(chunk.length);
132+
if (!fileChunk.byteToByteEquals(chunk)) {
133+
return ContentMatchStatus.different;
134+
}
128135
}
129-
// TODO: consider rewriting to fixed-length chunk comparison
130-
final fileChunk = await raf.read(chunk.length);
131-
if (!fileChunk.byteToByteEquals(chunk)) {
132-
return ContentMatchStatus.different;
133-
}
134-
}
136+
});
135137
} finally {
136138
await raf.close();
137139
}

app/lib/service/download_counts/backend.dart

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,13 @@ class DownloadCountsBackend {
5050
}
5151
final data = (await storageService
5252
.bucket(activeConfiguration.reportsBucketName!)
53-
.read(downloadCounts30DaysTotalsFileName)
54-
.transform(utf8.decoder)
55-
.transform(json.decoder)
56-
.single as Map<String, dynamic>)
53+
.readWithRetry(
54+
downloadCounts30DaysTotalsFileName,
55+
(input) async => await input
56+
.transform(utf8.decoder)
57+
.transform(json.decoder)
58+
.single as Map<String, dynamic>,
59+
))
5760
.cast<String, int>();
5861
_lastData = (data: data, etag: info.etag);
5962
return data;

app/lib/shared/storage.dart

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,14 @@ extension BucketExt on Bucket {
180180
);
181181
}
182182

183+
/// Read object content as byte stream using the callback function to receive data chunks.
184+
///
185+
/// When network error occurs, the entire stream is restarted and [fn] is called again.
186+
Future<T> readWithRetry<T>(
187+
String objectName, Future<T> Function(Stream<List<int>> input) fn) async {
188+
return await _retry(() async => fn(read(objectName)));
189+
}
190+
183191
/// The HTTP URL of a publicly accessable GCS object.
184192
String objectUrl(String objectName) {
185193
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
@@ -399,12 +407,14 @@ class VersionedJsonStorage {
399407
}
400408
final objectName = _objectName(version);
401409
_logger.info('Loading snapshot: $objectName');
402-
final map = await _bucket
403-
.read(objectName)
404-
.transform(_gzip.decoder)
405-
.transform(utf8.decoder)
406-
.transform(json.decoder)
407-
.single;
410+
final map = await _bucket.readWithRetry(
411+
objectName,
412+
(input) => input
413+
.transform(_gzip.decoder)
414+
.transform(utf8.decoder)
415+
.transform(json.decoder)
416+
.single,
417+
);
408418
return map as Map<String, dynamic>;
409419
}
410420

pkg/fake_gcloud/lib/retry_enforcer_storage.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class _RetryEnforcerBucket implements Bucket {
164164

165165
@override
166166
Stream<List<int>> read(String objectName, {int? offset, int? length}) {
167-
// TODO: verify retry wrapper here
167+
_verifyRetryOnStack();
168168
return _bucket.read(objectName, offset: offset, length: length);
169169
}
170170

0 commit comments

Comments
 (0)