From 62df0cfbd1344f98834708a764e45b6376ab71d2 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 4 Jul 2025 14:25:55 +0200 Subject: [PATCH] Integrity check: increased concurrency, time limit per entity processing. --- CHANGELOG.md | 2 ++ app/lib/shared/integrity.dart | 23 ++++++++++++++++++++--- app/lib/shared/versions.dart | 4 ++-- app/lib/tool/neat_task/pub_dev_tasks.dart | 2 +- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea88edd554..941526e82c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ Important changes to data models, configuration, and migrations between each AppEngine version, listed here to ease deployment and troubleshooting. ## Next Release (replace with git tag when deployed) + * Bump runtimeVersion to `2025.07.04`. + * Note: integrity check is running with internal concurrency of 4 and 15 minutes time limit / entity. ## `20250703t113600-all` * Bump runtimeVersion to `2025.07.01`. diff --git a/app/lib/shared/integrity.dart b/app/lib/shared/integrity.dart index e2ef41488d..2e7f69a480 100644 --- a/app/lib/shared/integrity.dart +++ b/app/lib/shared/integrity.dart @@ -26,6 +26,7 @@ import '../publisher/models.dart'; import '../service/email/email_templates.dart' show isValidEmail, looksLikeEmail; import '../shared/env_config.dart'; +import '../shared/monitoring.dart'; import 'configuration.dart'; import 'datastore.dart'; import 'storage.dart'; @@ -947,15 +948,31 @@ class IntegrityChecker { } Stream _queryWithPool( - Stream Function(R model) fn) async* { + Stream Function(R model) fn, { + /// Note: This time limit aborts the integrity check after a reasonable + /// amount of time has passed with an entity-related operation. + /// + /// The integrity check process should be restarted soon after, and + /// hopefully it should complete on the next round. + Duration timeLimit = const Duration(minutes: 15), + }) async* { final query = _db.query(); final pool = Pool(_concurrency); final futures = >>[]; try { await for (final m in query.run()) { _updateUnmappedFields(m); - final f = pool.withResource(() => fn(m).toList()); - futures.add(f); + final taskFuture = pool.withResource(() async { + final f = fn(m).toList(); + try { + return await f.timeout(timeLimit); + } on TimeoutException catch (e, st) { + _logger.pubNoticeShout('integrity-check-timeout', + 'Integrity check operation timed out.', e, st); + rethrow; + } + }); + futures.add(taskFuture); } for (final f in futures) { for (final item in await f) { diff --git a/app/lib/shared/versions.dart b/app/lib/shared/versions.dart index 1bd0cabb82..cd6be60b9a 100644 --- a/app/lib/shared/versions.dart +++ b/app/lib/shared/versions.dart @@ -24,10 +24,10 @@ final RegExp runtimeVersionPattern = RegExp(r'^\d{4}\.\d{2}\.\d{2}$'); /// when the version switch happens. const _acceptedRuntimeVersions = [ // The current [runtimeVersion]. - '2025.07.01', + '2025.07.04', // Fallback runtime versions. + '2025.07.01', '2025.06.20', - '2025.06.03', ]; /// Sets the current runtime versions. diff --git a/app/lib/tool/neat_task/pub_dev_tasks.dart b/app/lib/tool/neat_task/pub_dev_tasks.dart index 30bd099405..2d6d448b08 100644 --- a/app/lib/tool/neat_task/pub_dev_tasks.dart +++ b/app/lib/tool/neat_task/pub_dev_tasks.dart @@ -244,7 +244,7 @@ List createPeriodicTaskSchedulers({ _weekly( name: 'check-datastore-integrity', isRuntimeVersioned: true, - task: () async => await IntegrityChecker(dbService, concurrency: 2) + task: () async => await IntegrityChecker(dbService, concurrency: 4) .verifyAndLogIssues(), timeout: Duration(days: 1), ),