Skip to content

Commit 1d0a0a5

Browse files
author
Josh Chorlton
committed
working
1 parent 887443e commit 1d0a0a5

File tree

9 files changed

+219
-135
lines changed

9 files changed

+219
-135
lines changed

plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import java.io.PrintWriter;
2525
import java.io.StringWriter;
2626
import java.nio.ByteBuffer;
27-
import java.security.MessageDigest;
28-
import java.security.NoSuchAlgorithmException;
2927
import java.util.Base64;
28+
import java.util.zip.CRC32C;
3029
import java.util.Comparator;
3130
import java.util.List;
3231
import java.util.Queue;
@@ -129,7 +128,7 @@ public final class S3OutputStream extends OutputStream {
129128
*/
130129
private ByteBuffer buf;
131130

132-
private MessageDigest md5;
131+
private CRC32C crc32c;
133132

134133
/**
135134
* Phaser object to synchronize stream termination
@@ -208,15 +207,25 @@ public S3OutputStream setContentType(String type) {
208207
}
209208

210209
/**
211-
* @return A MD5 message digester
210+
* @return A new CRC32C checksum instance
212211
*/
213-
private MessageDigest createMd5() {
214-
try {
215-
return MessageDigest.getInstance("MD5");
216-
}
217-
catch(NoSuchAlgorithmException e) {
218-
throw new IllegalStateException("Cannot find a MD5 algorithm provider",e);
219-
}
212+
private CRC32C createCrc32c() {
213+
return new CRC32C();
214+
}
215+
216+
/**
217+
* Convert CRC32C checksum value to a 4-byte big-endian array
218+
* @param crc32c The CRC32C checksum instance
219+
* @return The checksum as a 4-byte array
220+
*/
221+
private byte[] getCrc32cBytes(CRC32C crc32c) {
222+
long value = crc32c.getValue();
223+
return new byte[] {
224+
(byte) ((value >> 24) & 0xFF),
225+
(byte) ((value >> 16) & 0xFF),
226+
(byte) ((value >> 8) & 0xFF),
227+
(byte) (value & 0xFF)
228+
};
220229
}
221230

222231
/**
@@ -233,7 +242,7 @@ public void write (int b) throws IOException {
233242
}
234243
if( buf == null ) {
235244
buf = allocate();
236-
md5 = createMd5();
245+
crc32c = createCrc32c();
237246
}
238247
else if( !buf.hasRemaining() ) {
239248
if( buf.position() < bufferSize ) {
@@ -243,13 +252,13 @@ else if( !buf.hasRemaining() ) {
243252
flush();
244253
// create a new buffer
245254
buf = allocate();
246-
md5 = createMd5();
255+
crc32c = createCrc32c();
247256
}
248257
}
249258

250259
buf.put((byte) b);
251-
// update the md5 checksum
252-
md5.update((byte) b);
260+
// update the checksum
261+
crc32c.update(b);
253262
}
254263

255264
/**
@@ -263,7 +272,7 @@ public void flush() throws IOException {
263272
if( uploadBuffer(buf, false) ) {
264273
// clear the current buffer
265274
buf = null;
266-
md5 = null;
275+
crc32c = null;
267276
}
268277
}
269278

@@ -312,7 +321,7 @@ private boolean uploadBuffer(ByteBuffer buf, boolean last) throws IOException {
312321
}
313322

314323
// set the buffer in read mode and submit for upload
315-
executor.submit( task(buf, md5.digest(), ++partsCount) );
324+
executor.submit( task(buf, getCrc32cBytes(crc32c), ++partsCount) );
316325

317326
return true;
318327
}
@@ -382,10 +391,10 @@ public void close() throws IOException {
382391

383392
if (uploadId == null) {
384393
if( buf != null )
385-
putObject(buf, md5.digest());
394+
putObject(buf, getCrc32cBytes(crc32c));
386395
else
387-
// this is needed when trying to upload an empty
388-
putObject(new ByteArrayInputStream(new byte[]{}), 0, createMd5().digest());
396+
// this is needed when trying to upload an empty
397+
putObject(new ByteArrayInputStream(new byte[]{}), 0, getCrc32cBytes(createCrc32c()));
389398
}
390399
else {
391400
// -- upload remaining chunk
@@ -500,7 +509,7 @@ private void uploadPart(final InputStream content, final long contentLength, fin
500509
reqBuilder.uploadId(uploadId);
501510
reqBuilder.partNumber(partNumber);
502511
reqBuilder.contentLength(contentLength);
503-
reqBuilder.contentMD5(Base64.getEncoder().encodeToString(checksum));
512+
reqBuilder.checksumCRC32C(Base64.getEncoder().encodeToString(checksum));
504513

505514
final UploadPartResponse resp = s3.uploadPart(reqBuilder.build(), RequestBody.fromInputStream(content, contentLength));
506515
log.trace("Uploaded part {} with length {} for {}: {}", partNumber, contentLength, objectId, resp.eTag());
@@ -598,7 +607,7 @@ private void putObject(final InputStream content, final long contentLength, byte
598607
reqBuilder.bucket(objectId.bucket());
599608
reqBuilder.key(objectId.key());
600609
reqBuilder.contentLength(contentLength);
601-
reqBuilder.contentMD5( Base64.getEncoder().encodeToString(checksum) );
610+
reqBuilder.checksumCRC32C(Base64.getEncoder().encodeToString(checksum));
602611
if( cannedAcl!=null ) {
603612
reqBuilder.acl(cannedAcl);
604613
}

plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3ObjectId.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,14 @@ public String key() {
4646
public String versionId() {
4747
return versionId;
4848
}
49+
50+
/**
51+
* Determines if this object is in an S3 Express One Zone bucket.
52+
* S3 Express One Zone bucket names follow the pattern: base-bucket-name--zone-id--x-s3
53+
*
54+
* @return true if this is an S3 Express One Zone bucket
55+
*/
56+
public boolean isExpressOneZone() {
57+
return bucket != null && bucket.endsWith("--x-s3");
58+
}
4959
}

plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/util/S3ObjectSummaryLookup.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -66,35 +66,47 @@ public S3Object lookup(S3Path s3Path) throws NoSuchFileException {
6666
}
6767

6868
/*
69-
* Lookup for the object summary for the specified object key
70-
* by using a `listObjects` request
69+
* First: try HEAD request for exact object (fast, works on all bucket types including Express)
7170
*/
72-
String marker = null;
73-
while( true ) {
74-
ListObjectsRequest.Builder request = ListObjectsRequest.builder();
75-
request.bucket(s3Path.getBucket());
76-
request.prefix(s3Path.getKey());
77-
request.maxKeys(250);
78-
if( marker != null )
79-
request.marker(marker);
80-
81-
ListObjectsResponse listing = client.listObjects(request.build());
82-
List<S3Object> results = listing.contents();
83-
84-
if (results.isEmpty()){
85-
break;
71+
try {
72+
HeadObjectResponse metadata = client.getObjectMetadata(s3Path.getBucket(), s3Path.getKey());
73+
if( metadata != null ) {
74+
return S3Object.builder()
75+
.key(s3Path.getKey())
76+
.size(metadata.contentLength())
77+
.lastModified(metadata.lastModified())
78+
.eTag(metadata.eTag())
79+
.storageClass(metadata.storageClassAsString())
80+
.build();
8681
}
87-
88-
for( S3Object item : results ) {
89-
if( matchName(s3Path.getKey(), item)) {
90-
return item;
91-
}
82+
}
83+
catch (S3Exception e) {
84+
if( e.statusCode() != 404 ) {
85+
throw e;
9286
}
87+
// 404 = object doesn't exist as a file, fall through to directory check
88+
}
89+
90+
/*
91+
* Second: check if it's a "directory" by listing with trailing slash.
92+
* S3 Express One Zone buckets require prefixes to end with a delimiter.
93+
*/
94+
String prefix = s3Path.getKey();
95+
if( !prefix.endsWith("/") ) {
96+
prefix = prefix + "/";
97+
}
9398

94-
if( listing.isTruncated() )
95-
marker = listing.nextMarker();
96-
else
97-
break;
99+
ListObjectsV2Request request = ListObjectsV2Request.builder()
100+
.bucket(s3Path.getBucket())
101+
.prefix(prefix)
102+
.maxKeys(1) // only need to find one child to confirm directory exists
103+
.build();
104+
105+
for( S3Object item : client.listObjectsV2Paginator(request).contents() ) {
106+
// Found a child, so this "directory" exists - return a synthetic S3Object for it
107+
return S3Object.builder()
108+
.key(s3Path.getKey())
109+
.build();
98110
}
99111

100112
throw new NoSuchFileException("s3://" + s3Path.getBucket() + "/" + s3Path.getKey());

plugins/nf-amazon/src/main/nextflow/cloud/aws/util/S3BashLib.groovy

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import software.amazon.awssdk.services.s3.model.ObjectCannedACL
2929
@CompileStatic
3030
class S3BashLib extends BashFunLib<S3BashLib> {
3131

32-
private String storageClass = 'STANDARD'
32+
private String storageClass = ''
3333
private String storageEncryption = ''
3434
private String storageKmsKeyId = ''
3535
private String debug = ''
@@ -112,17 +112,18 @@ class S3BashLib extends BashFunLib<S3BashLib> {
112112
* @return The Bash script implementing the S3 helper functions
113113
*/
114114
protected String s3Lib() {
115+
final storageClassParam = storageClass ? "--storage-class $storageClass " : ""
115116
"""
116117
# aws helper
117118
nxf_s3_upload() {
118119
local name=\$1
119120
local s3path=\$2
120121
if [[ "\$name" == - ]]; then
121-
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass - "\$s3path"
122+
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}- "\$s3path"
122123
elif [[ -d "\$name" ]]; then
123-
$cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
124+
$cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name" "\$s3path/\$name"
124125
else
125-
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
126+
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name" "\$s3path/\$name"
126127
fi
127128
}
128129
@@ -148,6 +149,7 @@ class S3BashLib extends BashFunLib<S3BashLib> {
148149
*/
149150
protected String s5cmdLib() {
150151
final cli = s5cmdPath
152+
final storageClassParam = storageClass ? "--storage-class $storageClass " : ""
151153
"""
152154
# aws helper for s5cmd
153155
nxf_s3_upload() {
@@ -156,11 +158,11 @@ class S3BashLib extends BashFunLib<S3BashLib> {
156158
if [[ "\$name" == - ]]; then
157159
local tmp=\$(nxf_mktemp)
158160
cp /dev/stdin \$tmp/\$name
159-
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass \$tmp/\$name "\$s3path"
161+
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}\$tmp/\$name "\$s3path"
160162
elif [[ -d "\$name" ]]; then
161-
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name/" "\$s3path/\$name/"
163+
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name/" "\$s3path/\$name/"
162164
else
163-
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
165+
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name" "\$s3path/\$name"
164166
fi
165167
}
166168

plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchFileCopyStrategyTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,11 @@ class AwsBatchFileCopyStrategyTest extends Specification {
190190
local name=$1
191191
local s3path=$2
192192
if [[ "$name" == - ]]; then
193-
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
193+
aws s3 cp --only-show-errors - "$s3path"
194194
elif [[ -d "$name" ]]; then
195-
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
195+
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
196196
else
197-
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
197+
aws s3 cp --only-show-errors "$name" "$s3path/$name"
198198
fi
199199
}
200200

plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchScriptLauncherTest.groovy

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,11 @@ class AwsBatchScriptLauncherTest extends Specification {
124124
local name=$1
125125
local s3path=$2
126126
if [[ "$name" == - ]]; then
127-
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
127+
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors - "$s3path"
128128
elif [[ -d "$name" ]]; then
129-
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
129+
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
130130
else
131-
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
131+
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors "$name" "$s3path/$name"
132132
fi
133133
}
134134
@@ -303,11 +303,11 @@ class AwsBatchScriptLauncherTest extends Specification {
303303
local name=$1
304304
local s3path=$2
305305
if [[ "$name" == - ]]; then
306-
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
306+
aws s3 cp --only-show-errors - "$s3path"
307307
elif [[ -d "$name" ]]; then
308-
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
308+
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
309309
else
310-
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
310+
aws s3 cp --only-show-errors "$name" "$s3path/$name"
311311
fi
312312
}
313313
@@ -475,11 +475,11 @@ class AwsBatchScriptLauncherTest extends Specification {
475475
local name=$1
476476
local s3path=$2
477477
if [[ "$name" == - ]]; then
478-
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
478+
aws s3 cp --only-show-errors - "$s3path"
479479
elif [[ -d "$name" ]]; then
480-
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
480+
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
481481
else
482-
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
482+
aws s3 cp --only-show-errors "$name" "$s3path/$name"
483483
fi
484484
}
485485
@@ -591,11 +591,11 @@ class AwsBatchScriptLauncherTest extends Specification {
591591
local name=$1
592592
local s3path=$2
593593
if [[ "$name" == - ]]; then
594-
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
594+
aws s3 cp --only-show-errors - "$s3path"
595595
elif [[ -d "$name" ]]; then
596-
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
596+
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
597597
else
598-
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
598+
aws s3 cp --only-show-errors "$name" "$s3path/$name"
599599
fi
600600
}
601601

0 commit comments

Comments
 (0)