@@ -79,6 +79,8 @@ PackageBackend get packageBackend =>
79
79
class PackageBackend {
80
80
final DatastoreDB db;
81
81
82
+ final Storage _storage;
83
+
82
84
/// The Cloud Storage bucket to use for incoming package archives.
83
85
/// The following files are present:
84
86
/// - `tmp/$guid` (incoming package archive that was uploaded, but not yet processed)
@@ -92,12 +94,12 @@ class PackageBackend {
92
94
93
95
PackageBackend (
94
96
this .db,
95
- Storage storage ,
97
+ this ._storage ,
96
98
this ._incomingBucket,
97
99
Bucket canonicalBucket,
98
100
Bucket publicBucket,
99
101
) : tarballStorage =
100
- TarballStorage (db, storage , canonicalBucket, publicBucket);
102
+ TarballStorage (db, _storage , canonicalBucket, publicBucket);
101
103
102
104
/// Whether the package exists and is not blocked or deleted.
103
105
Future <bool > isPackageVisible (String package) async {
@@ -891,28 +893,49 @@ class PackageBackend {
891
893
892
894
/// Finishes the upload of a package and returns the list of messages
893
895
/// related to the publishing.
894
- Future <List <String >> publishUploadedBlob (String guid ) async {
896
+ Future <List <String >> publishUploadedBlob (String uploadGuid ) async {
895
897
final restriction = await getUploadRestrictionStatus ();
896
898
if (restriction == UploadRestrictionStatus .noUploads) {
897
899
throw PackageRejectedException .uploadRestricted ();
898
900
}
899
901
final agent = await requireAuthenticatedClient ();
900
- _logger.info ('Finishing async upload (uuid: $guid )' );
902
+ _logger.info ('Finishing async upload (uuid: $uploadGuid )' );
901
903
_logger.info ('Reading tarball from cloud storage.' );
902
904
903
905
return await withTempDirectory ((Directory dir) async {
904
- final filename = '${dir .absolute .path }/tarball.tar.gz' ;
905
- final info = await _incomingBucket.tryInfo (tmpObjectName (guid));
906
+ // Check the existence of the uploaded file
907
+ final uploadObjectName = tmpObjectName (uploadGuid);
908
+ final info = await _incomingBucket.tryInfo (uploadObjectName);
906
909
if (info? .length == null ) {
907
910
throw PackageRejectedException .archiveEmpty ();
908
911
}
912
+
913
+ // Create a temporary copy that we will continue working with.
914
+ // This will protect us against the unlikely scenario where the
915
+ // uploaded blob changes during this processing.
916
+ final workGuid = createUuid ();
917
+ final workObjectName = '${tmpObjectName (workGuid )}-$uploadGuid ' ;
918
+ try {
919
+ await _storage.copyObjectWithRetry (
920
+ _incomingBucket.absoluteObjectName (uploadObjectName),
921
+ _incomingBucket.absoluteObjectName (workObjectName),
922
+ );
923
+ } catch (e, st) {
924
+ _logger.warning ('Failed to copy uploaded file to work object.' , e, st);
925
+ throw InvalidInputException (
926
+ 'Failed to copy uploaded file (uuid:$uploadGuid ).' );
927
+ }
928
+
929
+ // Check the file size is within limits.
909
930
if (info! .length > UploadSignerService .maxUploadSize) {
910
931
throw PackageRejectedException .archiveTooLarge (
911
932
UploadSignerService .maxUploadSize);
912
933
}
934
+
935
+ final filename = '${dir .absolute .path }/tarball.tar.gz' ;
913
936
await _incomingBucket.readWithRetry (
914
- tmpObjectName (guid) , (input) => _saveTarballToFS (input, filename));
915
- _logger.info ('Examining tarball content ($guid ).' );
937
+ workObjectName , (input) => _saveTarballToFS (input, filename));
938
+ _logger.info ('Examining tarball content ($uploadGuid ).' );
916
939
final sw = Stopwatch ()..start ();
917
940
final file = File (filename);
918
941
final fileLength = await file.length ();
@@ -1009,15 +1032,16 @@ class PackageBackend {
1009
1032
entities: entities,
1010
1033
agent: agent,
1011
1034
archive: archive,
1012
- guid : guid ,
1035
+ objectName : workObjectName ,
1013
1036
hasCanonicalArchiveObject:
1014
1037
canonicalContentMatch == ContentMatchStatus .same,
1015
1038
);
1016
1039
_logger.info ('Tarball uploaded in ${sw .elapsed }.' );
1017
- _logger.info ('Removing temporary object $guid .' );
1040
+ _logger.info ('Removing temporary object $uploadGuid .' );
1018
1041
1019
1042
sw.reset ();
1020
- await _incomingBucket.deleteWithRetry (tmpObjectName (guid));
1043
+ await _incomingBucket.deleteWithRetry (uploadObjectName);
1044
+ await _incomingBucket.deleteWithRetry (workObjectName);
1021
1045
_logger.info ('Temporary object removed in ${sw .elapsed }.' );
1022
1046
return [
1023
1047
'Successfully uploaded '
@@ -1087,7 +1111,7 @@ class PackageBackend {
1087
1111
required _UploadEntities entities,
1088
1112
required AuthenticatedAgent agent,
1089
1113
required PackageSummary archive,
1090
- required String guid ,
1114
+ required String objectName ,
1091
1115
required bool hasCanonicalArchiveObject,
1092
1116
}) async {
1093
1117
final sw = Stopwatch ()..start ();
@@ -1218,7 +1242,7 @@ class PackageBackend {
1218
1242
// Copy archive to canonical bucket.
1219
1243
await tarballStorage.copyFromTempToCanonicalBucket (
1220
1244
sourceAbsoluteObjectName:
1221
- _incomingBucket.absoluteObjectName (tmpObjectName (guid) ),
1245
+ _incomingBucket.absoluteObjectName (objectName ),
1222
1246
package: newVersion.package,
1223
1247
version: newVersion.version! ,
1224
1248
);
0 commit comments