@@ -26,6 +26,7 @@ import '../publisher/models.dart';
2626import '../service/email/email_templates.dart'
2727 show isValidEmail, looksLikeEmail;
2828import '../shared/env_config.dart' ;
29+ import '../shared/monitoring.dart' ;
2930import 'configuration.dart' ;
3031import 'datastore.dart' ;
3132import 'storage.dart' ;
@@ -947,15 +948,31 @@ class IntegrityChecker {
947948 }
948949
949950 Stream <String > _queryWithPool <R extends Model >(
950- Stream <String > Function (R model) fn) async * {
951+ Stream <String > Function (R model) fn, {
952+ /// Note: This time limit aborts the integrity check after a reasonable
953+ /// amount of time has passed with an entity-related operation.
954+ ///
955+ /// The integrity check process should be restarted soon after, and
956+ /// hopefully it should complete on the next round.
957+ Duration timeLimit = const Duration (minutes: 15 ),
958+ }) async * {
951959 final query = _db.query <R >();
952960 final pool = Pool (_concurrency);
953961 final futures = < Future <List <String >>> [];
954962 try {
955963 await for (final m in query.run ()) {
956964 _updateUnmappedFields (m);
957- final f = pool.withResource (() => fn (m).toList ());
958- futures.add (f);
965+ final taskFuture = pool.withResource (() async {
966+ final f = fn (m).toList ();
967+ try {
968+ return await f.timeout (timeLimit);
969+ } on TimeoutException catch (e, st) {
970+ _logger.pubNoticeShout ('integrity-check-timeout' ,
971+ 'Integrity check operation timed out.' , e, st);
972+ rethrow ;
973+ }
974+ });
975+ futures.add (taskFuture);
959976 }
960977 for (final f in futures) {
961978 for (final item in await f) {
0 commit comments