Skip to content

Commit 71f1918

Browse files
authored
Introduce synchronizeTarballs that uses Bucket.list which now yields ObjectInfo (#8224)
* Introduce synchronizeTarballs that uses Bucket.list which now yields ObjectInfo * Address review comments * Documentation for public methods
1 parent 7a518f5 commit 71f1918

File tree

2 files changed

+163
-33
lines changed

2 files changed

+163
-33
lines changed

app/lib/package/api_export/exported_api.dart

Lines changed: 159 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,79 @@ final class ExportedPackage {
270270
Duration(hours: 2),
271271
);
272272

273+
/// Synchronize tarballs from [versions].
274+
///
275+
/// [versions] is a map from version number to [SourceObjectInfo], where
276+
/// the [SourceObjectInfo] is the GCS object from which the tarball can be
277+
/// copied.
278+
///
279+
/// This method will copy GCS objects, when necessary, relying on
280+
/// [SourceObjectInfo.md5Hash] to avoid copying objects that haven't changed.
281+
///
282+
/// [versions] **must** have an entry for each version that exists.
283+
/// This will **delete** tarballs for versions that do not exist in
284+
/// [versions].
285+
Future<void> synchronizeTarballs(
286+
Map<String, SourceObjectInfo> versions,
287+
) async {
288+
await Future.wait([
289+
..._owner._prefixes.map((prefix) async {
290+
final pfx = '$prefix/api/archives/$_package-';
291+
292+
final versionsForUpload = versions.keys.toSet();
293+
await _owner._listBucket(prefix: pfx, delimiter: '', (entry) async {
294+
final item = switch (entry) {
295+
final BucketDirectoryEntry _ => throw AssertionError('unreachable'),
296+
final BucketObjectEntry item => item,
297+
};
298+
if (!item.name.endsWith('.tar.gz')) {
299+
_log.pubNoticeShout(
300+
'stray-file',
301+
'Found stray file "${item.name}" in ExportedApi'
302+
' while garbage collecting for "$_package" (ignoring it!)',
303+
);
304+
return;
305+
}
306+
final version = Uri.decodeComponent(
307+
item.name.without(prefix: pfx, suffix: '.tar.gz'),
308+
);
309+
310+
final info = versions[version];
311+
if (info != null) {
312+
await tarball(version)._copyToPrefixFromIfNotContentEquals(
313+
prefix,
314+
info,
315+
item,
316+
);
317+
// This version needs not be uploaded again
318+
versionsForUpload.remove(version);
319+
320+
// Done, we don't need to delete this item
321+
return;
322+
}
323+
324+
// Delete the item, if it's old enough.
325+
if (item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
326+
// Only delete if the item if it's older than _minGarbageAge
327+
// This avoids any races where we delete files we've just created
328+
await _owner._bucket.tryDelete(item.name);
329+
}
330+
});
331+
332+
// Upload missing versions
333+
await Future.wait(versionsForUpload.map((v) async {
334+
await _owner._pool.withResource(() async {
335+
await tarball(v)._copyToPrefixFromIfNotContentEquals(
336+
prefix,
337+
versions[v]!,
338+
null,
339+
);
340+
});
341+
}));
342+
}),
343+
]);
344+
}
345+
273346
/// Garbage collect versions from this package not in [allVersionNumbers].
274347
///
275348
/// [allVersionNumbers] must be encoded as canonical versions.
@@ -321,6 +394,32 @@ final class ExportedPackage {
321394
}
322395
}
323396

397+
/// Information about an object to be used as source in a copy operation.
398+
///
399+
/// The [absoluteObjectName] must be a `gs:/<bucket>/<objectName>` style URL.
400+
/// These can be created with [Bucket.absoluteObjectName].
401+
///
402+
/// The [length] must be the length of the object, and [md5Hash] must be the
403+
/// MD5 hash of the object.
404+
final class SourceObjectInfo {
405+
final String absoluteObjectName;
406+
final int length;
407+
final List<int> md5Hash;
408+
409+
SourceObjectInfo({
410+
required this.absoluteObjectName,
411+
required this.length,
412+
required this.md5Hash,
413+
});
414+
415+
factory SourceObjectInfo.fromObjectInfo(Bucket bucket, ObjectInfo info) =>
416+
SourceObjectInfo(
417+
absoluteObjectName: bucket.absoluteObjectName(info.name),
418+
length: info.length,
419+
md5Hash: info.md5Hash,
420+
);
421+
}
422+
324423
/// Interface for an exported file.
325424
sealed class ExportedObject {
326425
final ExportedApi _owner;
@@ -450,49 +549,78 @@ final class ExportedBlob extends ExportedObject {
450549
}));
451550
}
452551

453-
/// Copy binary blob from [absoluteSourceObjectName] to this file.
552+
/// Copy binary blob from [SourceObjectInfo] to this file.
454553
///
455-
/// Requires that [absoluteSourceObjectName] is a `gs:/<bucket>/<objectName>`
554+
/// Requires that `absoluteObjectName` is a `gs:/<bucket>/<objectName>`
456555
/// style URL. These can be created with [Bucket.absoluteObjectName].
457556
///
458-
/// [sourceInfo] is required to be [ObjectInfo] for the source object.
557+
/// [source] is required to be [SourceObjectInfo] for the source object.
459558
/// This method will use [ObjectInfo.length] and [ObjectInfo.md5Hash] to
460559
/// determine if it's necessary to copy the object.
461-
Future<void> copyFrom(
462-
String absoluteSourceObjectName,
463-
ObjectInfo sourceInfo,
464-
) async {
465-
final metadata = _metadata();
466-
560+
Future<void> copyFrom(SourceObjectInfo source) async {
467561
await Future.wait(_owner._prefixes.map((prefix) async {
468562
await _owner._pool.withResource(() async {
469563
final dst = prefix + _objectName;
470564

471-
// Check if the dst already exists
472-
if (await _owner._bucket.tryInfo(dst) case final dstInfo?) {
473-
if (dstInfo.contentEquals(sourceInfo)) {
474-
// If both source and dst exists, and their content matches, then
475-
// we only need to update the "validated" metadata. And we only
476-
// need to update the "validated" timestamp if it's older than
477-
// _retouchDeadline
478-
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
479-
if (dstInfo.metadata.validated.isBefore(retouchDeadline)) {
480-
await _owner._bucket.updateMetadata(dst, metadata);
481-
}
482-
return;
483-
}
484-
}
485-
486-
// If dst or source doesn't exist, then we shall attempt to make a copy.
487-
// (if source doesn't exist we'll consistently get an error from here!)
488-
await _owner._storage.copyObject(
489-
absoluteSourceObjectName,
490-
_owner._bucket.absoluteObjectName(dst),
491-
metadata: metadata,
565+
await _copyToPrefixFromIfNotContentEquals(
566+
prefix,
567+
source,
568+
await _owner._bucket.tryInfo(dst),
492569
);
493570
});
494571
}));
495572
}
573+
574+
/// Copy from [source] to [prefix] if required by [destinationInfo].
575+
///
576+
/// This will skip copying if [destinationInfo] indicates that the file
577+
/// already exists, and the [ObjectInfo.length] and [ObjectInfo.md5Hash]
578+
/// indicates that the contents is the same as [source].
579+
///
580+
/// Even if the copy is skipped, this will update the [_validatedCustomHeader]
581+
/// header, if it's older than [_updateValidatedAfter]. This ensures that we
582+
/// can detect stray files that are not being updated (but also not deleted).
583+
///
584+
/// Throws, if [destinationInfo] is not `null` and its [ObjectInfo.name]
585+
/// doesn't match the intended target object in [prefix].
586+
Future<void> _copyToPrefixFromIfNotContentEquals(
587+
String prefix,
588+
SourceObjectInfo source,
589+
ObjectInfo? destinationInfo,
590+
) async {
591+
final dst = prefix + _objectName;
592+
593+
// Check if the dst already exists
594+
if (destinationInfo != null) {
595+
if (destinationInfo.name != dst) {
596+
throw ArgumentError.value(
597+
destinationInfo,
598+
'destinationInfo',
599+
'should have name "$dst" not "${destinationInfo.name}"',
600+
);
601+
}
602+
603+
if (destinationInfo.contentEquals(source)) {
604+
// If both source and dst exists, and their content matches, then
605+
// we only need to update the "validated" metadata. And we only
606+
// need to update the "validated" timestamp if it's older than
607+
// _retouchDeadline
608+
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
609+
if (destinationInfo.metadata.validated.isBefore(retouchDeadline)) {
610+
await _owner._bucket.updateMetadata(dst, _metadata());
611+
}
612+
return;
613+
}
614+
}
615+
616+
// If dst or source doesn't exist, then we shall attempt to make a copy.
617+
// (if source doesn't exist we'll consistently get an error from here!)
618+
await _owner._storage.copyObject(
619+
source.absoluteObjectName,
620+
_owner._bucket.absoluteObjectName(dst),
621+
metadata: _metadata(),
622+
);
623+
}
496624
}
497625

498626
extension on Bucket {
@@ -530,7 +658,7 @@ extension on ObjectInfo {
530658
return fixedTimeIntListEquals(md5Hash, bytesHash);
531659
}
532660

533-
bool contentEquals(ObjectInfo info) {
661+
bool contentEquals(SourceObjectInfo info) {
534662
if (length != info.length) {
535663
return false;
536664
}

app/test/package/api_export/exported_api_test.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ void main() {
105105
await exportedApi.package('retry').tarball('1.2.3').write([1, 2, 3]);
106106

107107
await exportedApi.package('retry').tarball('1.2.4').copyFrom(
108-
bucket.absoluteObjectName('latest/api/archives/retry-1.2.3.tar.gz'),
109-
await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
108+
SourceObjectInfo.fromObjectInfo(
109+
bucket,
110+
await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
111+
),
110112
);
111113

112114
// Files are present

0 commit comments

Comments
 (0)