Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ Important changes to data models, configuration, and migrations between each
AppEngine version, listed here to ease deployment and troubleshooting.

## Next Release (replace with git tag when deployed)
* Bump runtimeVersion to `2025.07.04`.
* Note: integrity check is running with internal concurrency of 4 and 15 minutes time limit / entity.

## `20250703t113600-all`
* Bump runtimeVersion to `2025.07.01`.
Expand Down
23 changes: 20 additions & 3 deletions app/lib/shared/integrity.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import '../publisher/models.dart';
import '../service/email/email_templates.dart'
show isValidEmail, looksLikeEmail;
import '../shared/env_config.dart';
import '../shared/monitoring.dart';
import 'configuration.dart';
import 'datastore.dart';
import 'storage.dart';
Expand Down Expand Up @@ -947,15 +948,31 @@ class IntegrityChecker {
}

Stream<String> _queryWithPool<R extends Model>(
Stream<String> Function(R model) fn) async* {
Stream<String> Function(R model) fn, {
/// Note: This time limit aborts the integrity check after a reasonable
/// amount of time has passed with an entity-related operation.
///
/// The integrity check process should be restarted soon after, and
/// hopefully it should complete on the next round.
Duration timeLimit = const Duration(minutes: 15),
}) async* {
final query = _db.query<R>();
final pool = Pool(_concurrency);
final futures = <Future<List<String>>>[];
try {
await for (final m in query.run()) {
_updateUnmappedFields(m);
final f = pool.withResource(() => fn(m).toList());
futures.add(f);
final taskFuture = pool.withResource(() async {
final f = fn(m).toList();
try {
return await f.timeout(timeLimit);
} on TimeoutException catch (e, st) {
_logger.pubNoticeShout('integrity-check-timeout',
'Integrity check operation timed out.', e, st);
rethrow;
}
});
futures.add(taskFuture);
}
for (final f in futures) {
for (final item in await f) {
Expand Down
4 changes: 2 additions & 2 deletions app/lib/shared/versions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ final RegExp runtimeVersionPattern = RegExp(r'^\d{4}\.\d{2}\.\d{2}$');
/// when the version switch happens.
const _acceptedRuntimeVersions = <String>[
// The current [runtimeVersion].
'2025.07.01',
'2025.07.04',
// Fallback runtime versions.
'2025.07.01',
'2025.06.20',
'2025.06.03',
];

/// Sets the current runtime versions.
Expand Down
2 changes: 1 addition & 1 deletion app/lib/tool/neat_task/pub_dev_tasks.dart
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ List<NeatPeriodicTaskScheduler> createPeriodicTaskSchedulers({
_weekly(
name: 'check-datastore-integrity',
isRuntimeVersioned: true,
task: () async => await IntegrityChecker(dbService, concurrency: 2)
task: () async => await IntegrityChecker(dbService, concurrency: 4)
.verifyAndLogIssues(),
timeout: Duration(days: 1),
),
Expand Down