@@ -8,6 +8,7 @@ import 'dart:math' as math;
88import 'package:_pub_shared/search/tags.dart' ;
99import 'package:_pub_shared/utils/http.dart' ;
1010import 'package:clock/clock.dart' ;
11+ import 'package:collection/collection.dart' ;
1112import 'package:crypto/crypto.dart' ;
1213import 'package:http/http.dart' as http;
1314import 'package:logging/logging.dart' ;
@@ -72,25 +73,25 @@ class _BaseIntegrityChecker {
7273 }
7374 }
7475
75- Stream <String > _queryWithPool <R extends Model >(
76- Stream <String > Function (R model) fn, {
76+ Stream <String > _streamWithPool <T >(
77+ Stream <T > Function () streamFn,
78+ Stream <String > Function (T item) itemFn, {
7779 /// Note: This time limit aborts the integrity check after a reasonable
7880 /// amount of time has passed with an entity-related operation.
7981 ///
8082 /// The integrity check process should be restarted soon after, and
8183 /// hopefully it should complete on the next round.
82- Duration timeLimit = const Duration (minutes : 15 ) ,
84+ Duration ? timeLimit,
8385 }) async * {
84- final query = _db. query < R >( );
86+ timeLimit ?? = const Duration (minutes : 15 );
8587 final pool = Pool (_concurrency);
8688 final futures = < Future <List <String >>> [];
8789 try {
88- await for (final m in query.run ()) {
89- _updateUnmappedFields (m);
90+ await for (final item in streamFn ()) {
9091 final taskFuture = pool.withResource (() async {
91- final f = fn (m ).toList ();
92+ final f = itemFn (item ).toList ();
9293 try {
93- return await f.timeout (timeLimit);
94+ return await f.timeout (timeLimit! );
9495 } on TimeoutException catch (e, st) {
9596 _logger.pubNoticeShout ('integrity-check-timeout' ,
9697 'Integrity check operation timed out.' , e, st);
@@ -108,6 +109,25 @@ class _BaseIntegrityChecker {
108109 await pool.close ();
109110 }
110111 }
112+
113+ Stream <String > _queryWithPool <R extends Model >(
114+ Stream <String > Function (R model) fn, {
115+ /// Note: This time limit aborts the integrity check after a reasonable
116+ /// amount of time has passed with an entity-related operation.
117+ ///
118+ /// The integrity check process should be restarted soon after, and
119+ /// hopefully it should complete on the next round.
120+ Duration ? timeLimit,
121+ }) async * {
122+ yield * _streamWithPool (
123+ () => _db.query <R >().run (),
124+ (m) async * {
125+ _updateUnmappedFields (m);
126+ yield * fn (m);
127+ },
128+ timeLimit: timeLimit,
129+ );
130+ }
111131}
112132
113133/// Checks the integrity of the datastore.
@@ -1017,6 +1037,7 @@ class TarballIntegrityChecker extends _BaseIntegrityChecker {
10171037 final httpClient = httpRetryClient (lenient: true );
10181038 try {
10191039 yield * _checkVersions (httpClient);
1040+ yield * _checkCanonicalFiles ();
10201041 } finally {
10211042 httpClient.close ();
10221043 }
@@ -1090,4 +1111,42 @@ class TarballIntegrityChecker extends _BaseIntegrityChecker {
10901111 yield 'PackageVersion "${pv .qualifiedVersionKey }" has no matching archive file (HTTP status ${rs .statusCode }).' ;
10911112 }
10921113 }
1114+
1115+ /// Checks if the canonical bucket contains files that have existing database entry.
1116+ Stream <String > _checkCanonicalFiles () async * {
1117+ _logger.info ('Scanning canonical bucket...' );
1118+
1119+ // NOTE: To make it effiecient, we don't list all the files in the bucket,
1120+ // only the package names. This may leave out other files that are not in
1121+ // the `packages/` directory (objectname-prefix), or for some reason not
1122+ // matched as package names.
1123+ final packages = await packageBackend.tarballStorage
1124+ .listPackagesInCanonicalBucket ()
1125+ .timeout (Duration (minutes: 15 ));
1126+ yield * _streamWithPool (
1127+ () => Stream .fromIterable (packages),
1128+ (package) async * {
1129+ final p = await packageBackend.lookupPackage (package);
1130+ if (p == null ) {
1131+ yield 'Missing Package entity in database: "$package ".' ;
1132+ return ;
1133+ }
1134+ final bucketVersions = await packageBackend.tarballStorage
1135+ .listVersionsInCanonicalBucket (package);
1136+ final dbVersions = await packageBackend.versionsOfPackage (package);
1137+
1138+ for (final e in bucketVersions.entries) {
1139+ final version = e.key;
1140+ final dbv = dbVersions.singleWhereOrNull ((e) => e.version == version);
1141+ if (dbv == null ) {
1142+ // NOTE: This check may expose files that become stale during an aborted upload.
1143+ // If that's the case, further bucket cleanup may be required.
1144+ yield 'Missing PackageVersion entity in database: "$package /$version ". May be a stale upload, investigatation needed!' ;
1145+ }
1146+ }
1147+
1148+ // Note: checking the other way around is already done via iterating over the version entries in the database.
1149+ },
1150+ );
1151+ }
10931152}
0 commit comments