diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart new file mode 100644 index 0000000000..8261291bb3 --- /dev/null +++ b/app/lib/package/api_export/exported_api.dart @@ -0,0 +1,327 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +import 'package:clock/clock.dart'; +import 'package:gcloud/storage.dart'; +import 'package:logging/logging.dart'; +import 'package:pool/pool.dart'; +import 'package:pub_dev/shared/utils.dart'; +import '../../shared/versions.dart' + show runtimeVersion, runtimeVersionPattern, shouldGCVersion; + +final _log = Logger('api_export:exported_bucket'); + +/// Interface for [Bucket] containing exported API that is served directly from +/// Google Cloud Storage. +/// +/// This interface is responsible for: +/// * Naming of files in the bucket. +/// * Deletion of all files related to a package. +/// * Garbage collection of unknown package entries. +/// * Maintaining two prefixes with files in the bucket. +/// * "latest" (that all runtimeVersions write to) +/// * "" (that only this runtimeVersion writes to) +/// * Garbage collecting old prefixes. +/// * Limit concurrency of operations. +/// +/// All writes to the bucket containing the exported API should go through this +/// interface. +final class ExportedApi { + final Pool _pool = Pool(50); + final Storage _storage; + final Bucket _bucket; + final List _prefixes = [ + 'latest', + runtimeVersion, + ]; + + ExportedApi(this._storage, this._bucket); + + /// Interface for writing all files related to [packageName]. + ExportedPackage package(String packageName) => + ExportedPackage._(this, packageName); + + /// Interface for writing `/api/package-name-completion-data` + ExportedJsonFile get packageNameCompletionData => ExportedJsonFile._( + this, + '/api/package-name-completion-data', + Duration(hours: 8), + ); + + /// Run garbage collection on the bucket. + /// + /// This will remove all packages from `latest/` and `/`, + /// where: + /// * The name of the package is not in [allPackageNames], and, + /// * The file is more than one day old. + /// + /// This will remove prefixes other than `latest/` where [shouldGCVersion] + /// returns true. + Future garbageCollect(Set allPackageNames) async { + await Future.wait([ + _gcOldPrefixes(), + ..._prefixes.map((prefix) => _gcPrefix(prefix, allPackageNames)), + ]); + } + + /// Garbage collect unknown packages from [prefix]. + /// + /// This will remove all packages from the `/` where: + /// * The name of the package is not in [allPackageNames], and, + /// * The file is more than one day old. + Future _gcPrefix(String prefix, Set allPackageNames) async { + _log.info('Garbage collecting "$prefix"'); + + await _listBucket(prefix: prefix + '/api/packages/', delimiter: '/', + (item) async { + final String packageName; + if (item.isObject) { + assert(!item.name.endsWith('/')); + packageName = item.name.split('/').last; + } else { + assert(item.name.endsWith('/')); + packageName = item.name.without(suffix: '/').split('/').last; + } + if (!allPackageNames.contains(packageName)) { + final info = await _bucket.info(item.name); + if (info.updated.isBefore(clock.ago(days: 1))) { + // Only delete if the item is more than one day old + // This avoids any races where we delete files we've just created + await package(packageName).delete(); + } + } + }); + + await _listBucket(prefix: prefix + '/api/archives/', delimiter: '-', + (item) async { + if (item.isObject) { + throw AssertionError('Unknown package archive at ${item.name}'); + } + assert(item.name.endsWith('-')); + final packageName = item.name.without(suffix: '-').split('/').last; + if (!allPackageNames.contains(packageName)) { + final info = await _bucket.info(item.name); + if (info.updated.isBefore(clock.ago(days: 1))) { + // Only delete if the item is more than one day old + // This avoids any races where we delete files we've just created + await package(packageName).delete(); + } + } + }); + } + + /// Garbage collect old prefixes. + /// + /// This will remove prefixes other than `latest/` where [shouldGCVersion] + /// returns true. + Future _gcOldPrefixes() async { + // List all top-level prefixes, and delete the ones we don't need + final topLevelprefixes = await _pool.withResource( + () async => await _bucket.list(prefix: '', delimiter: '/').toList(), + ); + await Future.wait(topLevelprefixes.map((entry) async { + if (entry.isObject) { + return; // ignore top-level files + } + + final topLevelPrefix = entry.name.without(suffix: '/'); + if (_prefixes.contains(topLevelPrefix)) { + return; // Don't GC prefixes we are writing to + } + + if (!runtimeVersionPattern.hasMatch(topLevelPrefix)) { + return; // Don't GC non-runtimeVersions + } + + if (shouldGCVersion(topLevelPrefix)) { + _log.info( + 'Garbage collecting old prefix "$topLevelPrefix/" ' + '(removing all objects under it)', + ); + + assert(entry.name.endsWith('/')); + await _listBucket( + prefix: entry.name, + delimiter: '', + (entry) async => await _bucket.delete(entry.name), + ); + } + })); + } + + Future _listBucket( + FutureOr Function(BucketEntry entry) each, { + required String prefix, + required String delimiter, + }) async { + var p = await _pool.withResource(() async => await _bucket.page( + prefix: prefix, + delimiter: delimiter, + pageSize: 1000, + )); + while (true) { + await Future.wait(p.items.map((item) async { + await _pool.withResource(() async => await each(item)); + })); + + if (p.isLast) break; + p = await _pool.withResource(() async => await p.next(pageSize: 1000)); + } + } +} + +/// Interface for writing data about a package to the exported API bucket. +final class ExportedPackage { + final ExportedApi _owner; + final String _package; + + ExportedPackage._(this._owner, this._package); + + ExportedJsonFile _suffix(String suffix) => ExportedJsonFile._( + _owner, + '/api/packages/$_package$suffix', + Duration(minutes: 10), + ); + + /// Interface for writing `/api/packages/`. + /// + /// Which contains version listing information. + ExportedJsonFile get versions => _suffix(''); + + /// Interface for writing `/api/packages//advisories`. + ExportedJsonFile get advisories => _suffix('/advisories'); + + /// Interace for writing `/api/archives/-.tar.gz`. + ExportedBlob tarball(String version) => ExportedBlob._( + _owner, + '/api/archives/$_package-$version.tar.gz', + '$_package-$version.tar.gz', + 'application/octet', + Duration(hours: 2), + ); + + /// Delete all files related to this package. + Future delete() async { + await Future.wait([ + _owner._pool.withResource(() async => await versions.delete()), + _owner._pool.withResource(() async => await advisories.delete()), + ..._owner._prefixes.map((prefix) async { + await _owner._listBucket( + prefix: prefix + '/api/archives/$_package-', + delimiter: '', + (item) async => await _owner._bucket.delete(item.name), + ); + }), + ]); + } +} + +/// Interface for an exported file. +sealed class ExportedObject { + final ExportedApi _owner; + final String _objectName; + ExportedObject._(this._owner, this._objectName); + + /// Delete this file. + Future delete() async { + await Future.wait(_owner._prefixes.map((prefix) async { + await _owner._pool.withResource(() async { + await _owner._bucket.delete(prefix + _objectName); + }); + })); + } +} + +/// Interface for an exported JSON file. +/// +/// This will write JSON as gzipped UTF-8, adding headers for +/// * `Content-Type`, +/// * `Content-Encoding`, and, +/// * `Cache-Control`. +final class ExportedJsonFile extends ExportedObject { + static final _jsonGzip = json.fuse(utf8).fuse(gzip); + final Duration _maxAge; + + ExportedJsonFile._( + ExportedApi _owner, + String _objectName, + this._maxAge, + ) : super._(_owner, _objectName); + + late final _metadata = ObjectMetadata( + contentType: 'application/json; charset="utf-8"', + contentEncoding: 'gzip', + cacheControl: 'public, max-age=${_maxAge.inSeconds}', + ); + + /// Write [data] as gzipped JSON in UTF-8 format. + Future write(Map data) async { + final gzipped = _jsonGzip.encode(data); + await Future.wait(_owner._prefixes.map((prefix) async { + await _owner._pool.withResource(() async { + await _owner._bucket.writeBytes( + prefix + _objectName, + gzipped, + metadata: _metadata, + ); + }); + })); + } +} + +/// Interface for an exported binary file. +/// +/// This will write a binary blob as is, adding headers for +/// * `Content-Type`, +/// * `Content-Disposition`, and, +/// * `Cache-Control`. +final class ExportedBlob extends ExportedObject { + final String _contentType; + final Duration _maxAge; + final String _filename; + + ExportedBlob._( + ExportedApi _owner, + String _objectName, + this._filename, + this._contentType, + this._maxAge, + ) : super._(_owner, _objectName); + + late final _metadata = ObjectMetadata( + contentType: _contentType, + cacheControl: 'public, max-age=${_maxAge.inSeconds}', + contentDisposition: 'attachment; filename="$_filename"', + ); + + /// Write binary blob to this file. + Future write(List data) async { + await Future.wait(_owner._prefixes.map((prefix) async { + await _owner._pool.withResource(() async { + await _owner._bucket.writeBytes( + prefix + _objectName, + data, + metadata: _metadata, + ); + }); + })); + } + + /// Copy binary blob from [absoluteObjectName] to this file. + /// + /// Notice that [absoluteObjectName] must be an a GCS URI including `gs://`. + /// This means that it must include bucket name. + /// Such URIs can be created with [Bucket.absoluteObjectName]. + Future copyFrom(String absoluteObjectName) async { + await Future.wait(_owner._prefixes.map((prefix) async { + await _owner._pool.withResource(() async { + await _owner._storage.copyObject( + absoluteObjectName, + _owner._bucket.absoluteObjectName(prefix + _objectName), + metadata: _metadata, + ); + }); + })); + } +} diff --git a/app/lib/shared/utils.dart b/app/lib/shared/utils.dart index e98d013c21..0365aa3870 100644 --- a/app/lib/shared/utils.dart +++ b/app/lib/shared/utils.dart @@ -299,4 +299,25 @@ extension StringExt on String { final v = trim(); return v.isEmpty ? null : v; } + + /// Return string without [prefix] and/or [suffix]. + /// + /// If [prefix] is not present, it will not be removed. + /// If [suffix] is not present, it will not be removed. + /// + /// **Example:** + /// ``` + /// assert('dart:io'.without(prefix: 'dart:') == 'io'); + /// assert('file.txt'.without(suffix: '.txt') == 'file'); + /// ``` + String without({String prefix = '', String suffix = ''}) { + var val = this; + if (prefix.isNotEmpty && startsWith(prefix)) { + val = val.substring(prefix.length); + } + if (suffix.isNotEmpty && endsWith(suffix)) { + val = val.substring(0, val.length - suffix.length); + } + return val; + } }