Skip to content

Commit f4882c6

Browse files
committed
Also test archive tarballs in canonical bucket.
1 parent 378b712 commit f4882c6

File tree

2 files changed

+80
-8
lines changed

2 files changed

+80
-8
lines changed

app/lib/package/tarball_storage.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,19 @@ class TarballStorage {
6161
String getCanonicalBucketAbsoluteObjectName(String package, String version) =>
6262
_canonicalBucket.absoluteObjectName(tarballObjectName(package, version));
6363

64+
/// Get a list of package names in the canonical bucket.
65+
Future<List<String>> listPackagesInCanonicalBucket() async {
66+
final items = await _canonicalBucket.listAllItemsWithRetry(
67+
prefix: 'packages/', delimiter: '-');
68+
final packages = items
69+
.where((i) => i.isDirectory)
70+
.map((i) => i.name)
71+
.map((name) => name.substring(9).split('-').first)
72+
.toSet()
73+
.toList();
74+
return packages;
75+
}
76+
6477
/// Get map from `version` to [SourceObjectInfo] for each version of [package] in
6578
/// canonical bucket.
6679
Future<Map<String, SourceObjectInfo>> listVersionsInCanonicalBucket(

app/lib/shared/integrity.dart

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import 'dart:math' as math;
88
import 'package:_pub_shared/search/tags.dart';
99
import 'package:_pub_shared/utils/http.dart';
1010
import 'package:clock/clock.dart';
11+
import 'package:collection/collection.dart';
1112
import 'package:crypto/crypto.dart';
1213
import 'package:http/http.dart' as http;
1314
import 'package:logging/logging.dart';
@@ -72,25 +73,25 @@ class _BaseIntegrityChecker {
7273
}
7374
}
7475

75-
Stream<String> _queryWithPool<R extends Model>(
76-
Stream<String> Function(R model) fn, {
76+
Stream<String> _streamWithPool<T>(
77+
Stream<T> Function() streamFn,
78+
Stream<String> Function(T item) itemFn, {
7779
/// Note: This time limit aborts the integrity check after a reasonable
7880
/// amount of time has passed with an entity-related operation.
7981
///
8082
/// The integrity check process should be restarted soon after, and
8183
/// hopefully it should complete on the next round.
82-
Duration timeLimit = const Duration(minutes: 15),
84+
Duration? timeLimit,
8385
}) async* {
84-
final query = _db.query<R>();
86+
timeLimit ??= const Duration(minutes: 15);
8587
final pool = Pool(_concurrency);
8688
final futures = <Future<List<String>>>[];
8789
try {
88-
await for (final m in query.run()) {
89-
_updateUnmappedFields(m);
90+
await for (final item in streamFn()) {
9091
final taskFuture = pool.withResource(() async {
91-
final f = fn(m).toList();
92+
final f = itemFn(item).toList();
9293
try {
93-
return await f.timeout(timeLimit);
94+
return await f.timeout(timeLimit!);
9495
} on TimeoutException catch (e, st) {
9596
_logger.pubNoticeShout('integrity-check-timeout',
9697
'Integrity check operation timed out.', e, st);
@@ -108,6 +109,25 @@ class _BaseIntegrityChecker {
108109
await pool.close();
109110
}
110111
}
112+
113+
Stream<String> _queryWithPool<R extends Model>(
114+
Stream<String> Function(R model) fn, {
115+
/// Note: This time limit aborts the integrity check after a reasonable
116+
/// amount of time has passed with an entity-related operation.
117+
///
118+
/// The integrity check process should be restarted soon after, and
119+
/// hopefully it should complete on the next round.
120+
Duration? timeLimit,
121+
}) async* {
122+
yield* _streamWithPool(
123+
() => _db.query<R>().run(),
124+
(m) async* {
125+
_updateUnmappedFields(m);
126+
yield* fn(m);
127+
},
128+
timeLimit: timeLimit,
129+
);
130+
}
111131
}
112132

113133
/// Checks the integrity of the datastore.
@@ -1017,6 +1037,7 @@ class TarballIntegrityChecker extends _BaseIntegrityChecker {
10171037
final httpClient = httpRetryClient(lenient: true);
10181038
try {
10191039
yield* _checkVersions(httpClient);
1040+
yield* _checkCanonicalFiles();
10201041
} finally {
10211042
httpClient.close();
10221043
}
@@ -1090,4 +1111,42 @@ class TarballIntegrityChecker extends _BaseIntegrityChecker {
10901111
yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching archive file (HTTP status ${rs.statusCode}).';
10911112
}
10921113
}
1114+
1115+
/// Checks if the canonical bucket contains files that have existing database entry.
1116+
Stream<String> _checkCanonicalFiles() async* {
1117+
_logger.info('Scanning canonical bucket...');
1118+
1119+
// NOTE: To make it effiecient, we don't list all the files in the bucket,
1120+
// only the package names. This may leave out other files that are not in
1121+
// the `packages/` directory (objectname-prefix), or for some reason not
1122+
// matched as package names.
1123+
final packages = await packageBackend.tarballStorage
1124+
.listPackagesInCanonicalBucket()
1125+
.timeout(Duration(minutes: 15));
1126+
yield* _streamWithPool(
1127+
() => Stream.fromIterable(packages),
1128+
(package) async* {
1129+
final p = await packageBackend.lookupPackage(package);
1130+
if (p == null) {
1131+
yield 'Missing Package entity in database: "$package".';
1132+
return;
1133+
}
1134+
final bucketVersions = await packageBackend.tarballStorage
1135+
.listVersionsInCanonicalBucket(package);
1136+
final dbVersions = await packageBackend.versionsOfPackage(package);
1137+
1138+
for (final e in bucketVersions.entries) {
1139+
final version = e.key;
1140+
final dbv = dbVersions.singleWhereOrNull((e) => e.version == version);
1141+
if (dbv == null) {
1142+
// NOTE: This check may expose files that become stale during an aborted upload.
1143+
// If that's the case, further bucket cleanup may be required.
1144+
yield 'Missing PackageVersion entity in database: "$package/$version". May be a stale upload, investigatation needed!';
1145+
}
1146+
}
1147+
1148+
// Note: checking the other way around is already done via iterating over the version entries in the database.
1149+
},
1150+
);
1151+
}
10931152
}

0 commit comments

Comments
 (0)