diff --git a/app/lib/package/api_export/api_exporter.dart b/app/lib/package/api_export/api_exporter.dart index a0d3be3328..becb38f366 100644 --- a/app/lib/package/api_export/api_exporter.dart +++ b/app/lib/package/api_export/api_exporter.dart @@ -2,28 +2,26 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'package:basics/basics.dart'; +import 'dart:async'; + +import 'package:_pub_shared/data/package_api.dart'; import 'package:clock/clock.dart'; import 'package:gcloud/service_scope.dart' as ss; import 'package:gcloud/storage.dart'; import 'package:logging/logging.dart'; -import 'package:meta/meta.dart'; -import 'package:pool/pool.dart'; -import 'package:retry/retry.dart'; +import 'package:pub_dev/service/security_advisories/backend.dart'; +import 'package:pub_dev/shared/exceptions.dart'; +import 'package:pub_dev/shared/parallel_foreach.dart'; import '../../search/backend.dart'; import '../../shared/datastore.dart'; -import '../../shared/storage.dart'; import '../../shared/versions.dart'; import '../../task/global_lock.dart'; import '../backend.dart'; import '../models.dart'; import 'exported_api.dart'; -final Logger _logger = Logger('export_api_to_bucket'); - -/// The default concurrency to upload API JSON files to the bucket. -const _defaultBucketUpdateConcurrency = 8; +final Logger _log = Logger('api_export.api_exporter'); /// Sets the API Exporter service. void registerApiExporter(ApiExporter value) => @@ -32,207 +30,233 @@ void registerApiExporter(ApiExporter value) => /// The active API Exporter service or null if it hasn't been initialized. ApiExporter? get apiExporter => ss.lookup(#_apiExporter) as ApiExporter?; +const _concurrency = 30; + class ApiExporter { final ExportedApi _api; - final Bucket _bucket; - final int _concurrency; - final _pkgLastUpdated = {}; + + /// If [stop] has been called to stop background processes. + /// + /// `null` when not started yet, or we have been fully stopped. + Completer? _aborted; + + /// If background processes created by [start] have stopped. + /// + /// This won't be resolved if [start] has not been called! + /// `null` when not started yet. + Completer? _stopped; ApiExporter({ required Bucket bucket, - int concurrency = _defaultBucketUpdateConcurrency, - }) : _api = ExportedApi(storageService, bucket), - _bucket = bucket, - _concurrency = concurrency; + }) : _api = ExportedApi(storageService, bucket); - /// Runs a forever loop and tries to get a global lock. - /// - /// Once it has the claim, it scans the package entities and uploads - /// the package API JSONs to the bucket. - /// Tracks the package updates for the next up-to 24 hours and writes - /// the API JSONs after every few minutes. + /// Start continuous background processes for scheduling of tasks. /// - /// When other process has the claim, the loop waits a minute before - /// attempting to get the claim. - Future uploadInForeverLoop() async { - final lock = GlobalLock.create( - '$runtimeVersion/package/update-api-bucket', - expiration: Duration(minutes: 20), - ); - while (true) { + /// Calling [start] without first calling [stop] is an error. + Future start() async { + if (_aborted != null) { + throw StateError('ApiExporter.start() has already been called!'); + } + // Note: During testing we call [start] and [stop] in a [FakeAsync.run], + // this only works because the completers are created here. + // If we create the completers in the constructor which gets called + // outside [FakeAsync.run], then this won't work. + // In the future we hopefully support running the entire service using + // FakeAsync, but this point we rely on completers being created when + // [start] is called -- and not in the [ApiExporter] constructor. + final aborted = _aborted = Completer(); + final stopped = _stopped = Completer(); + + // Start scanning for packages to be tracked + scheduleMicrotask(() async { try { - await lock.withClaim((claim) async { - await incrementalPkgScanAndUpload(claim); - }); + // Create a lock for task scheduling, so tasks + final lock = GlobalLock.create( + '$runtimeVersion/package/scan-sync-export-api', + expiration: Duration(minutes: 25), + ); + + while (!aborted.isCompleted) { + // Acquire the global lock and scan for package changes while lock is + // valid. + try { + await lock.withClaim((claim) async { + await _scanForPackageUpdates(claim, abort: aborted); + }, abort: aborted); + } catch (e, st) { + // Log this as very bad, and then move on. Nothing good can come + // from straight up stopping. + _log.shout( + 'scanning failed (will retry when lock becomes free)', + e, + st, + ); + // Sleep 5 minutes to reduce risk of degenerate behavior + await Future.delayed(Duration(minutes: 5)); + } + } } catch (e, st) { - _logger.warning('Package API bucket update failed.', e, st); + _log.severe('scanning loop crashed', e, st); + } finally { + _log.info('scanning loop stopped'); + // Report background processes as stopped + stopped.complete(); } - // Wait for 1 minutes for sanity, before trying again. - await Future.delayed(Duration(minutes: 1)); - } + }); } - /// Gets and uploads the package name completion data. - Future uploadPkgNameCompletionData() async { - await _api.packageNameCompletionData - .write(await searchBackend.getPackageNameCompletionData()); - } - - /// Note: there is no global locking here, the full scan should be called - /// only once every day, and it may be racing against the incremental - /// updates. - @visibleForTesting - Future fullPkgScanAndUpload() async { - final pool = Pool(_concurrency); - final futures = []; - await for (final mp in dbService.query().run()) { - final f = pool.withResource(() => _deletePackageFromBucket(mp.name!)); - futures.add(f); + /// Stop any background process that may be running. + /// + /// Calling this method is always safe. + Future stop() async { + final aborted = _aborted; + if (aborted == null) { + return; } - await Future.wait(futures); - futures.clear(); - - await for (final package in dbService.query().run()) { - final f = pool.withResource(() async { - if (package.isVisible) { - await _uploadPackageToBucket(package.name!); - } else { - await _deletePackageFromBucket(package.name!); - } - }); - futures.add(f); + if (!aborted.isCompleted) { + aborted.complete(); } - await Future.wait(futures); - await pool.close(); + await _stopped!.future; + _aborted = null; + _stopped = null; } - @visibleForTesting - Future incrementalPkgScanAndUpload( - GlobalLockClaim claim, { - Duration sleepDuration = const Duration(minutes: 2), - }) async { - final pool = Pool(_concurrency); - // The claim will be released after a day, another process may - // start to upload the API JSONs from scratch again. - final workUntil = clock.now().add(Duration(days: 1)); - - // start monitoring with a window of 7 days lookback - var lastQueryStarted = clock.now().subtract(Duration(days: 7)); - while (claim.valid) { - final now = clock.now().toUtc(); - if (now.isAfter(workUntil)) { - break; + /// Gets and uploads the package name completion data. + Future synchronizePackageNameCompletionData() async { + await _api.packageNameCompletionData.write( + await searchBackend.getPackageNameCompletionData(), + ); + } + + /// Synchronize all exported API. + /// + /// This is intended to be scheduled from a daily background task. + Future synchronizeExportedApi() async { + final allPackageNames = {}; + final packageQuery = dbService.query(); + await packageQuery.run().parallelForEach(_concurrency, (pkg) async { + final name = pkg.name!; + if (pkg.isNotVisible) { + return; } + allPackageNames.add(name); - // clear old entries from last seen cache - _pkgLastUpdated.removeWhere((key, event) => - now.difference(event.updated) > const Duration(hours: 1)); + // TODO: Consider retries around all this logic + await synchronizePackage(name); + }); - lastQueryStarted = now; - final futures = []; - final eventsSince = lastQueryStarted.subtract(Duration(minutes: 5)); - await for (final event in _queryRecentPkgUpdatedEvents(eventsSince)) { - if (!claim.valid) { - break; - } - final f = pool.withResource(() async { - if (!claim.valid) { - return; - } - final last = _pkgLastUpdated[event.package]; - if (last != null && last.updated.isAtOrAfter(event.updated)) { - return; - } - _pkgLastUpdated[event.package] = event; - if (event.isVisible) { - await _uploadPackageToBucket(event.package); - } else { - await _deletePackageFromBucket(event.package); - } - }); - futures.add(f); + await synchronizePackageNameCompletionData(); + + await _api.garbageCollect(allPackageNames); + } + + /// Sync package and into [ExportedApi], this will synchronize package into + /// [ExportedApi]. + /// + /// This method will update [ExportedApi] ensuring: + /// * Version listing for [package] is up-to-date, + /// * Advisories for [package] is up-to-date, + /// * Tarballs for each version of [package] is up-to-date, + /// * Delete tarballs from old versions that no-longer exist. + /// + /// This is intended when: + /// * Running a full background synchronization. + /// * When a change in [Package.updated] is detected. + /// * A package is moderated, or other admin action is applied. + Future synchronizePackage(String package) async { + _log.info('synchronizePackage("$package")'); + + final PackageData versionListing; + try { + versionListing = await packageBackend.listVersions(package); + } on NotFoundException { + // Handle the case where package is moderated. + final pkg = await packageBackend.lookupPackage(package); + if (pkg != null && pkg.isNotVisible) { + // We only delete the package if it is explicitly not visible. + // If we can't find it, then it's safer to assume that it's a bug. + await _api.package(package).delete(); } - await Future.wait(futures); - futures.clear(); - await Future.delayed(sleepDuration); + return; } - await pool.close(); - } - /// Updates the API files after a version has changed (e.g. new version was uploaded). - Future updatePackageVersion(String package, String version) async { - await _uploadPackageToBucket(package); - } + final advisories = await securityAdvisoryBackend.listAdvisoriesResponse( + package, + skipCache: true, // Skipping the cache when fetching security advisories + ); - /// Uploads the package version API response bytes to the bucket, mirroring - /// the endpoint name in the file location. - Future _uploadPackageToBucket(String package) async { - final data = await retry(() => packageBackend.listVersions(package)); - await _api.package(package).versions.write(data); - } + final versions = await packageBackend.tarballStorage + .listVersionsInCanonicalBucket(package); + + // Remove versions that are not exposed in the public API. + versions.removeWhere( + (version, _) => !versionListing.versions.any((v) => v.version == version), + ); - Future _deletePackageFromBucket(String package) async { - await _api.package(package).delete(); + await _api.package(package).synchronizeTarballs(versions); + await _api.package(package).advisories.write(advisories); + await _api.package(package).versions.write(versionListing); } - Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* { - final q1 = dbService.query() - ..filter('moderated >=', since) - ..order('-moderated'); - yield* q1.run().map((mp) => mp.asPkgUpdatedEvent()); + /// Scan for updates from packages until [abort] is resolved, or [claim] + /// is lost. + Future _scanForPackageUpdates( + GlobalLockClaim claim, { + Completer? abort, + }) async { + abort ??= Completer(); - final q2 = dbService.query() - ..filter('updated >=', since) - ..order('-updated'); - yield* q2.run().map((p) => p.asPkgUpdatedEvent()); - } + // Map from package to updated that has been seen. + final seen = {}; - /// Deletes obsolete runtime-versions from the bucket. - Future deleteObsoleteRuntimeContent() async { - final versions = {}; - - // Objects in the bucket are stored under the following pattern: - // `current/api/` - // `/api/` - // Thus, we list with `/` as delimiter and get a list of runtimeVersions - await for (final d in _bucket.list(prefix: '', delimiter: '/')) { - if (!d.isDirectory) { - _logger.warning( - 'Bucket `${_bucket.bucketName}` should not contain any top-level object: `${d.name}`'); - continue; - } + // We will schedule longer overlaps every 6 hours. + var nextLongScan = clock.fromNow(hours: 6); - // Remove trailing slash from object prefix, to get a runtimeVersion - if (!d.name.endsWith('/')) { - _logger.warning( - 'Unexpected top-level directory name in bucket `${_bucket.bucketName}`: `${d.name}`'); - return; - } - final rtVersion = d.name.substring(0, d.name.length - 1); - if (runtimeVersionPattern.matchAsPrefix(rtVersion) == null) { - continue; - } + // In theory 30 minutes overlap should be enough. In practice we should + // allow an ample room for missed windows, and 3 days seems to be large enough. + var since = clock.ago(days: 3); + while (claim.valid && !abort.isCompleted) { + // Look at all packages changed in [since] + final q = dbService.query() + ..filter('updated >', since) + ..order('-updated'); - // Check if the runtimeVersion should be GC'ed - if (shouldGCVersion(rtVersion)) { - versions.add(rtVersion); + if (clock.now().isAfter(nextLongScan)) { + // Next time we'll do a longer scan + since = clock.ago(days: 1); + nextLongScan = clock.fromNow(hours: 6); + } else { + // Next time we'll only consider changes since now - 30 minutes + since = clock.ago(minutes: 30); } - } - for (final v in versions) { - await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4); - } - } -} + // Look at all packages that has changed + await for (final p in q.run()) { + // Abort, if claim is invalid or abort has been resolved! + if (!claim.valid || abort.isCompleted) { + return; + } -typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible}); + // Check if the [updated] timestamp has been seen before. + // If so, we skip checking it! + final lastSeen = seen[p.name!]; + if (lastSeen != null && lastSeen.toUtc() == p.updated!.toUtc()) { + continue; + } + // Remember the updated time for this package, so we don't check it + // again... + seen[p.name!] = p.updated!; -extension on ModeratedPackage { - _PkgUpdatedEvent asPkgUpdatedEvent() => - (package: name!, updated: moderated, isVisible: false); -} + // Check the package + await synchronizePackage(p.name!); + } -extension on Package { - _PkgUpdatedEvent asPkgUpdatedEvent() => - (package: name!, updated: updated!, isVisible: isVisible); + // Cleanup the [seen] map for anything older than [since], as this won't + // be relevant to the next iteration. + seen.removeWhere((_, updated) => updated.isBefore(since)); + + // Wait until aborted or 10 minutes before scanning again! + await abort.future.timeout(Duration(minutes: 10), onTimeout: () => null); + } + } } diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index 59460f22ec..5efe290958 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -19,7 +19,7 @@ import '../../shared/storage.dart'; import '../../shared/versions.dart' show runtimeVersion, runtimeVersionPattern, shouldGCVersion; -final _log = Logger('api_export:exported_bucket'); +final _log = Logger('api_export.exported_api'); /// Minimum age before an item can be consider garbage. /// diff --git a/app/lib/package/backend.dart b/app/lib/package/backend.dart index 3af745fda4..5aabe52ec3 100644 --- a/app/lib/package/backend.dart +++ b/app/lib/package/backend.dart @@ -1255,8 +1255,7 @@ class PackageBackend { emailBackend.trySendOutgoingEmail(outgoingEmail), taskBackend.trackPackage(newVersion.package, updateDependents: true), if (apiExporter != null) - apiExporter! - .updatePackageVersion(newVersion.package, newVersion.version!), + apiExporter!.synchronizePackage(newVersion.package), ]); await tarballStorage.updateContentDispositionOnPublicBucket( newVersion.package, newVersion.version!); diff --git a/app/lib/service/entrypoint/analyzer.dart b/app/lib/service/entrypoint/analyzer.dart index 065d709b2a..fe5c004123 100644 --- a/app/lib/service/entrypoint/analyzer.dart +++ b/app/lib/service/entrypoint/analyzer.dart @@ -85,5 +85,5 @@ Future _apiExporterMain(EntryMessage message) async { message.protocolSendPort.send(ReadyMessage()); await popularityStorage.start(); await downloadCountsBackend.start(); - await apiExporter!.uploadInForeverLoop(); + await apiExporter!.start(); } diff --git a/app/lib/service/security_advisories/backend.dart b/app/lib/service/security_advisories/backend.dart index 2450a24935..714b2cb541 100644 --- a/app/lib/service/security_advisories/backend.dart +++ b/app/lib/service/security_advisories/backend.dart @@ -39,22 +39,34 @@ class SecurityAdvisoryBackend { } Future> lookupSecurityAdvisories( - String package, - ) async { - return (await cache.securityAdvisories(package).get(() async { + String package, { + bool skipCache = false, + }) async { + final loadAdvisories = () async { final query = _db.query() ..filter('affectedPackages =', package); return query .run() .map((SecurityAdvisory adv) => SecurityAdvisoryData.fromModel(adv)) .toList(); - }))!; + }; + if (skipCache) { + return await loadAdvisories(); + } + + return (await cache.securityAdvisories(package).get(loadAdvisories))!; } /// Create a [ListAdvisoriesResponse] for [package] using advisories from /// cache. - Future listAdvisoriesResponse(String package) async { - final advisories = await lookupSecurityAdvisories(package); + Future listAdvisoriesResponse( + String package, { + bool skipCache = false, + }) async { + final advisories = await lookupSecurityAdvisories( + package, + skipCache: skipCache, + ); return ListAdvisoriesResponse( advisories: advisories.map((a) => a.advisory).toList(), advisoriesUpdated: advisories.map((a) => a.syncTime).maxOrNull, diff --git a/app/lib/tool/neat_task/pub_dev_tasks.dart b/app/lib/tool/neat_task/pub_dev_tasks.dart index 10026cbd66..2ee78bac5d 100644 --- a/app/lib/tool/neat_task/pub_dev_tasks.dart +++ b/app/lib/tool/neat_task/pub_dev_tasks.dart @@ -119,9 +119,9 @@ void _setupGenericPeriodicTasks() { // Exports the package name completion data to a bucket. _daily( - name: 'export-package-name-completion-data-to-bucket', + name: 'synchronize-exported-api', isRuntimeVersioned: true, - task: () async => await apiExporter?.uploadPkgNameCompletionData(), + task: () async => await apiExporter?.synchronizeExportedApi(), ); // Deletes moderated packages, versions, publishers and users. @@ -167,13 +167,6 @@ void _setupGenericPeriodicTasks() { task: taskBackend.garbageCollect, ); - // Deletes exported API data for old runtime versions - _weekly( - name: 'garbage-collect-api-exports', - isRuntimeVersioned: true, - task: () async => apiExporter?.deleteObsoleteRuntimeContent(), - ); - // Delete very old instances that have been abandoned _daily( name: 'garbage-collect-old-instances', diff --git a/app/test/package/api_export/api_exporter_test.dart b/app/test/package/api_export/api_exporter_test.dart index a142e9e0be..121103b9de 100644 --- a/app/test/package/api_export/api_exporter_test.dart +++ b/app/test/package/api_export/api_exporter_test.dart @@ -1,98 +1,447 @@ -// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'dart:convert'; import 'dart:io'; +import 'dart:typed_data'; -import 'package:clock/clock.dart'; +import 'package:_pub_shared/data/admin_api.dart'; +import 'package:_pub_shared/data/package_api.dart'; import 'package:gcloud/storage.dart'; +import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError; +import 'package:logging/logging.dart'; +import 'package:pub_dev/fake/backend/fake_auth_provider.dart'; import 'package:pub_dev/package/api_export/api_exporter.dart'; -import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/storage.dart'; +import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/shared/versions.dart'; +import 'package:pub_dev/tool/test_profile/import_source.dart'; +import 'package:pub_dev/tool/test_profile/importer.dart'; +import 'package:pub_dev/tool/test_profile/models.dart'; import 'package:test/test.dart'; import '../../shared/test_models.dart'; import '../../shared/test_services.dart'; -import '../backend_test_utils.dart'; +import '../../task/fake_time.dart'; + +final _log = Logger('api_export.test'); + +final _testProfile = TestProfile( + defaultUser: userAtPubDevEmail, + packages: [ + TestPackage( + name: 'foo', + versions: [ + TestVersion(version: '1.0.0'), + ], + ), + ], + users: [ + TestUser(email: userAtPubDevEmail, likes: []), + ], +); void main() { - group('export API to bucket', () { - testWithProfile( - 'export and cleanup', - fn: () async { - await storageService.createBucket('bucket'); - final bucket = storageService.bucket('bucket'); - final exporter = ApiExporter( - bucket: bucket, - concurrency: 2, - ); - await exporter.uploadPkgNameCompletionData(); - await exporter.fullPkgScanAndUpload(); - - final claim = - FakeGlobalLockClaim(clock.now().add(Duration(seconds: 3))); - await exporter.incrementalPkgScanAndUpload( - claim, - sleepDuration: Duration(milliseconds: 300), - ); - await exporter.deleteObsoleteRuntimeContent(); - - final files = await bucket - .list(delimiter: 'bogus-delimiter-for-full-file-list') - .map((e) => e.name) - .toList(); - expect(files.toSet(), { - '$runtimeVersion/api/package-name-completion-data', - 'latest/api/package-name-completion-data', - '$runtimeVersion/api/packages/flutter_titanium', - '$runtimeVersion/api/packages/neon', - '$runtimeVersion/api/packages/oxygen', - 'latest/api/packages/flutter_titanium', - 'latest/api/packages/neon', - 'latest/api/packages/oxygen', - }); - - Future readAndDecodeJson(String path) async => json - .decode(utf8.decode(gzip.decode(await bucket.readAsBytes(path)))); - - expect( - await readAndDecodeJson('latest/api/packages/neon'), - { - 'name': 'neon', - 'latest': isNotEmpty, - 'versions': hasLength(1), - }, - ); - - expect( - await readAndDecodeJson('latest/api/package-name-completion-data'), - { - 'packages': hasLength(3), - }, - ); + testWithFakeTime('synchronizeExportedApi()', testProfile: _testProfile, + (fakeTime) async { + await storageService.createBucket('bucket'); + final bucket = storageService.bucket('bucket'); + final apiExporter = ApiExporter(bucket: bucket); + + await _testExportedApiSynchronization( + fakeTime, + bucket, + apiExporter.synchronizeExportedApi, + ); + }); + + testWithFakeTime( + 'apiExporter.start()', + testProfile: _testProfile, + (fakeTime) async { + await storageService.createBucket('bucket'); + final bucket = storageService.bucket('bucket'); + final apiExporter = ApiExporter(bucket: bucket); + + await apiExporter.synchronizeExportedApi(); + + await apiExporter.start(); + + await _testExportedApiSynchronization( + fakeTime, + bucket, + () async => await fakeTime.elapse(minutes: 15), + ); + + await apiExporter.stop(); + }, + ); +} + +Future _testExportedApiSynchronization( + FakeTime fakeTime, + Bucket bucket, + Future Function() synchronize, +) async { + _log.info('## Existing package'); + { + await synchronize(); + + // Check that exsting package was synchronized + expect( + await bucket.readGzippedJson('latest/api/packages/foo'), + { + 'name': 'foo', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + expect( + await bucket.readGzippedJson('latest/api/package-name-completion-data'), + {'packages': hasLength(1)}, + ); + expect( + await bucket.readBytes('$runtimeVersion/api/archives/foo-1.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readGzippedJson('$runtimeVersion/api/packages/foo'), + { + 'name': 'foo', + 'latest': isNotEmpty, + 'versions': hasLength(1), }, ); + expect( + await bucket + .readGzippedJson('$runtimeVersion/api/package-name-completion-data'), + {'packages': hasLength(1)}, + ); + expect( + await bucket.readBytes('$runtimeVersion/api/archives/foo-1.0.0.tar.gz'), + isNotNull, + ); + } - testWithProfile('new upload', fn: () async { - await apiExporter!.fullPkgScanAndUpload(); + _log.info('## New package'); + { + await importProfile( + source: ImportSource.autoGenerated(), + profile: TestProfile( + defaultUser: userAtPubDevEmail, + packages: [ + TestPackage( + name: 'bar', + versions: [TestVersion(version: '2.0.0')], + publisher: 'example.com', + ), + ], + ), + ); - final bucket = - storageService.bucket(activeConfiguration.exportedApiBucketName!); - final originalBytes = - await bucket.readAsBytes('latest/api/packages/oxygen'); + // Synchronize again + await synchronize(); - final pubspecContent = generatePubspecYaml('oxygen', '9.0.0'); - final message = await createPubApiClient(authToken: adminClientToken) - .uploadPackageBytes( - await packageArchiveBytes(pubspecContent: pubspecContent)); - expect(message.success.message, contains('Successfully uploaded')); + // Check that exsting package is still there + expect( + await bucket.readGzippedJson('latest/api/packages/foo'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/foo-1.0.0.tar.gz'), + isNotNull, + ); + // Note. that name completion data won't be updated until search caches + // are purged, so we won't test that it is updated. - await Future.delayed(Duration(seconds: 1)); - final updatedBytes = - await bucket.readAsBytes('latest/api/packages/oxygen'); - expect(originalBytes.length, lessThan(updatedBytes.length)); - }); - }); + // Check that new package was synchronized + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + } + + _log.info('## New package version'); + { + await importProfile( + source: ImportSource.autoGenerated(), + profile: TestProfile( + defaultUser: userAtPubDevEmail, + packages: [ + TestPackage( + name: 'bar', + versions: [TestVersion(version: '3.0.0')], + publisher: 'example.com', + ), + ], + ), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + // Check that versions are there + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } + + _log.info('## Discontinued flipped on'); + { + final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); + await api.setPackageOptions( + 'bar', + PkgOptions(isDiscontinued: true), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + 'isDiscontinued': true, + }, + ); + } + + _log.info('## Discontinued flipped off'); + { + final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); + await api.setPackageOptions( + 'bar', + PkgOptions(isDiscontinued: false), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + } + + _log.info('## Version retracted'); + { + final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); + await api.setVersionOptions( + 'bar', + '2.0.0', + VersionOptions(isRetracted: true), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': contains(containsPair('retracted', true)) + }, + ); + } + + _log.info('## Version moderated'); + { + // Elapse time before moderating package, because exported-api won't delete + // recently created files as a guard against race conditions. + fakeTime.elapseSync(days: 1); + + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package-version', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'version': '2.0.0', + 'state': 'true', + }), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } + + _log.info('## Version reinstated'); + { + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package-version', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'version': '2.0.0', + 'state': 'false', + }), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } + + _log.info('## Package moderated'); + { + // Elapse time before moderating package, because exported-api won't delete + // recently created files as a guard against race conditions. + fakeTime.elapseSync(days: 1); + + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'state': 'true', + }), + ); + + // Synchronize again + await synchronize(); + + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + isNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNull, + ); + } + + _log.info('## Package reinstated'); + { + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'state': 'false', + }), + ); + + // Synchronize again + await synchronize(); + + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } +} + +extension on Bucket { + /// Read bytes from bucket, retur null if missing + Future readBytes(String path) async { + try { + return await readAsBytes(path); + } on DetailedApiRequestError catch (e) { + if (e.status == 404) return null; + rethrow; + } + } + + /// Read gzipped JSON from bucket + Future readGzippedJson(String path) async { + final bytes = await readBytes(path); + if (bytes == null) { + return null; + } + return utf8JsonDecoder.convert(gzip.decode(bytes)); + } } diff --git a/app/test/service/security_advisory/security_advisory_test.dart b/app/test/service/security_advisory/security_advisory_test.dart index 04f9c63200..49ce5dd19f 100644 --- a/app/test/service/security_advisory/security_advisory_test.dart +++ b/app/test/service/security_advisory/security_advisory_test.dart @@ -200,7 +200,10 @@ void main() { expect(advisory.syncTime!, ingestTime); expect(advisory.syncTime!.isBefore(afterIngestTime), isTrue); - final list = await securityAdvisoryBackend.lookupSecurityAdvisories('a'); + final list = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'a', + skipCache: true, + ); expect(list, isNotNull); expect(list.length, 1); expect(list.first.advisory.id, id); @@ -229,10 +232,16 @@ void main() { expect(updatedAdvisory.affectedPackages!.first, affectedA.package.name); expect(updatedAdvisory.affectedPackages!.last, affectedC.package.name); - final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories('b'); + final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'b', + skipCache: true, + ); expect(list2, isEmpty); - final list3 = await securityAdvisoryBackend.lookupSecurityAdvisories('c'); + final list3 = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'c', + skipCache: true, + ); expect(list3, isNotNull); expect(list3.length, 1); expect(list3.first.advisory.id, id); @@ -269,12 +278,18 @@ void main() { expect(advisory.affectedPackages!.length, 1); expect(advisory.affectedPackages!.first, affectedA.package.name); - final list = await securityAdvisoryBackend.lookupSecurityAdvisories('a'); + final list = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'a', + skipCache: true, + ); expect(list, isNotNull); expect(list.length, 1); expect(list.first.advisory.id, id); - final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories('b'); + final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'b', + skipCache: true, + ); expect(list2, isNotNull); expect(list2, isEmpty); });