From 19edaa72e01fefe24565ec0d4f7440427af2ac02 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Thu, 5 Dec 2024 12:14:11 +0100 Subject: [PATCH 1/2] Refactor + merge post-test verification. --- app/test/shared/test_services.dart | 53 ++++++++++++++---------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/app/test/shared/test_services.dart b/app/test/shared/test_services.dart index ace8d243b2..012cf95825 100644 --- a/app/test/shared/test_services.dart +++ b/app/test/shared/test_services.dart @@ -121,30 +121,35 @@ class FakeAppengineEnv { await fork(() async { await fn(); }); - final problems = - await IntegrityChecker(dbService).findProblems().toList(); - if (problems.isNotEmpty && - (integrityProblem == null || - integrityProblem.matchAsPrefix(problems.first) == null)) { - throw Exception( - '${problems.length} integrity problems detected. First: ${problems.first}'); - } else if (problems.isEmpty && integrityProblem != null) { - throw Exception('Integrity problem expected but not present.'); - } - - // TODO: run all background tasks here - await backfillNewFields(); - - // re-run integrity checks on the updated state - final laterProblems = - await IntegrityChecker(dbService).findProblems().toList(); - expect(laterProblems, problems); + await _postTestVerification(integrityProblem: integrityProblem); }, ); }) as R; } } +Future _postTestVerification({ + required Pattern? integrityProblem, +}) async { + final problems = await IntegrityChecker(dbService).findProblems().toList(); + if (problems.isNotEmpty && + (integrityProblem == null || + integrityProblem.matchAsPrefix(problems.first) == null)) { + throw Exception( + '${problems.length} integrity problems detected. First: ${problems.first}'); + } else if (problems.isEmpty && integrityProblem != null) { + throw Exception('Integrity problem expected but not present.'); + } + + // TODO: run all background tasks here + await backfillNewFields(); + + // re-run integrity checks on the updated state + final laterProblems = + await IntegrityChecker(dbService).findProblems().toList(); + expect(laterProblems, problems); +} + /// Registers test with [name] and runs it in pkg/fake_gcloud's scope, populated /// with [testProfile] data. @isTest @@ -210,17 +215,7 @@ void testWithFakeTime( await fork(() async { await fn(fakeTime); }); - // post-test integrity check - final problems = - await IntegrityChecker(dbService).findProblems().toList(); - if (problems.isNotEmpty && - (integrityProblem == null || - integrityProblem.matchAsPrefix(problems.first) == null)) { - throw Exception( - '${problems.length} integrity problems detected. First: ${problems.first}'); - } else if (problems.isEmpty && integrityProblem != null) { - throw Exception('Integrity problem expected but not present.'); - } + await _postTestVerification(integrityProblem: integrityProblem); }, ); }); From 4ad7aaec0d47af01f7440e15e59f44c08e9a4d15 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Thu, 5 Dec 2024 12:14:11 +0100 Subject: [PATCH 2/2] Refactor period task initialization + running all task at the end of tests. --- app/lib/service/entrypoint/analyzer.dart | 3 +- app/lib/tool/neat_task/pub_dev_tasks.dart | 341 +++++++++++----------- app/test/shared/test_services.dart | 8 +- 3 files changed, 176 insertions(+), 176 deletions(-) diff --git a/app/lib/service/entrypoint/analyzer.dart b/app/lib/service/entrypoint/analyzer.dart index acc0d40c87..3ef01f2d0d 100644 --- a/app/lib/service/entrypoint/analyzer.dart +++ b/app/lib/service/entrypoint/analyzer.dart @@ -68,8 +68,7 @@ Future _workerMain(EntryMessage message) async { await taskBackend.start(); registerScopeExitCallback(() => taskBackend.stop()); - setupAnalyzerPeriodicTasks(); - setupSearchPeriodicTasks(); + setupPeriodTaskSchedulers(); // wait indefinitely await Completer().future; diff --git a/app/lib/tool/neat_task/pub_dev_tasks.dart b/app/lib/tool/neat_task/pub_dev_tasks.dart index 2ee78bac5d..dcc1c53e32 100644 --- a/app/lib/tool/neat_task/pub_dev_tasks.dart +++ b/app/lib/tool/neat_task/pub_dev_tasks.dart @@ -34,199 +34,207 @@ import 'datastore_status_provider.dart'; final _logger = Logger('pub_dev_tasks'); -/// Periodic task that are not tied to a specific service. -void _setupGenericPeriodicTasks() { - // Tries to send pending outgoing emails. - _15mins( - name: 'send-outgoing-emails', - isRuntimeVersioned: false, - task: () async { - final acquireAbort = Completer(); - final acquireTimer = Timer(Duration(minutes: 2), () { - acquireAbort.complete(); - }); - - try { - final lock = GlobalLock.create( - 'send-outgoing-emails', - expiration: Duration(minutes: 20), - ); - await lock.withClaim( - (claim) async { - await emailBackend.trySendAllOutgoingEmails( - stopAfter: Duration(minutes: 10), - ); - }, - abort: acquireAbort, - ); - } finally { - acquireTimer.cancel(); - } - }, - ); +/// Creates and initialized periodic task schedulers. +void setupPeriodTaskSchedulers() { + for (final scheduler in createPeriodicTaskSchedulers()) { + ss.registerScopeExitCallback(() => scheduler.stop()); + scheduler.start(); + } +} - // Deletes outgoing email entries that had failed to deliver. - _daily( - name: 'delete-outgoing-emails', - isRuntimeVersioned: false, - task: emailBackend.deleteDeadOutgoingEmails, - ); +/// List of periodic task schedulers. +List createPeriodicTaskSchedulers() { + return [ + // Tries to send pending outgoing emails. + _15mins( + name: 'send-outgoing-emails', + isRuntimeVersioned: false, + task: () async { + final acquireAbort = Completer(); + final acquireTimer = Timer(Duration(minutes: 2), () { + acquireAbort.complete(); + }); + + try { + final lock = GlobalLock.create( + 'send-outgoing-emails', + expiration: Duration(minutes: 20), + ); + await lock.withClaim( + (claim) async { + await emailBackend.trySendAllOutgoingEmails( + stopAfter: Duration(minutes: 10), + ); + }, + abort: acquireAbort, + ); + } finally { + acquireTimer.cancel(); + } + }, + ), - // Backfills the fields that are new to the current release. - _daily( - name: 'backfill-new-fields', - isRuntimeVersioned: true, - task: backfillNewFields, - ); + // Deletes outgoing email entries that had failed to deliver. + _daily( + name: 'delete-outgoing-emails', + isRuntimeVersioned: false, + task: emailBackend.deleteDeadOutgoingEmails, + ), - // Deletes expired audit log records. - _daily( - name: 'delete-expired-audit-log-records', - isRuntimeVersioned: false, - task: () async => await auditBackend.deleteExpiredRecords(), - ); + // Backfills the fields that are new to the current release. + _daily( + name: 'backfill-new-fields', + isRuntimeVersioned: true, + task: backfillNewFields, + ), - // Deletes expired consent invites. - _daily( - name: 'delete-expired-consents', - isRuntimeVersioned: false, - task: () async => await consentBackend.deleteObsoleteConsents(), - ); + // Deletes expired audit log records. + _daily( + name: 'delete-expired-audit-log-records', + isRuntimeVersioned: false, + task: () async => await auditBackend.deleteExpiredRecords(), + ), - // Deletes expired sessions. - _daily( - name: 'delete-expired-sessions', - isRuntimeVersioned: false, - task: () async => await accountBackend.deleteExpiredSessions(), - ); + // Deletes expired consent invites. + _daily( + name: 'delete-expired-consents', + isRuntimeVersioned: false, + task: () async => await consentBackend.deleteObsoleteConsents(), + ), - // Updates Package's stable, prerelease and preview version fields in case a - // new Dart SDK got released. - _daily( - name: 'update-package-versions', - isRuntimeVersioned: false, - task: () async => await packageBackend.updateAllPackageVersions(), - ); + // Deletes expired sessions. + _daily( + name: 'delete-expired-sessions', + isRuntimeVersioned: false, + task: () async => await accountBackend.deleteExpiredSessions(), + ), - // Updates the public archive bucket from the canonical bucket, for the - // unlikely case where an archive may be missing. - _daily( - name: 'sync-public-bucket-from-canonical-bucket', - isRuntimeVersioned: false, - task: () async => - await packageBackend.tarballStorage.updatePublicArchiveBucket(), - ); + // Updates Package's stable, prerelease and preview version fields in case a + // new Dart SDK got released. + _daily( + name: 'update-package-versions', + isRuntimeVersioned: false, + task: () async => await packageBackend.updateAllPackageVersions(), + ), - // Exports the package name completion data to a bucket. - _daily( - name: 'synchronize-exported-api', - isRuntimeVersioned: true, - task: () async => await apiExporter?.synchronizeExportedApi(), - ); + // Updates the public archive bucket from the canonical bucket, for the + // unlikely case where an archive may be missing. + _daily( + name: 'sync-public-bucket-from-canonical-bucket', + isRuntimeVersioned: false, + task: () async => + await packageBackend.tarballStorage.updatePublicArchiveBucket(), + ), - // Deletes moderated packages, versions, publishers and users. - _weekly( - name: 'delete-moderated-subjects', - isRuntimeVersioned: false, - task: () async => adminBackend.deleteModeratedSubjects(), - ); + // Exports the package name completion data to a bucket. + _daily( + name: 'synchronize-exported-api', + isRuntimeVersioned: true, + task: () async => await apiExporter?.synchronizeExportedApi(), + ), - // Deletes task status entities where the status hasn't been updated - // for more than a month. - _weekly( - name: 'delete-old-neat-task-statuses', - isRuntimeVersioned: false, - task: () => deleteOldNeatTaskStatuses(dbService), - ); + // Deletes moderated packages, versions, publishers and users. + _weekly( + name: 'delete-moderated-subjects', + isRuntimeVersioned: false, + task: () async => adminBackend.deleteModeratedSubjects(), + ), - // Deletes orphaned like entities that are missing a reference. - _weekly( - name: 'remove-orphaned-likes', - isRuntimeVersioned: false, - task: removeOrphanedLikes, - ); + // Deletes task status entities where the status hasn't been updated + // for more than a month. + _weekly( + name: 'delete-old-neat-task-statuses', + isRuntimeVersioned: false, + task: () => deleteOldNeatTaskStatuses(dbService), + ), - // Updates Package.likes with the correct new value. - _weekly( - name: 'update-package-likes', - isRuntimeVersioned: false, - task: updatePackageLikes, - ); + // Deletes orphaned like entities that are missing a reference. + _weekly( + name: 'remove-orphaned-likes', + isRuntimeVersioned: false, + task: removeOrphanedLikes, + ), - // Updates PackageState in taskBackend - _weekly( - name: 'backfill-task-tracking-state', - isRuntimeVersioned: true, - task: taskBackend.backfillTrackingState, - ); + // Updates Package.likes with the correct new value. + _weekly( + name: 'update-package-likes', + isRuntimeVersioned: false, + task: updatePackageLikes, + ), - // Deletes task results for old runtime versions - _weekly( - name: 'garbage-collect-task-results', - isRuntimeVersioned: false, - task: taskBackend.garbageCollect, - ); + // Updates PackageState in taskBackend + _weekly( + name: 'backfill-task-tracking-state', + isRuntimeVersioned: true, + task: taskBackend.backfillTrackingState, + ), + + // Deletes task results for old runtime versions + _weekly( + name: 'garbage-collect-task-results', + isRuntimeVersioned: false, + task: taskBackend.garbageCollect, + ), - // Delete very old instances that have been abandoned - _daily( - name: 'garbage-collect-old-instances', - isRuntimeVersioned: false, - task: () async => await deleteAbandonedInstances( - project: activeConfiguration.taskWorkerProject!, + // Delete very old instances that have been abandoned + _daily( + name: 'garbage-collect-old-instances', + isRuntimeVersioned: false, + task: () async => await deleteAbandonedInstances( + project: activeConfiguration.taskWorkerProject!, + ), ), - ); - _daily( + _daily( name: 'sync-download-counts', isRuntimeVersioned: false, - task: syncDownloadCounts); + task: syncDownloadCounts, + ), - _daily( + _daily( name: 'compute-download-counts-30-days-totals', isRuntimeVersioned: false, - task: compute30DaysTotalTask); + task: compute30DaysTotalTask, + ), - _daily(name: 'count-topics', isRuntimeVersioned: false, task: countTopics); + _daily( + name: 'count-topics', + isRuntimeVersioned: false, + task: countTopics, + ), - _daily( + _daily( name: 'sync-security-advisories', isRuntimeVersioned: false, - task: syncSecurityAdvisories); - - // TODO: setup tasks to remove known obsolete (but now unmapped) fields from entities -} + task: syncSecurityAdvisories, + ), -/// Setup the tasks that we are running in the analyzer service. -void setupAnalyzerPeriodicTasks() { - _setupGenericPeriodicTasks(); - - // Checks the Datastore integrity of the model objects. - _weekly( - name: 'check-datastore-integrity', - isRuntimeVersioned: true, - task: () async => - await IntegrityChecker(dbService, concurrency: 2).verifyAndLogIssues(), - timeout: Duration(days: 1), - ); -} + // Checks the Datastore integrity of the model objects. + _weekly( + name: 'check-datastore-integrity', + isRuntimeVersioned: true, + task: () async => await IntegrityChecker(dbService, concurrency: 2) + .verifyAndLogIssues(), + timeout: Duration(days: 1), + ), + // Deletes the old search snapshots + _weekly( + name: 'delete-old-search-snapshots', + isRuntimeVersioned: true, + task: () => searchBackend.deleteOldData(), + ), -/// Setup the tasks that we are running in the search service. -void setupSearchPeriodicTasks() { - // Deletes the old search snapshots - _weekly( - name: 'delete-old-search-snapshots', - isRuntimeVersioned: true, - task: () => searchBackend.deleteOldData(), - ); + // TODO: setup tasks to remove known obsolete (but now unmapped) fields from entities + ]; } // ignore: non_constant_identifier_names -void _15mins({ +NeatPeriodicTaskScheduler _15mins({ required String name, required bool isRuntimeVersioned, required NeatPeriodicTask task, }) { - final scheduler = NeatPeriodicTaskScheduler( + return NeatPeriodicTaskScheduler( name: name, interval: Duration(minutes: 15), timeout: Duration(minutes: 10), @@ -234,17 +242,14 @@ void _15mins({ isRuntimeVersioned: isRuntimeVersioned), task: _wrapMemoryLogging(name, task), ); - - ss.registerScopeExitCallback(() => scheduler.stop()); - scheduler.start(); } -void _daily({ +NeatPeriodicTaskScheduler _daily({ required String name, required bool isRuntimeVersioned, required NeatPeriodicTask task, }) { - final scheduler = NeatPeriodicTaskScheduler( + return NeatPeriodicTaskScheduler( name: name, interval: Duration(hours: 24), timeout: Duration(hours: 12), @@ -252,18 +257,15 @@ void _daily({ isRuntimeVersioned: isRuntimeVersioned), task: _wrapMemoryLogging(name, task), ); - - ss.registerScopeExitCallback(() => scheduler.stop()); - scheduler.start(); } -void _weekly({ +NeatPeriodicTaskScheduler _weekly({ required String name, required bool isRuntimeVersioned, required NeatPeriodicTask task, Duration timeout = const Duration(hours: 12), }) { - final scheduler = NeatPeriodicTaskScheduler( + return NeatPeriodicTaskScheduler( name: name, interval: Duration(days: 6), // shifts the day when the task is triggered timeout: timeout, @@ -271,9 +273,6 @@ void _weekly({ isRuntimeVersioned: isRuntimeVersioned), task: _wrapMemoryLogging(name, task), ); - - ss.registerScopeExitCallback(() => scheduler.stop()); - scheduler.start(); } NeatPeriodicTask _wrapMemoryLogging(String name, NeatPeriodicTask task) { diff --git a/app/test/shared/test_services.dart b/app/test/shared/test_services.dart index 012cf95825..6ee0a8e8ab 100644 --- a/app/test/shared/test_services.dart +++ b/app/test/shared/test_services.dart @@ -31,7 +31,7 @@ import 'package:pub_dev/shared/redis_cache.dart'; import 'package:pub_dev/shared/versions.dart'; import 'package:pub_dev/task/cloudcompute/fakecloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; -import 'package:pub_dev/tool/backfill/backfill_new_fields.dart'; +import 'package:pub_dev/tool/neat_task/pub_dev_tasks.dart'; import 'package:pub_dev/tool/test_profile/import_source.dart'; import 'package:pub_dev/tool/test_profile/importer.dart'; import 'package:pub_dev/tool/test_profile/models.dart'; @@ -141,8 +141,10 @@ Future _postTestVerification({ throw Exception('Integrity problem expected but not present.'); } - // TODO: run all background tasks here - await backfillNewFields(); + // run all background tasks here + for (final scheduler in createPeriodicTaskSchedulers()) { + await scheduler.trigger(); + } // re-run integrity checks on the updated state final laterProblems =