Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 29 additions & 62 deletions app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,41 +75,26 @@ PackageBackend get packageBackend =>
/// Represents the backend for the pub site.
class PackageBackend {
final DatastoreDB db;
final Storage _storage;

/// The Cloud Storage bucket to use for incoming package archives.
/// The following files are present:
/// - `tmp/$guid` (incoming package archive that was uploaded, but not yet processed)
final Bucket _incomingBucket;

/// The Cloud Storage bucket to use for canonical package archives.
/// The following files are present:
/// - `packages/$package-$version.tar.gz` (package archive)
final Bucket _canonicalBucket;

/// The Cloud Storage bucket to use for public package archives.
/// The following files are present:
/// - `packages/$package-$version.tar.gz` (package archive)
final Bucket _publicBucket;

/// The storage handling for the archive files.
late final packageStorage = PackageStorage(
db,
_storage,
_canonicalBucket,
_publicBucket,
);
final PackageStorage packageStorage;

@visibleForTesting
int maxVersionsPerPackage = _defaultMaxVersionsPerPackage;

PackageBackend(
this.db,
this._storage,
Storage storage,
this._incomingBucket,
this._canonicalBucket,
this._publicBucket,
);
Bucket canonicalBucket,
Bucket publicBucket,
) : packageStorage =
PackageStorage(db, storage, canonicalBucket, publicBucket);

/// Whether the package exists and is not blocked or deleted.
Future<bool> isPackageVisible(String package) async {
Expand Down Expand Up @@ -344,8 +329,7 @@ class PackageBackend {
// NOTE: We should maybe check for existence first?
// return storage.bucket(bucket).info(object)
// .then((info) => info.downloadLink);
final object = tarballObjectName(package, Uri.encodeComponent(cv!));
return Uri.parse(_publicBucket.objectUrl(object));
return packageStorage.getPublicDownloadUrl(package, cv!);
}

/// Updates the stable, prerelease and preview versions of [package].
Expand Down Expand Up @@ -958,18 +942,15 @@ class PackageBackend {
}

// Check canonical archive.
final canonicalArchivePath =
tarballObjectName(pubspec.name, versionString);
final canonicalArchiveInfo =
await _canonicalBucket.tryInfo(canonicalArchivePath);
if (canonicalArchiveInfo != null) {
// Actually fetch the archive bytes and do full comparison.
final objectBytes =
await _canonicalBucket.readAsBytes(canonicalArchivePath);
if (!fileBytes.byteToByteEquals(objectBytes)) {
throw PackageRejectedException.versionExists(
pubspec.name, versionString);
}
final canonicalContentMatch =
await packageStorage.matchArchiveContentInCanonical(
pubspec.name,
versionString,
fileBytes,
);
if (canonicalContentMatch == ContentMatchStatus.different) {
throw PackageRejectedException.versionExists(
pubspec.name, versionString);
}

// check existences of referenced packages
Expand Down Expand Up @@ -1006,7 +987,8 @@ class PackageBackend {
agent: agent,
archive: archive,
guid: guid,
hasCanonicalArchiveObject: canonicalArchiveInfo != null,
hasCanonicalArchiveObject:
canonicalContentMatch == ContentMatchStatus.same,
);
_logger.info('Tarball uploaded in ${sw.elapsed}.');
_logger.info('Removing temporary object $guid.');
Expand Down Expand Up @@ -1202,18 +1184,15 @@ class PackageBackend {
);
if (!hasCanonicalArchiveObject) {
// Copy archive to canonical bucket.
await _storage.copyObject(
_incomingBucket.absoluteObjectName(tmpObjectName(guid)),
_canonicalBucket.absoluteObjectName(
tarballObjectName(newVersion.package, newVersion.version!)),
await packageStorage.copyFromTempToCanonicalBucket(
sourceAbsoluteObjectName:
_incomingBucket.absoluteObjectName(tmpObjectName(guid)),
package: newVersion.package,
version: newVersion.version!,
);
}
await _storage.copyObject(
_canonicalBucket.absoluteObjectName(
tarballObjectName(newVersion.package, newVersion.version!)),
_publicBucket.absoluteObjectName(
tarballObjectName(newVersion.package, newVersion.version!)),
);
await packageStorage.copyArchiveFromCanonicalToPublicBucket(
newVersion.package, newVersion.version!);

final inserts = <Model>[
package!,
Expand Down Expand Up @@ -1277,12 +1256,8 @@ class PackageBackend {
apiExporter!
.updatePackageVersion(newVersion.package, newVersion.version!),
]);
final objectName =
tarballObjectName(newVersion.package, newVersion.version!);
final info = await _publicBucket.tryInfo(objectName);
if (info != null) {
await updateContentDispositionToAttachment(info, _publicBucket);
}
await packageStorage.updateContentDispositionOnPublicBucket(
newVersion.package, newVersion.version!);
} catch (e, st) {
final v = newVersion.qualifiedVersionKey;
_logger.severe('Error post-processing package upload $v', e, st);
Expand Down Expand Up @@ -1482,12 +1457,6 @@ class PackageBackend {
return existingEmails;
}

/// Read the archive bytes from the canonical bucket.
Future<List<int>> readArchiveBytes(String package, String version) async {
final objectName = tarballObjectName(package, version);
return await _canonicalBucket.readAsBytes(objectName);
}

// Uploaders support.

Future<account_api.InviteStatus> inviteUploader(
Expand Down Expand Up @@ -1665,14 +1634,12 @@ class PackageBackend {

/// Deletes the tarball of a [package] in the given [version] permanently.
Future<void> removePackageTarball(String package, String version) async {
final object = tarballObjectName(package, version);
await deleteFromBucket(_publicBucket, object);
await deleteFromBucket(_canonicalBucket, object);
await packageStorage.deleteArchiveFromAllBuckets(package, version);
}

/// Gets the file info of a [package] in the given [version].
Future<ObjectInfo?> packageTarballInfo(String package, String version) async {
return await _publicBucket.tryInfo(tarballObjectName(package, version));
return await packageStorage.getPublicBucketArchiveInfo(package, version);
}

void _updatePackageAutomatedPublishingLock(
Expand Down
84 changes: 79 additions & 5 deletions app/lib/package/package_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:typed_data';

import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:pub_dev/package/backend.dart';
import 'package:pub_dev/package/models.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/storage.dart';
import '../shared/datastore.dart';
import '../shared/storage.dart';
import '../shared/utils.dart';
import 'backend.dart';
import 'models.dart';

final _logger = Logger('package_storage');

Expand Down Expand Up @@ -39,7 +42,7 @@ class PackageStorage {
this._publicBucket,
);

/// Gets the object info of the archive file from the public bucket.
/// Gets the object info of the archive file from the canonical bucket.
Future<ObjectInfo?> getCanonicalBucketArchiveInfo(
String package, String version) async {
final objectName = tarballObjectName(package, version);
Expand All @@ -53,6 +56,71 @@ class PackageStorage {
return await _publicBucket.tryInfo(objectName);
}

/// Returns the publicly available download URL from the storage bucket.
Future<Uri> getPublicDownloadUrl(String package, String version) async {
final object = tarballObjectName(package, Uri.encodeComponent(version));
return Uri.parse(_publicBucket.objectUrl(object));
}

/// Verifies the content of an archive in the canonical bucket.
Future<ContentMatchStatus> matchArchiveContentInCanonical(
String package,
String version,
Uint8List bytes,
) async {
final objectName = tarballObjectName(package, version);
final info = await _canonicalBucket.tryInfo(objectName);
if (info == null) {
return ContentMatchStatus.missing;
}
// TODO: implement quick md5 match that doesn't require to download full content
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds important. Perhaps open an issue and attach the issue number here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is more of a shortcut on repeated upload checks than a critical part, because it would only help the rejection path, which may be rare. Instead, I think we should move all of these checks into a separate isolate and not do it in the frontend-serving one as part of the direct request path. I'll file an issue for that and add this to it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the code to do the md5 hash check already with this refactor. PTAL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

final objectBytes = await _canonicalBucket.readAsBytes(objectName);
if (bytes.byteToByteEquals(objectBytes)) {
return ContentMatchStatus.same;
} else {
return ContentMatchStatus.different;
}
}

/// Copies the uploaded object from the temp bucket to the canonical bucket.
Future<void> copyFromTempToCanonicalBucket({
required String sourceAbsoluteObjectName,
required String package,
required String version,
}) async {
await _storage.copyObject(
sourceAbsoluteObjectName,
_canonicalBucket.absoluteObjectName(tarballObjectName(package, version)),
);
}

/// Copies archive bytes from canonical bucket to public bucket.
Future<void> copyArchiveFromCanonicalToPublicBucket(
String package, String version) async {
final objectName = tarballObjectName(package, version);
await _storage.copyObject(
_canonicalBucket.absoluteObjectName(objectName),
_publicBucket.absoluteObjectName(objectName),
);
}

/// Updates the `content-disposition` header to `attachment` on the public archive file.
Future<void> updateContentDispositionOnPublicBucket(
String package, String version) async {
final info = await getPublicBucketArchiveInfo(package, version);
if (info != null) {
await updateContentDispositionToAttachment(info, _publicBucket);
}
}

/// Deletes package archive from all buckets.
Future<void> deleteArchiveFromAllBuckets(
String package, String version) async {
final objectName = tarballObjectName(package, version);
await deleteFromBucket(_canonicalBucket, objectName);
await deleteFromBucket(_publicBucket, objectName);
}

/// Deletes the package archive file from the canonical bucket.
Future<void> deleteArchiveFromCanonicalBucket(
String package, String version) async {
Expand Down Expand Up @@ -205,3 +273,9 @@ class PublicBucketUpdateStat {
bool get isAllZero =>
archivesUpdated == 0 && archivesToBeDeleted == 0 && archivesDeleted == 0;
}

enum ContentMatchStatus {
missing,
different,
same;
}
Loading