Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/lib/admin/actions/actions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:pub_dev/admin/actions/exported-api-sync.dart';

import '../../shared/exceptions.dart';
import 'download_counts_backfill.dart';
import 'download_counts_delete.dart';
Expand Down Expand Up @@ -92,6 +94,7 @@ final class AdminAction {
downloadCountsBackfill,
downloadCountsDelete,
emailSend,
exportedApiSync,
mergeModeratedPackageIntoExisting,
moderatePackage,
moderatePackageVersion,
Expand Down
61 changes: 61 additions & 0 deletions app/lib/admin/actions/exported-api-sync.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:pub_dev/package/api_export/api_exporter.dart';

import 'actions.dart';

/// Admin action that triggers synchronization of the exported API bucket
/// (the bucket from which API responses are served), either for an explicit
/// list of packages or for all packages.
final exportedApiSync = AdminAction(
  name: 'exported-api-sync',
  summary: 'Synchronize exported API bucket',
  description: '''
This command will trigger synchronization of the exported API bucket.
This is the bucket from which API responses are served.

Synchronize the specified packages with:
  --packages="foo bar"

Synchronize all packages with:
  --packages=ALL

Optionally, force a rewrite of all files using:
  --force-write=true
''',
  options: {
    'packages': 'Space separated list of packages, use "ALL" for all!',
    'force-write': 'true/false if writes should be forced (default: false)',
  },
  invoke: (options) async {
    // Parse and validate the `force-write` flag (defaults to 'false').
    final forceWriteString = options['force-write'] ?? 'false';
    InvalidInputException.checkAnyOf(
      forceWriteString,
      'force-write',
      ['true', 'false'],
    );
    final forceWrite = forceWriteString == 'true';

    final packagesOpt = options['packages'] ?? '';
    if (packagesOpt == 'ALL') {
      await apiExporter!.synchronizeExportedApi(forceWrite: forceWrite);
      return {
        'packages': 'ALL',
        'forceWrite': forceWrite,
      };
    }

    // Split on spaces and drop empty entries, so that repeated, leading or
    // trailing spaces don't produce bogus empty package names.
    final packages = packagesOpt
        .split(' ')
        .map((p) => p.trim())
        .where((p) => p.isNotEmpty)
        .toList();
    InvalidInputException.check(
      packages.isNotEmpty,
      'No packages specified, use --packages="foo bar" or --packages=ALL.',
    );
    // Validate all package names before starting any synchronization, so
    // a typo late in the list doesn't leave the work half done.
    for (final p in packages) {
      InvalidInputException.checkPackageName(p);
    }
    for (final p in packages) {
      // TODO: Consider synchronizing packages concurrently (e.g. via a pool).
      await apiExporter!.synchronizePackage(p, forceWrite: forceWrite);
    }
    return {
      'packages': packages,
      'forceWrite': forceWrite,
    };
  },
);
33 changes: 24 additions & 9 deletions app/lib/package/api_export/api_exporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,19 @@ class ApiExporter {
}

/// Gets and uploads the package name completion data.
Future<void> synchronizePackageNameCompletionData() async {
Future<void> synchronizePackageNameCompletionData({
bool forceWrite = false,
}) async {
await _api.packageNameCompletionData.write(
await searchBackend.getPackageNameCompletionData(),
forceWrite: forceWrite,
);
}

/// Synchronize all exported API.
///
/// This is intended to be scheduled from a daily background task.
Future<void> synchronizeExportedApi() async {
Future<void> synchronizeExportedApi({bool forceWrite = false}) async {
final allPackageNames = <String>{};
final packageQuery = dbService.query<Package>();
var errCount = 0;
Expand All @@ -143,13 +146,13 @@ class ApiExporter {
allPackageNames.add(name);

// TODO: Consider retries around all this logic
await synchronizePackage(name);
await synchronizePackage(name, forceWrite: forceWrite);
}, onError: (e, st) {
_log.warning('synchronizePackage() failed', e, st);
errCount++;
});

await synchronizePackageNameCompletionData();
await synchronizePackageNameCompletionData(forceWrite: forceWrite);

await _api.notFound.write({
'error': {
Expand All @@ -158,7 +161,7 @@ class ApiExporter {
},
'code': 'NotFound',
'message': 'Package or version requested could not be found.',
});
}, forceWrite: forceWrite);

await _api.garbageCollect(allPackageNames);

Expand All @@ -182,7 +185,10 @@ class ApiExporter {
/// * Running a full background synchronization.
/// * When a change in [Package.updated] is detected.
/// * A package is moderated, or other admin action is applied.
Future<void> synchronizePackage(String package) async {
Future<void> synchronizePackage(
String package, {
bool forceWrite = false,
}) async {
_log.info('synchronizePackage("$package")');

final PackageData versionListing;
Expand Down Expand Up @@ -212,9 +218,18 @@ class ApiExporter {
(version, _) => !versionListing.versions.any((v) => v.version == version),
);

await _api.package(package).synchronizeTarballs(versions);
await _api.package(package).advisories.write(advisories);
await _api.package(package).versions.write(versionListing);
await _api.package(package).synchronizeTarballs(
versions,
forceWrite: forceWrite,
);
await _api.package(package).advisories.write(
advisories,
forceWrite: forceWrite,
);
await _api.package(package).versions.write(
versionListing,
forceWrite: forceWrite,
);
}

/// Scan for updates from packages until [abort] is resolved, or [claim]
Expand Down
55 changes: 38 additions & 17 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,16 @@ final class ExportedPackage {
///
/// This method will copy GCS objects, when necessary, relying on
/// [SourceObjectInfo.md5Hash] to avoid copying objects that haven't changed.
/// If [forceWrite] is given all files will be rewritten ignoring their
/// previous state.
///
/// [versions] **must** have an entry for each version that exists.
/// This will **delete** tarballs for versions that do not exist in
/// [versions].
Future<void> synchronizeTarballs(
Map<String, SourceObjectInfo> versions,
) async {
Map<String, SourceObjectInfo> versions, {
bool forceWrite = false,
}) async {
await Future.wait([
..._owner._prefixes.map((prefix) async {
final pfx = '$prefix/api/archives/$_package-';
Expand All @@ -314,7 +317,7 @@ final class ExportedPackage {
);

final info = versions[version];
if (info != null) {
if (info != null && !forceWrite) {
await tarball(version)._copyToPrefixFromIfNotContentEquals(
prefix,
info,
Expand Down Expand Up @@ -342,6 +345,7 @@ final class ExportedPackage {
prefix,
versions[v]!,
null,
forceWrite: forceWrite,
);
});
}));
Expand Down Expand Up @@ -495,7 +499,10 @@ final class ExportedJsonFile<T> extends ExportedObject {
}

/// Write [data] as gzipped JSON in UTF-8 format.
Future<void> write(T data) async {
///
/// This will only write if `Content-Length` and `md5Hash` don't match the
/// existing file, or if [forceWrite] is given.
Future<void> write(T data, {bool forceWrite = false}) async {
final gzipped = _jsonGzip.encode(data);
final metadata = _metadata();

Expand All @@ -505,6 +512,7 @@ final class ExportedJsonFile<T> extends ExportedObject {
prefix + _objectName,
gzipped,
metadata,
forceWrite: forceWrite,
);
});
}));
Expand Down Expand Up @@ -542,14 +550,18 @@ final class ExportedBlob extends ExportedObject {
}

/// Write binary blob to this file.
Future<void> write(List<int> data) async {
///
/// This will only write if `Content-Length` and `md5Hash` don't match the
/// existing file, or if [forceWrite] is given.
Future<void> write(List<int> data, {bool forceWrite = false}) async {
final metadata = _metadata();
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
await _owner._bucket.writeBytesIfDifferent(
prefix + _objectName,
data,
metadata,
forceWrite: forceWrite,
);
});
}));
Expand All @@ -563,7 +575,11 @@ final class ExportedBlob extends ExportedObject {
/// [source] is required to be [SourceObjectInfo] for the source object.
/// This method will use [ObjectInfo.length] and [ObjectInfo.md5Hash] to
/// determine if it's necessary to copy the object.
Future<void> copyFrom(SourceObjectInfo source) async {
/// If [forceWrite] is given, this method will always copy the object.
Future<void> copyFrom(
SourceObjectInfo source, {
bool forceWrite = false,
}) async {
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
final dst = prefix + _objectName;
Expand All @@ -572,6 +588,7 @@ final class ExportedBlob extends ExportedObject {
prefix,
source,
await _owner._bucket.tryInfo(dst),
forceWrite: forceWrite,
);
});
}));
Expand All @@ -592,12 +609,13 @@ final class ExportedBlob extends ExportedObject {
Future<void> _copyToPrefixFromIfNotContentEquals(
String prefix,
SourceObjectInfo source,
ObjectInfo? destinationInfo,
) async {
ObjectInfo? destinationInfo, {
bool forceWrite = false,
}) async {
final dst = prefix + _objectName;

// Check if the dst already exists
if (destinationInfo != null) {
if (destinationInfo != null && !forceWrite) {
if (destinationInfo.name != dst) {
throw ArgumentError.value(
destinationInfo,
Expand Down Expand Up @@ -633,15 +651,18 @@ extension on Bucket {
Future<void> writeBytesIfDifferent(
String name,
List<int> bytes,
ObjectMetadata metadata,
) async {
if (await tryInfo(name) case final info?) {
if (info.isSameContent(bytes)) {
if (info.metadata.validated
.isBefore(clock.agoBy(_updateValidatedAfter))) {
await updateMetadata(name, metadata);
ObjectMetadata metadata, {
bool forceWrite = false,
}) async {
if (!forceWrite) {
if (await tryInfo(name) case final info?) {
if (info.isSameContent(bytes)) {
if (info.metadata.validated
.isBefore(clock.agoBy(_updateValidatedAfter))) {
await updateMetadata(name, metadata);
}
return;
}
return;
}
}

Expand Down
Loading