178 changes: 147 additions & 31 deletions app/lib/package/api_export/exported_api.dart
@@ -270,6 +270,67 @@ final class ExportedPackage {
Duration(hours: 2),
);

Future<void> synchronizeTarballs(
Map<String, SourceObjectInfo> versions,
) async {
await Future.wait([
..._owner._prefixes.map((prefix) async {
final pfx = '$prefix/api/archives/$_package-';

final versionsForUpload = versions.keys.toSet();
await _owner._listBucket(prefix: pfx, delimiter: '', (entry) async {
final item = switch (entry) {
final BucketDirectoryEntry _ => throw AssertionError('unreachable'),
final BucketObjectEntry item => item,
};
if (!item.name.endsWith('.tar.gz')) {
_log.pubNoticeShout(
'stray-file',
'Found stray file "${item.name}" in ExportedApi'
' while garbage collecting for "$_package" (ignoring it!)',
);
return;
}
final version = Uri.decodeComponent(
item.name.without(prefix: pfx, suffix: '.tar.gz'),
);

final info = versions[version];
if (info != null) {
await tarball(version)._copyToPrefixFromIfNotContentEquals(
prefix,
info,
item,
);
// This version doesn't need to be uploaded again
versionsForUpload.remove(version);

// Done, we don't need to delete this item
return;
}

// Delete the item, if it's old enough.
if (item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
// Only delete the item if it's older than _minGarbageAge.
// This avoids races where we delete files we've just created.
await _owner._bucket.tryDelete(item.name);
}
});

// Upload missing versions
await Future.wait(versionsForUpload.map((v) async {
await _owner._pool.withResource(() async {
await tarball(v)._copyToPrefixFromIfNotContentEquals(
prefix,
versions[v]!,
null,
);
});
}));
}),
]);
}
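
> For context, a hedged sketch of how a call site might drive `synchronizeTarballs`. The helper name, canonical bucket layout, and version list below are assumptions for illustration, not part of this PR; only `SourceObjectInfo.fromObjectInfo`, `Bucket.info`, and `synchronizeTarballs` come from the changed code.

```dart
import 'package:gcloud/storage.dart';

/// Hypothetical call site (names and bucket layout are assumptions).
Future<void> syncTarballsForPackage(
  Bucket canonicalBucket,
  ExportedPackage exported,
  String package,
  List<String> canonicalVersions,
) async {
  // Build the version -> SourceObjectInfo map from the canonical bucket;
  // synchronizeTarballs then copies changed tarballs, retouches matching
  // ones, uploads missing versions, and deletes sufficiently old strays.
  final sources = <String, SourceObjectInfo>{
    for (final version in canonicalVersions)
      version: SourceObjectInfo.fromObjectInfo(
        canonicalBucket,
        // 'packages/<package>-<version>.tar.gz' is an assumed layout.
        await canonicalBucket.info('packages/$package-$version.tar.gz'),
      ),
  };
  await exported.synchronizeTarballs(sources);
}
```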

/// Garbage collect versions from this package not in [allVersionNumbers].
///
/// [allVersionNumbers] must be encoded as canonical versions.
@@ -321,6 +382,32 @@ final class ExportedPackage {
}
}

/// Information about an object to be used as source in a copy operation.
///
/// The [absoluteObjectName] must be a `gs://<bucket>/<objectName>` style URL.
/// These can be created with [Bucket.absoluteObjectName].
///
/// The [length] must be the length of the object, and [md5Hash] must be the
/// MD5 hash of the object.
final class SourceObjectInfo {
final String absoluteObjectName;
final int length;
final List<int> md5Hash;

SourceObjectInfo({
required this.absoluteObjectName,
required this.length,
required this.md5Hash,
});

factory SourceObjectInfo.fromObjectInfo(Bucket bucket, ObjectInfo info) =>
SourceObjectInfo(
absoluteObjectName: bucket.absoluteObjectName(info.name),
length: info.length,
md5Hash: info.md5Hash,
);
}
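
> To illustrate the invariants spelled out in the doc comment, here is a manual construction that should be equivalent to the `fromObjectInfo` factory. The helper name and object name are hypothetical; `Bucket.info` and `Bucket.absoluteObjectName` are existing `package:gcloud` APIs.

```dart
import 'package:gcloud/storage.dart';

Future<SourceObjectInfo> describeSource(Bucket bucket, String objectName) async {
  final info = await bucket.info(objectName); // throws if the object is absent
  // Equivalent, by the documented invariants, to
  // SourceObjectInfo.fromObjectInfo(bucket, info):
  return SourceObjectInfo(
    absoluteObjectName: bucket.absoluteObjectName(objectName),
    length: info.length,
    md5Hash: info.md5Hash,
  );
}
```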

/// Interface for an exported file.
sealed class ExportedObject {
final ExportedApi _owner;
@@ -450,49 +537,78 @@ final class ExportedBlob extends ExportedObject {
}));
}

- /// Copy binary blob from [absoluteSourceObjectName] to this file.
+ /// Copy binary blob from [SourceObjectInfo] to this file.
  ///
- /// Requires that [absoluteSourceObjectName] is a `gs://<bucket>/<objectName>`
+ /// Requires that `absoluteObjectName` is a `gs://<bucket>/<objectName>`
  /// style URL. These can be created with [Bucket.absoluteObjectName].
  ///
- /// [sourceInfo] is required to be [ObjectInfo] for the source object.
+ /// [source] is required to be [SourceObjectInfo] for the source object.
  /// This method will use [ObjectInfo.length] and [ObjectInfo.md5Hash] to
  /// determine if it's necessary to copy the object.
- Future<void> copyFrom(
-   String absoluteSourceObjectName,
-   ObjectInfo sourceInfo,
- ) async {
-   final metadata = _metadata();
-
+ Future<void> copyFrom(SourceObjectInfo source) async {
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
final dst = prefix + _objectName;

-   // Check if the dst already exists
-   if (await _owner._bucket.tryInfo(dst) case final dstInfo?) {
-     if (dstInfo.contentEquals(sourceInfo)) {
-       // If both source and dst exists, and their content matches, then
-       // we only need to update the "validated" metadata. And we only
-       // need to update the "validated" timestamp if it's older than
-       // _retouchDeadline
-       final retouchDeadline = clock.agoBy(_updateValidatedAfter);
-       if (dstInfo.metadata.validated.isBefore(retouchDeadline)) {
-         await _owner._bucket.updateMetadata(dst, metadata);
-       }
-       return;
-     }
-   }
-
-   // If dst or source doesn't exist, then we shall attempt to make a copy.
-   // (if source doesn't exist we'll consistently get an error from here!)
-   await _owner._storage.copyObject(
-     absoluteSourceObjectName,
-     _owner._bucket.absoluteObjectName(dst),
-     metadata: metadata,
+   await _copyToPrefixFromIfNotContentEquals(
+     prefix,
+     source,
+     await _owner._bucket.tryInfo(dst),
    );
});
}));
}

/// Copy from [source] to [prefix] if required by [destinationInfo].
///
/// This will skip copying if [destinationInfo] indicates that the file
/// already exists, and the [ObjectInfo.length] and [ObjectInfo.md5Hash]
/// indicate that the contents are the same as [source].
///
/// Even if the copy is skipped, this will update the [_validatedCustomHeader]
/// header, if it's older than [_updateValidatedAfter]. This ensures that we
/// can detect stray files that are not being updated (but also not deleted).
///
/// Throws, if [destinationInfo] is not `null` and its [ObjectInfo.name]
/// doesn't match the intended target object in [prefix].
Future<void> _copyToPrefixFromIfNotContentEquals(
String prefix,
SourceObjectInfo source,
ObjectInfo? destinationInfo,
) async {
final dst = prefix + _objectName;

// Check if the dst already exists
if (destinationInfo != null) {
if (destinationInfo.name != dst) {
throw ArgumentError.value(
destinationInfo,
'destinationInfo',
'should have name "$dst" not "${destinationInfo.name}"',
);
}

if (destinationInfo.contentEquals(source)) {
// If both source and dst exist, and their content matches, then we
// only need to update the "validated" metadata. And we only need to
// update the "validated" timestamp if it's older than retouchDeadline.
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
if (destinationInfo.metadata.validated.isBefore(retouchDeadline)) {
await _owner._bucket.updateMetadata(dst, _metadata());
}
return;
}
}

// If dst or source doesn't exist, then we shall attempt to make a copy.
// (if source doesn't exist we'll consistently get an error from here!)
await _owner._storage.copyObject(
source.absoluteObjectName,
_owner._bucket.absoluteObjectName(dst),
metadata: _metadata(),
);
}
}
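
> The copy-skip decision above reduces to a length check plus a fixed-time MD5 comparison, as the `contentEquals` extension below shows. A standalone restatement for clarity; the helper name `wouldSkipCopy` is made up, while `fixedTimeIntListEquals` is the constant-time comparison already used in this file.

```dart
// Sketch: a copy can be skipped when length and MD5 both match.
bool wouldSkipCopy(ObjectInfo destination, SourceObjectInfo source) {
  if (destination.length != source.length) {
    return false;
  }
  return fixedTimeIntListEquals(destination.md5Hash, source.md5Hash);
}
```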

extension on Bucket {
@@ -530,7 +646,7 @@ extension on ObjectInfo {
return fixedTimeIntListEquals(md5Hash, bytesHash);
}

- bool contentEquals(ObjectInfo info) {
+ bool contentEquals(SourceObjectInfo info) {
if (length != info.length) {
return false;
}
6 changes: 4 additions & 2 deletions app/test/package/api_export/exported_api_test.dart
@@ -105,8 +105,10 @@ void main() {
await exportedApi.package('retry').tarball('1.2.3').write([1, 2, 3]);

  await exportedApi.package('retry').tarball('1.2.4').copyFrom(
-   bucket.absoluteObjectName('latest/api/archives/retry-1.2.3.tar.gz'),
-   await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
+   SourceObjectInfo.fromObjectInfo(
+     bucket,
+     await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
+   ),
  );

// Files are present