Skip to content

Commit 1020952

Browse files
jorgee and bentsherman authored
Fix unordered completed parts in AWS multipart upload. (#6560)
--------- Signed-off-by: jorgee <[email protected]> Co-authored-by: Ben Sherman <[email protected]>
1 parent c166b93 commit 1020952

File tree

2 files changed: 168 additions and 1 deletion

plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.security.MessageDigest;
2828
import java.security.NoSuchAlgorithmException;
2929
import java.util.Base64;
30+
import java.util.Comparator;
3031
import java.util.List;
3132
import java.util.Queue;
3233
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -548,9 +549,13 @@ private void completeMultipartUpload() throws IOException {
548549
final int partCount = completedParts.size();
549550
log.trace("Completing upload to {} consisting of {} parts", objectId, partCount);
550551

552+
//Ensure parts are sorted by partNumber
553+
CompletedPart[] parts = completedParts.stream()
554+
.sorted(Comparator.comparingInt(CompletedPart::partNumber))
555+
.toArray(CompletedPart[]::new);
551556
try {
552557
final CompletedMultipartUpload completedUpload = CompletedMultipartUpload.builder()
553-
.parts(completedParts)
558+
.parts(parts)
554559
.build();
555560

556561
s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
S3OutputStreamTest.groovy (new file; name inferred from the class declared in it)

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2020-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package nextflow.cloud.aws.nio
19+
20+
import nextflow.Global
21+
import nextflow.Session
22+
import nextflow.cloud.aws.nio.util.S3MultipartOptions
23+
import nextflow.file.FileHelper
24+
import software.amazon.awssdk.services.s3.S3Client
25+
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest
26+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse
27+
import software.amazon.awssdk.services.s3.model.UploadPartResponse
28+
import spock.lang.IgnoreIf
29+
import spock.lang.Requires
30+
import spock.lang.Specification
31+
32+
import java.nio.file.Files
33+
import java.nio.file.attribute.BasicFileAttributes
34+
35+
/**
36+
* Test for S3OutputStream
37+
*
38+
* @author Jorge Ejarque <[email protected]>
39+
*/
40+
class S3OutputStreamTest extends Specification implements AwsS3BaseSpec {
41+
42+
private S3Client s3Client0
43+
44+
S3Client getS3Client() { s3Client0 }
45+
46+
static private Map config0() {
47+
def accessKey = System.getenv('AWS_S3FS_ACCESS_KEY')
48+
def secretKey = System.getenv('AWS_S3FS_SECRET_KEY')
49+
return [aws: [accessKey: accessKey, secretKey: secretKey]]
50+
}
51+
52+
def setup() {
53+
def fs = (S3FileSystem) FileHelper.getOrCreateFileSystemFor(URI.create("s3:///"), config0().aws)
54+
s3Client0 = fs.client.getClient()
55+
and:
56+
def cfg = config0()
57+
Global.config = cfg
58+
Global.session = Mock(Session) { getConfig() >> cfg }
59+
}
60+
61+
@IgnoreIf({System.getenv('NXF_SMOKE')})
62+
@Requires({System.getenv('AWS_S3FS_ACCESS_KEY') && System.getenv('AWS_S3FS_SECRET_KEY')})
63+
def 'should ensure multipart is used'() {
64+
given:
65+
def bucket = createBucket()
66+
and:
67+
def chunksize = 6 * 1024 * 1024
68+
def bytes = new byte[chunksize]
69+
new Random().nextBytes(bytes)
70+
final path = s3path("s3://$bucket/file.txt")
71+
def multipart = new S3MultipartOptions()
72+
multipart.setChunkSize(chunksize)
73+
multipart.setBufferSize(chunksize)
74+
when:
75+
def writer = new S3OutputStream(s3Client0, path.toS3ObjectId(), multipart)
76+
10.times { it ->
77+
writer.write(bytes);
78+
writer.flush()
79+
}
80+
writer.close()
81+
82+
then:
83+
writer.partsCount == 10
84+
existsPath(path)
85+
Files.readAttributes(path, BasicFileAttributes).size() == 10 * chunksize
86+
87+
cleanup:
88+
if( bucket ) deleteBucket(bucket)
89+
}
90+
91+
@IgnoreIf({System.getenv('NXF_SMOKE')})
92+
@Requires({System.getenv('AWS_S3FS_ACCESS_KEY') && System.getenv('AWS_S3FS_SECRET_KEY')})
93+
def 'should upload empty stream'() {
94+
given:
95+
def bucket = createBucket()
96+
and:
97+
final path = s3path("s3://$bucket/file.txt")
98+
def multipart = new S3MultipartOptions()
99+
when:
100+
def writer = new S3OutputStream(s3Client0, path.toS3ObjectId(), multipart)
101+
writer.close()
102+
103+
then:
104+
writer.partsCount == 0
105+
existsPath(path)
106+
Files.readAttributes(path, BasicFileAttributes).size() == 0
107+
108+
cleanup:
109+
if( bucket ) deleteBucket(bucket)
110+
}
111+
@IgnoreIf({System.getenv('NXF_SMOKE')})
112+
@Requires({System.getenv('AWS_S3FS_ACCESS_KEY') && System.getenv('AWS_S3FS_SECRET_KEY')})
113+
def 'should upload without multipart'() {
114+
given:
115+
def bucket = createBucket()
116+
and:
117+
def TEXT = randomText(50 * 1024)
118+
final path = s3path("s3://$bucket/file.txt")
119+
def multipart = new S3MultipartOptions()
120+
when:
121+
def writer = new S3OutputStream(s3Client0, path.toS3ObjectId(), multipart)
122+
writer.write(TEXT.bytes)
123+
writer.close()
124+
125+
then:
126+
writer.partsCount == 0
127+
existsPath(path)
128+
readObject(path) == TEXT
129+
130+
cleanup:
131+
if( bucket ) deleteBucket(bucket)
132+
}
133+
134+
def 'should send sorted parts to completeMultipartUpload'() {
135+
given:
136+
final path = s3path("s3://test/file.txt")
137+
def multipart = new S3MultipartOptions()
138+
def client = Mock(S3Client)
139+
def capturedParts = null
140+
141+
def writer = new S3OutputStream(client, path.toS3ObjectId(), multipart)
142+
143+
when: 'simulate unsorted uploads'
144+
writer.init()
145+
writer.uploadPart(InputStream.nullInputStream(), 25, "checksum".bytes, 2, true)
146+
writer.uploadPart(InputStream.nullInputStream(), 25, "checksum".bytes, 0, false)
147+
writer.uploadPart(InputStream.nullInputStream(), 25, "checksum".bytes, 1, false)
148+
writer.completeMultipartUpload()
149+
150+
then:
151+
1 * client.createMultipartUpload(_) >> CreateMultipartUploadResponse.builder().uploadId("upload-id").build()
152+
3 * client.uploadPart(_,_) >> { UploadPartResponse.builder().eTag('etag').build()}
153+
1 * client.completeMultipartUpload(_ as CompleteMultipartUploadRequest) >> { CompleteMultipartUploadRequest req ->
154+
capturedParts = req.multipartUpload().parts()
155+
return null
156+
}
157+
capturedParts[0].partNumber() == 0
158+
capturedParts[1].partNumber() == 1
159+
capturedParts[2].partNumber() == 2
160+
161+
}
162+
}

0 commit comments

Comments
 (0)