Skip to content

Commit 46fe84d

Browse files
committed
Introduce synchronizeTarballs that uses Bucket.list which now yeild ObjectInfo
1 parent c661d3f commit 46fe84d

File tree

2 files changed

+133
-33
lines changed

2 files changed

+133
-33
lines changed

app/lib/package/api_export/exported_api.dart

Lines changed: 129 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,69 @@ final class ExportedPackage {
270270
Duration(hours: 2),
271271
);
272272

273+
Future<void> synchronizeTarballs(
274+
Map<String, SourceObjectInfo> versions,
275+
) async {
276+
await Future.wait([
277+
..._owner._prefixes.map((prefix) async {
278+
final pfx = prefix + '/api/archives/$_package-';
279+
280+
final versionsForUpload = versions.keys.toSet();
281+
await _owner._listBucket(prefix: pfx, delimiter: '', (entry) async {
282+
final item = switch (entry) {
283+
final BucketDirectoryEntry _ => throw AssertionError(
284+
'directory entries should not be possible',
285+
),
286+
final BucketObjectEntry item => item,
287+
};
288+
if (!item.name.endsWith('.tar.gz')) {
289+
_log.pubNoticeShout(
290+
'stray-file',
291+
'Found stray file "${item.name}" in ExportedApi'
292+
' while garbage collecting for "$_package" (ignoring it!)',
293+
);
294+
return;
295+
}
296+
final version = Uri.decodeComponent(
297+
item.name.without(prefix: pfx, suffix: '.tar.gz'),
298+
);
299+
300+
final info = versions[version];
301+
if (info != null) {
302+
await tarball(version)._copyToPrefixFromIfNotContentEquals(
303+
prefix,
304+
info,
305+
item,
306+
);
307+
// This version needs not be uploaded again
308+
versionsForUpload.remove(version);
309+
310+
// Done, we don't need to delete this item
311+
return;
312+
}
313+
314+
// Delete the item, if it's old enough.
315+
if (item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
316+
// Only delete if the item if it's older than _minGarbageAge
317+
// This avoids any races where we delete files we've just created
318+
await _owner._bucket.tryDelete(item.name);
319+
}
320+
});
321+
322+
// Upload missing versions
323+
await Future.wait(versionsForUpload.map((v) async {
324+
await _owner._pool.withResource(() async {
325+
await tarball(v)._copyToPrefixFromIfNotContentEquals(
326+
prefix,
327+
versions[v]!,
328+
null,
329+
);
330+
});
331+
}));
332+
}),
333+
]);
334+
}
335+
273336
/// Garbage collect versions from this package not in [allVersionNumbers].
274337
///
275338
/// [allVersionNumbers] must be encoded as canonical versions.
@@ -321,6 +384,32 @@ final class ExportedPackage {
321384
}
322385
}
323386

387+
/// Information about an object to be used as source in a copy operation.
388+
///
389+
/// The [absoluteObjectName] must be a `gs:/<bucket>/<objectName>` style URL.
390+
/// These can be created with [Bucket.absoluteObjectName].
391+
///
392+
/// The [length] must be the length of the object, and [md5Hash] must be the
393+
/// MD5 hash of the object.
394+
final class SourceObjectInfo {
395+
final String absoluteObjectName;
396+
final int length;
397+
final List<int> md5Hash;
398+
399+
SourceObjectInfo({
400+
required this.absoluteObjectName,
401+
required this.length,
402+
required this.md5Hash,
403+
});
404+
405+
factory SourceObjectInfo.fromObjectInfo(Bucket bucket, ObjectInfo info) =>
406+
SourceObjectInfo(
407+
absoluteObjectName: bucket.absoluteObjectName(info.name),
408+
length: info.length,
409+
md5Hash: info.md5Hash,
410+
);
411+
}
412+
324413
/// Interface for an exported file.
325414
sealed class ExportedObject {
326415
final ExportedApi _owner;
@@ -450,49 +539,58 @@ final class ExportedBlob extends ExportedObject {
450539
}));
451540
}
452541

453-
/// Copy binary blob from [absoluteSourceObjectName] to this file.
542+
/// Copy binary blob from [SourceObjectInfo] to this file.
454543
///
455-
/// Requires that [absoluteSourceObjectName] is a `gs:/<bucket>/<objectName>`
544+
/// Requires that `absoluteObjectName` is a `gs:/<bucket>/<objectName>`
456545
/// style URL. These can be created with [Bucket.absoluteObjectName].
457546
///
458-
/// [sourceInfo] is required to be [ObjectInfo] for the source object.
547+
/// [source] is required to be [SourceObjectInfo] for the source object.
459548
/// This method will use [ObjectInfo.length] and [ObjectInfo.md5Hash] to
460549
/// determine if it's necessary to copy the object.
461-
Future<void> copyFrom(
462-
String absoluteSourceObjectName,
463-
ObjectInfo sourceInfo,
464-
) async {
465-
final metadata = _metadata();
466-
550+
Future<void> copyFrom(SourceObjectInfo source) async {
467551
await Future.wait(_owner._prefixes.map((prefix) async {
468552
await _owner._pool.withResource(() async {
469553
final dst = prefix + _objectName;
470554

471-
// Check if the dst already exists
472-
if (await _owner._bucket.tryInfo(dst) case final dstInfo?) {
473-
if (dstInfo.contentEquals(sourceInfo)) {
474-
// If both source and dst exists, and their content matches, then
475-
// we only need to update the "validated" metadata. And we only
476-
// need to update the "validated" timestamp if it's older than
477-
// _retouchDeadline
478-
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
479-
if (dstInfo.metadata.validated.isBefore(retouchDeadline)) {
480-
await _owner._bucket.updateMetadata(dst, metadata);
481-
}
482-
return;
483-
}
484-
}
485-
486-
// If dst or source doesn't exist, then we shall attempt to make a copy.
487-
// (if source doesn't exist we'll consistently get an error from here!)
488-
await _owner._storage.copyObject(
489-
absoluteSourceObjectName,
490-
_owner._bucket.absoluteObjectName(dst),
491-
metadata: metadata,
555+
await _copyToPrefixFromIfNotContentEquals(
556+
prefix,
557+
source,
558+
await _owner._bucket.tryInfo(dst),
492559
);
493560
});
494561
}));
495562
}
563+
564+
Future<void> _copyToPrefixFromIfNotContentEquals(
565+
String prefix,
566+
SourceObjectInfo source,
567+
ObjectInfo? destinationInfo,
568+
) async {
569+
final dst = prefix + _objectName;
570+
571+
// Check if the dst already exists
572+
if (destinationInfo case final dstInfo?) {
573+
if (dstInfo.contentEquals(source)) {
574+
// If both source and dst exists, and their content matches, then
575+
// we only need to update the "validated" metadata. And we only
576+
// need to update the "validated" timestamp if it's older than
577+
// _retouchDeadline
578+
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
579+
if (dstInfo.metadata.validated.isBefore(retouchDeadline)) {
580+
await _owner._bucket.updateMetadata(dst, _metadata());
581+
}
582+
return;
583+
}
584+
}
585+
586+
// If dst or source doesn't exist, then we shall attempt to make a copy.
587+
// (if source doesn't exist we'll consistently get an error from here!)
588+
await _owner._storage.copyObject(
589+
source.absoluteObjectName,
590+
_owner._bucket.absoluteObjectName(dst),
591+
metadata: _metadata(),
592+
);
593+
}
496594
}
497595

498596
extension on Bucket {
@@ -530,7 +628,7 @@ extension on ObjectInfo {
530628
return fixedTimeIntListEquals(md5Hash, bytesHash);
531629
}
532630

533-
bool contentEquals(ObjectInfo info) {
631+
bool contentEquals(SourceObjectInfo info) {
534632
if (length != info.length) {
535633
return false;
536634
}

app/test/package/api_export/exported_api_test.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ void main() {
105105
await exportedApi.package('retry').tarball('1.2.3').write([1, 2, 3]);
106106

107107
await exportedApi.package('retry').tarball('1.2.4').copyFrom(
108-
bucket.absoluteObjectName('latest/api/archives/retry-1.2.3.tar.gz'),
109-
await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
108+
SourceObjectInfo.fromObjectInfo(
109+
bucket,
110+
await bucket.info('latest/api/archives/retry-1.2.3.tar.gz'),
111+
),
110112
);
111113

112114
// Files are present

0 commit comments

Comments
 (0)