diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index 97937ec429..370df6c803 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -9,10 +9,12 @@ import 'dart:io'; import 'package:_pub_shared/data/advisories_api.dart'; import 'package:_pub_shared/data/package_api.dart'; import 'package:clock/clock.dart'; +import 'package:crypto/crypto.dart'; import 'package:gcloud/storage.dart'; import 'package:logging/logging.dart'; import 'package:pool/pool.dart'; import 'package:pub_dev/shared/utils.dart'; +import '../../shared/storage.dart'; import '../../shared/versions.dart' show runtimeVersion, runtimeVersionPattern, shouldGCVersion; @@ -268,7 +270,7 @@ final class ExportedJsonFile extends ExportedObject { final gzipped = _jsonGzip.encode(data); await Future.wait(_owner._prefixes.map((prefix) async { await _owner._pool.withResource(() async { - await _owner._bucket.writeBytes( + await _owner._bucket.writeBytesIfDifferent( prefix + _objectName, gzipped, metadata: _metadata, @@ -307,7 +309,7 @@ final class ExportedBlob extends ExportedObject { Future write(List data) async { await Future.wait(_owner._prefixes.map((prefix) async { await _owner._pool.withResource(() async { - await _owner._bucket.writeBytes( + await _owner._bucket.writeBytesIfDifferent( prefix + _objectName, data, metadata: _metadata, @@ -333,3 +335,34 @@ final class ExportedBlob extends ExportedObject { })); } } + +extension on Bucket { + Future writeBytesIfDifferent( + String name, + List bytes, { + ObjectMetadata? metadata, + }) async { + if (await _hasSameContent(name, bytes)) { + return; + } + await uploadWithRetry( + this, + name, + bytes.length, + () => Stream.value(bytes), + metadata: metadata, + ); + } + + Future _hasSameContent(String name, List bytes) async { + final info = await tryInfo(name); + if (info == null) { + return false; + } + if (info.length != bytes.length) { + return false; + } + final md5Hash = md5.convert(bytes).bytes; + return fixedTimeIntListEquals(info.md5Hash, md5Hash); + } +} diff --git a/app/lib/package/export_api_to_bucket.dart b/app/lib/package/export_api_to_bucket.dart index 5b34e40573..c1f31aa040 100644 --- a/app/lib/package/export_api_to_bucket.dart +++ b/app/lib/package/export_api_to_bucket.dart @@ -2,11 +2,8 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'dart:io'; - import 'package:basics/basics.dart'; import 'package:clock/clock.dart'; -import 'package:crypto/crypto.dart'; import 'package:gcloud/service_scope.dart' as ss; import 'package:gcloud/storage.dart'; import 'package:logging/logging.dart'; @@ -17,9 +14,9 @@ import 'package:retry/retry.dart'; import '../search/backend.dart'; import '../shared/datastore.dart'; import '../shared/storage.dart'; -import '../shared/utils.dart'; import '../shared/versions.dart'; import '../task/global_lock.dart'; +import 'api_export/exported_api.dart'; import 'backend.dart'; import 'models.dart'; @@ -28,20 +25,6 @@ final Logger _logger = Logger('export_api_to_bucket'); /// The default concurrency to upload API JSON files to the bucket. const _defaultBucketUpdateConcurrency = 8; -/// The default cache timeout for content. -const _pkgApiMaxCacheAge = Duration(minutes: 10); -const _pkgNameCompletionDataMaxAge = Duration(hours: 8); - -List _apiPkgObjectNames(String package) => [ - '$runtimeVersion/api/packages/$package', - 'current/api/packages/$package', - ]; - -List _apiPkgNameCompletionDataNames() => [ - '$runtimeVersion/api/package-name-completion-data', - 'current/api/package-name-completion-data', - ]; - /// Sets the API Exporter service. void registerApiExporter(ApiExporter value) => ss.register(#_apiExporter, value); @@ -50,6 +33,7 @@ void registerApiExporter(ApiExporter value) => ApiExporter? get apiExporter => ss.lookup(#_apiExporter) as ApiExporter?; class ApiExporter { + final ExportedApi _api; final Bucket _bucket; final int _concurrency; final _pkgLastUpdated = {}; @@ -57,7 +41,8 @@ class ApiExporter { ApiExporter({ required Bucket bucket, int concurrency = _defaultBucketUpdateConcurrency, - }) : _bucket = bucket, + }) : _api = ExportedApi(storageService, bucket), + _bucket = bucket, _concurrency = concurrency; /// Runs a forever loop and tries to get a global lock. @@ -89,12 +74,8 @@ class ApiExporter { /// Gets and uploads the package name completion data. Future uploadPkgNameCompletionData() async { - final bytes = await searchBackend.getPackageNameCompletionDataJsonGz(); - final bytesAndHash = _BytesAndHash(bytes); - for (final objectName in _apiPkgNameCompletionDataNames()) { - await _upsert(objectName, bytesAndHash, - maxAge: _pkgNameCompletionDataMaxAge); - } + await _api.packageNameCompletionData + .write(await searchBackend.getPackageNameCompletionData()); } /// Note: there is no global locking here, the full scan should be called @@ -187,19 +168,11 @@ class ApiExporter { /// the endpoint name in the file location. Future _uploadPackageToBucket(String package) async { final data = await retry(() => packageBackend.listVersions(package)); - final rawBytes = jsonUtf8Encoder.convert(data.toJson()); - final bytes = gzip.encode(rawBytes); - final bytesAndHash = _BytesAndHash(bytes); - - for (final objectName in _apiPkgObjectNames(package)) { - await _upsert(objectName, bytesAndHash, maxAge: _pkgApiMaxCacheAge); - } + await _api.package(package).versions.write(data); } Future _deletePackageFromBucket(String package) async { - for (final objectName in _apiPkgObjectNames(package)) { - await _bucket.tryDelete(objectName); - } + await _api.package(package).delete(); } Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* { @@ -250,39 +223,6 @@ class ApiExporter { await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4); } } - - Future _upsert( - String objectName, - _BytesAndHash bytesAndHash, { - required Duration maxAge, - }) async { - if (await _isSameContent(objectName, bytesAndHash)) { - return; - } - await uploadWithRetry( - _bucket, - objectName, - bytesAndHash.bytes.length, - () => Stream.value(bytesAndHash.bytes), - metadata: ObjectMetadata( - contentType: 'application/json; charset="utf-8"', - contentEncoding: 'gzip', - cacheControl: 'public, max-age=${maxAge.inSeconds}', - ), - ); - } - - Future _isSameContent( - String objectName, _BytesAndHash bytesAndHash) async { - final info = await _bucket.tryInfo(objectName); - if (info == null) { - return false; - } - if (info.length != bytesAndHash.length) { - return false; - } - return fixedTimeIntListEquals(info.md5Hash, bytesAndHash.md5Hash); - } } typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible}); @@ -296,12 +236,3 @@ extension on Package { _PkgUpdatedEvent asPkgUpdatedEvent() => (package: name!, updated: updated!, isVisible: isVisible); } - -class _BytesAndHash { - final List bytes; - - _BytesAndHash(this.bytes); - - int get length => bytes.length; - late final md5Hash = md5.convert(bytes).bytes; -} diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index 6f5daf8d96..3c9b3d96b0 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -495,24 +495,29 @@ class SearchBackend { 'delete-old-search-snapshots cleared $counts entries ($runtimeVersion)'); } + /// Creates the content for the /api/package-name-completion-data endpoint. + Future> getPackageNameCompletionData() async { + final rs = await searchClient.search( + ServiceSearchQuery.parse( + tagsPredicate: TagsPredicate.regularSearch(), + limit: 20000, + ), + // Do not cache response at the search client level, as we'll be caching + // it in a processed form much longer. + skipCache: true, + // Do not apply rate limit here. + sourceIp: null, + ); + return { + 'packages': rs.packageHits.map((p) => p.package).toList(), + }; + } + /// Creates the gzipped byte content for the /api/package-name-completion-data endpoint. Future> getPackageNameCompletionDataJsonGz() async { final bytes = await cache.packageNameCompletionDataJsonGz().get(() async { - final rs = await searchClient.search( - ServiceSearchQuery.parse( - tagsPredicate: TagsPredicate.regularSearch(), - limit: 20000, - ), - // Do not cache response at the search client level, as we'll be caching - // it in a processed form much longer. - skipCache: true, - // Do not apply rate limit here. - sourceIp: null, - ); - - return gzip.encode(jsonUtf8Encoder.convert({ - 'packages': rs.packageHits.map((p) => p.package).toList(), - })); + final data = await getPackageNameCompletionData(); + return gzip.encode(jsonUtf8Encoder.convert(data)); }); return bytes!; } diff --git a/app/test/package/export_api_to_bucket_test.dart b/app/test/package/export_api_to_bucket_test.dart index 473d874300..fffcd8dc9c 100644 --- a/app/test/package/export_api_to_bucket_test.dart +++ b/app/test/package/export_api_to_bucket_test.dart @@ -45,20 +45,20 @@ void main() { .toList(); expect(files.toSet(), { '$runtimeVersion/api/package-name-completion-data', - 'current/api/package-name-completion-data', + 'latest/api/package-name-completion-data', '$runtimeVersion/api/packages/flutter_titanium', '$runtimeVersion/api/packages/neon', '$runtimeVersion/api/packages/oxygen', - 'current/api/packages/flutter_titanium', - 'current/api/packages/neon', - 'current/api/packages/oxygen', + 'latest/api/packages/flutter_titanium', + 'latest/api/packages/neon', + 'latest/api/packages/oxygen', }); Future readAndDecodeJson(String path) async => json .decode(utf8.decode(gzip.decode(await bucket.readAsBytes(path)))); expect( - await readAndDecodeJson('current/api/packages/neon'), + await readAndDecodeJson('latest/api/packages/neon'), { 'name': 'neon', 'latest': isNotEmpty, @@ -67,7 +67,7 @@ void main() { ); expect( - await readAndDecodeJson('current/api/package-name-completion-data'), + await readAndDecodeJson('latest/api/package-name-completion-data'), { 'packages': hasLength(3), }, @@ -81,7 +81,7 @@ void main() { final bucket = storageService.bucket(activeConfiguration.exportedApiBucketName!); final originalBytes = - await bucket.readAsBytes('current/api/packages/oxygen'); + await bucket.readAsBytes('latest/api/packages/oxygen'); final pubspecContent = generatePubspecYaml('oxygen', '9.0.0'); final message = await createPubApiClient(authToken: adminClientToken) @@ -91,7 +91,7 @@ void main() { await Future.delayed(Duration(seconds: 1)); final updatedBytes = - await bucket.readAsBytes('current/api/packages/oxygen'); + await bucket.readAsBytes('latest/api/packages/oxygen'); expect(originalBytes.length, lessThan(updatedBytes.length)); }); });