diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart
index fb9f6b9664..f66f446ff3 100644
--- a/app/lib/package/api_export/exported_api.dart
+++ b/app/lib/package/api_export/exported_api.dart
@@ -270,6 +270,79 @@ final class ExportedPackage {
     Duration(hours: 2),
   );
 
+  /// Synchronize tarballs from [versions].
+  ///
+  /// [versions] is a map from version number to [SourceObjectInfo], where
+  /// the [SourceObjectInfo] is the GCS object from which the tarball can be
+  /// copied.
+  ///
+  /// This method will copy GCS objects, when necessary, relying on
+  /// [SourceObjectInfo.md5Hash] to avoid copying objects that haven't changed.
+  ///
+  /// [versions] **must** have an entry for each version that exists.
+  /// This will **delete** tarballs for versions that do not exist in
+  /// [versions].
+  Future<void> synchronizeTarballs(
+    Map<String, SourceObjectInfo> versions,
+  ) async {
+    await Future.wait([
+      ..._owner._prefixes.map((prefix) async {
+        final pfx = '$prefix/api/archives/$_package-';
+
+        final versionsForUpload = versions.keys.toSet();
+        await _owner._listBucket(prefix: pfx, delimiter: '', (entry) async {
+          final item = switch (entry) {
+            final BucketDirectoryEntry _ => throw AssertionError('unreachable'),
+            final BucketObjectEntry item => item,
+          };
+          if (!item.name.endsWith('.tar.gz')) {
+            _log.pubNoticeShout(
+              'stray-file',
+              'Found stray file "${item.name}" in ExportedApi'
+                  ' while garbage collecting for "$_package" (ignoring it!)',
+            );
+            return;
+          }
+          final version = Uri.decodeComponent(
+            item.name.without(prefix: pfx, suffix: '.tar.gz'),
+          );
+
+          final info = versions[version];
+          if (info != null) {
+            await tarball(version)._copyToPrefixFromIfNotContentEquals(
+              prefix,
+              info,
+              item,
+            );
+            // This version need not be uploaded again.
+            versionsForUpload.remove(version);
+
+            // Done, we don't need to delete this item.
+            return;
+          }
+
+          // Delete the item, if it's old enough.
+          if (item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
+            // Only delete the item if it's older than _minGarbageAge.
+            // This avoids any races where we delete files we've just created.
+            await _owner._bucket.tryDelete(item.name);
+          }
+        });
+
+        // Upload missing versions.
+        await Future.wait(versionsForUpload.map((v) async {
+          await _owner._pool.withResource(() async {
+            await tarball(v)._copyToPrefixFromIfNotContentEquals(
+              prefix,
+              versions[v]!,
+              null,
+            );
+          });
+        }));
+      }),
+    ]);
+  }
+
   /// Garbage collect versions from this package not in [allVersionNumbers].
   ///
   /// [allVersionNumbers] must be encoded as canonical versions.
@@ -321,6 +394,32 @@ final class ExportedPackage {
   }
 }
 
+/// Information about an object to be used as source in a copy operation.
+///
+/// The [absoluteObjectName] must be a `gs://` style URL.
+/// These can be created with [Bucket.absoluteObjectName].
+///
+/// The [length] must be the length of the object, and [md5Hash] must be the
+/// MD5 hash of the object.
+final class SourceObjectInfo {
+  final String absoluteObjectName;
+  final int length;
+  final List<int> md5Hash;
+
+  SourceObjectInfo({
+    required this.absoluteObjectName,
+    required this.length,
+    required this.md5Hash,
+  });
+
+  factory SourceObjectInfo.fromObjectInfo(Bucket bucket, ObjectInfo info) =>
+      SourceObjectInfo(
+        absoluteObjectName: bucket.absoluteObjectName(info.name),
+        length: info.length,
+        md5Hash: info.md5Hash,
+      );
+}
+
 /// Interface for an exported file.
 sealed class ExportedObject {
   final ExportedApi _owner;
@@ -450,49 +549,78 @@ final class ExportedBlob extends ExportedObject {
     }));
   }
 
-  /// Copy binary blob from [absoluteSourceObjectName] to this file.
+  /// Copy binary blob from [SourceObjectInfo] to this file.
   ///
-  /// Requires that [absoluteSourceObjectName] is a `gs://`
+  /// Requires that `absoluteObjectName` is a `gs://`
   /// style URL. These can be created with [Bucket.absoluteObjectName].
   ///
-  /// [sourceInfo] is required to be [ObjectInfo] for the source object.
+  /// [source] is required to be [SourceObjectInfo] for the source object.
   /// This method will use [ObjectInfo.length] and [ObjectInfo.md5Hash] to
   /// determine if it's necessary to copy the object.
-  Future<void> copyFrom(
-    String absoluteSourceObjectName,
-    ObjectInfo sourceInfo,
-  ) async {
-    final metadata = _metadata();
-
+  Future<void> copyFrom(SourceObjectInfo source) async {
     await Future.wait(_owner._prefixes.map((prefix) async {
       await _owner._pool.withResource(() async {
         final dst = prefix + _objectName;
 
-        // Check if the dst already exists
-        if (await _owner._bucket.tryInfo(dst) case final dstInfo?) {
-          if (dstInfo.contentEquals(sourceInfo)) {
-            // If both source and dst exists, and their content matches, then
-            // we only need to update the "validated" metadata. And we only
-            // need to update the "validated" timestamp if it's older than
-            // _retouchDeadline
-            final retouchDeadline = clock.agoBy(_updateValidatedAfter);
-            if (dstInfo.metadata.validated.isBefore(retouchDeadline)) {
-              await _owner._bucket.updateMetadata(dst, metadata);
-            }
-            return;
-          }
-        }
-
-        // If dst or source doesn't exist, then we shall attempt to make a copy.
-        // (if source doesn't exist we'll consistently get an error from here!)
-        await _owner._storage.copyObject(
-          absoluteSourceObjectName,
-          _owner._bucket.absoluteObjectName(dst),
-          metadata: metadata,
+        await _copyToPrefixFromIfNotContentEquals(
+          prefix,
+          source,
+          await _owner._bucket.tryInfo(dst),
         );
       });
     }));
   }
+
+  /// Copy from [source] to [prefix] if required by [destinationInfo].
+  ///
+  /// This will skip copying if [destinationInfo] indicates that the file
+  /// already exists, and the [ObjectInfo.length] and [ObjectInfo.md5Hash]
+  /// indicate that the contents are the same as [source].
+  ///
+  /// Even if the copy is skipped, this will update the [_validatedCustomHeader]
+  /// header, if it's older than [_updateValidatedAfter]. This ensures that we
+  /// can detect stray files that are not being updated (but also not deleted).
+  ///
+  /// Throws if [destinationInfo] is not `null` and its [ObjectInfo.name]
+  /// doesn't match the intended target object in [prefix].
+  Future<void> _copyToPrefixFromIfNotContentEquals(
+    String prefix,
+    SourceObjectInfo source,
+    ObjectInfo? destinationInfo,
+  ) async {
+    final dst = prefix + _objectName;
+
+    // Check if the dst already exists
+    if (destinationInfo != null) {
+      if (destinationInfo.name != dst) {
+        throw ArgumentError.value(
+          destinationInfo,
+          'destinationInfo',
+          'should have name "$dst" not "${destinationInfo.name}"',
+        );
+      }
+
+      if (destinationInfo.contentEquals(source)) {
+        // If both source and dst exist, and their content matches, then
+        // we only need to update the "validated" metadata. And we only
+        // need to update the "validated" timestamp if it's older than
+        // _retouchDeadline
+        final retouchDeadline = clock.agoBy(_updateValidatedAfter);
+        if (destinationInfo.metadata.validated.isBefore(retouchDeadline)) {
+          await _owner._bucket.updateMetadata(dst, _metadata());
+        }
+        return;
+      }
+    }
+
+    // If dst or source doesn't exist, then we shall attempt to make a copy.
+    // (if source doesn't exist we'll consistently get an error from here!)
+    await _owner._storage.copyObject(
+      source.absoluteObjectName,
+      _owner._bucket.absoluteObjectName(dst),
+      metadata: _metadata(),
+    );
+  }
 }
 
 extension on Bucket {
@@ -530,7 +658,7 @@ extension on ObjectInfo {
     return fixedTimeIntListEquals(md5Hash, bytesHash);
   }
 
-  bool contentEquals(ObjectInfo info) {
+  bool contentEquals(SourceObjectInfo info) {
     if (length != info.length) {
       return false;
     }
diff --git a/app/test/package/api_export/exported_api_test.dart b/app/test/package/api_export/exported_api_test.dart
index 296f9b61e4..9838d97b21 100644
--- a/app/test/package/api_export/exported_api_test.dart
+++ b/app/test/package/api_export/exported_api_test.dart
@@ -105,8 +105,10 @@ void main() {
     await exportedApi.package('retry').tarball('1.2.3').write([1, 2, 3]);
 
     await exportedApi.package('retry').tarball('1.2.4').copyFrom(
-          bucket.absoluteObjectName('latest/api/archives/retry-1.2.3.tar.gz'),
-          await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
+          SourceObjectInfo.fromObjectInfo(
+            bucket,
+            await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
+          ),
        );
 
    // Files are present
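
Reviewer note (not part of the diff): the change adds `synchronizeTarballs` but shows no call site. The sketch below is a hypothetical illustration of how a caller could build the `Map<String, SourceObjectInfo>` argument from a canonical bucket; the `package:pub_dev` import path, the `canonicalBucket` variable, the helper name, and the `<package>/<version>.tar.gz` object layout are assumptions for illustration only.

    // Hypothetical usage sketch, assuming a canonical bucket laid out as
    // '<package>/<version>.tar.gz' and the app's package name being pub_dev.
    import 'package:gcloud/storage.dart' show Bucket;
    import 'package:pub_dev/package/api_export/exported_api.dart';

    Future<void> syncPackageTarballs(
      ExportedApi exportedApi,
      Bucket canonicalBucket,
      String package,
      List<String> versions,
    ) async {
      final sources = <String, SourceObjectInfo>{};
      for (final version in versions) {
        // Look up the canonical object and capture its name, length and MD5 hash.
        final info = await canonicalBucket.info('$package/$version.tar.gz');
        sources[version] = SourceObjectInfo.fromObjectInfo(canonicalBucket, info);
      }
      // Copies tarballs whose content differs, uploads missing ones, and
      // deletes exported tarballs for versions absent from the map (once old enough).
      await exportedApi.package(package).synchronizeTarballs(sources);
    }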