Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.zip.CRC32C;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -129,7 +128,7 @@ public final class S3OutputStream extends OutputStream {
*/
private ByteBuffer buf;

private MessageDigest md5;
private CRC32C crc32c;

/**
* Phaser object to synchronize stream termination
Expand Down Expand Up @@ -208,15 +207,25 @@ public S3OutputStream setContentType(String type) {
}

/**
* @return A MD5 message digester
* @return A new CRC32C checksum instance
*/
private MessageDigest createMd5() {
try {
return MessageDigest.getInstance("MD5");
}
catch(NoSuchAlgorithmException e) {
throw new IllegalStateException("Cannot find a MD5 algorithm provider",e);
}
private CRC32C createCrc32c() {
    // CRC32C accumulates state across update() calls (see write());
    // a fresh instance is created for each new buffer so every part's
    // checksum starts from zero.
    return new CRC32C();
}

/**
 * Convert a CRC32C checksum value to its 4-byte big-endian (network order)
 * representation, suitable for Base64 encoding into the S3
 * {@code x-amz-checksum-crc32c} header.
 *
 * @param crc32c The CRC32C checksum instance
 * @return The checksum as a 4-byte big-endian array
 */
private byte[] getCrc32cBytes(CRC32C crc32c) {
    // CRC32C.getValue() returns the 32-bit checksum in the low bits of a long;
    // ByteBuffer writes ints in big-endian order by default, which matches
    // the byte order S3 expects for checksum headers.
    return ByteBuffer.allocate(Integer.BYTES)
            .putInt((int) crc32c.getValue())
            .array();
}

/**
Expand All @@ -233,7 +242,7 @@ public void write (int b) throws IOException {
}
if( buf == null ) {
buf = allocate();
md5 = createMd5();
crc32c = createCrc32c();
}
else if( !buf.hasRemaining() ) {
if( buf.position() < bufferSize ) {
Expand All @@ -243,13 +252,13 @@ else if( !buf.hasRemaining() ) {
flush();
// create a new buffer
buf = allocate();
md5 = createMd5();
crc32c = createCrc32c();
}
}

buf.put((byte) b);
// update the md5 checksum
md5.update((byte) b);
// update the checksum
crc32c.update(b);
}

/**
Expand All @@ -263,7 +272,7 @@ public void flush() throws IOException {
if( uploadBuffer(buf, false) ) {
// clear the current buffer
buf = null;
md5 = null;
crc32c = null;
}
}

Expand Down Expand Up @@ -312,7 +321,7 @@ private boolean uploadBuffer(ByteBuffer buf, boolean last) throws IOException {
}

// set the buffer in read mode and submit for upload
executor.submit( task(buf, md5.digest(), ++partsCount) );
executor.submit( task(buf, getCrc32cBytes(crc32c), ++partsCount) );

return true;
}
Expand Down Expand Up @@ -382,10 +391,10 @@ public void close() throws IOException {

if (uploadId == null) {
if( buf != null )
putObject(buf, md5.digest());
putObject(buf, getCrc32cBytes(crc32c));
else
// this is needed when trying to upload an empty
putObject(new ByteArrayInputStream(new byte[]{}), 0, createMd5().digest());
// this is needed when trying to upload an empty
putObject(new ByteArrayInputStream(new byte[]{}), 0, getCrc32cBytes(createCrc32c()));
}
else {
// -- upload remaining chunk
Expand Down Expand Up @@ -500,7 +509,7 @@ private void uploadPart(final InputStream content, final long contentLength, fin
reqBuilder.uploadId(uploadId);
reqBuilder.partNumber(partNumber);
reqBuilder.contentLength(contentLength);
reqBuilder.contentMD5(Base64.getEncoder().encodeToString(checksum));
reqBuilder.checksumCRC32C(Base64.getEncoder().encodeToString(checksum));

final UploadPartResponse resp = s3.uploadPart(reqBuilder.build(), RequestBody.fromInputStream(content, contentLength));
log.trace("Uploaded part {} with length {} for {}: {}", partNumber, contentLength, objectId, resp.eTag());
Expand Down Expand Up @@ -598,7 +607,7 @@ private void putObject(final InputStream content, final long contentLength, byte
reqBuilder.bucket(objectId.bucket());
reqBuilder.key(objectId.key());
reqBuilder.contentLength(contentLength);
reqBuilder.contentMD5( Base64.getEncoder().encodeToString(checksum) );
reqBuilder.checksumCRC32C(Base64.getEncoder().encodeToString(checksum));
if( cannedAcl!=null ) {
reqBuilder.acl(cannedAcl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,99 +73,58 @@ public S3Object lookup(S3Path s3Path) throws NoSuchFileException {
}

/**
* Lookup for the S3 object matching the specified path using at most two bounded
* {@code listObjects} calls (replaces the previous unbounded pagination loop).
* Lookup for the S3 object matching the specified path using one head request
* and one bounded list-objects call.
*
* @param s3Path the S3 path to look up
* @param client the S3 client
* @return the matching {@link S3Object}, or {@code null} if not found
*/
private S3Object getS3Object(S3Path s3Path, S3Client client) {

// Call 1: list up to 2 objects whose key starts with the target key.
//
// Why maxKeys(2) instead of paginating all results?
// The previous implementation used an unbounded while(true) loop fetching 250 keys
// per page. On prefixes with millions of objects this caused excessive S3 LIST API
// calls, high latency, and potential timeouts. Two results are enough to cover
// the common cases:
// - Exact file match: the key itself exists as an object (e.g. "data.txt")
// - Directory match: a child object (e.g. "data/file1") appears within the
// first 2 lexicographic results
ListObjectsRequest request = ListObjectsRequest.builder()
.bucket(s3Path.getBucket())
.prefix(s3Path.getKey())
.maxKeys(2)
.build();

ListObjectsResponse listing = client.listObjects(request);
List<S3Object> results = listing.contents();

for( S3Object item : results ) {
if( matchName(s3Path.getKey(), item)) {
return item;
/*
* First: try HEAD request for exact object (fast, works on all bucket types including Express)
*/
try {
HeadObjectResponse metadata = client.getObjectMetadata(s3Path.getBucket(), s3Path.getKey());
if( metadata != null ) {
return S3Object.builder()
.key(s3Path.getKey())
.size(metadata.contentLength())
.lastModified(metadata.lastModified())
.eTag(metadata.eTag())
.storageClass(metadata.storageClassAsString())
.build();
}
}

// Call 2 (fallback): list 1 object with prefix "key/" to detect directories
// that Call 1 missed.
//
// Why can Call 1 miss a directory?
// S3 lists keys in lexicographic (UTF-8 byte) order, and several common characters
// sort *before* '/' (0x2F) — notably '-' (0x2D) and '.' (0x2E).
//
// Example: given keys "a-a/file-3", "a.txt", and "a/file-1", S3 returns them as:
// a-a/file-3 ← '-' (0x2D) < '/' (0x2F)
// a.txt ← '.' (0x2E) < '/' (0x2F)
// a/file-1 ← '/' (0x2F) — the actual directory child
//
// With maxKeys(2), Call 1 only sees "a-a/file-3" and "a.txt" — neither matches
// key "a" via matchName(). The directory child "a/file-1" is pushed beyond the
// result window by sibling keys that sort earlier.
//
// By searching with prefix "a/" directly, we skip all those siblings and find
// "a/file-1", confirming that "a" is a directory.
request = ListObjectsRequest.builder()
.bucket(s3Path.getBucket())
.prefix(s3Path.getKey()+'/')
.maxKeys(1)
.build();

listing = client.listObjects(request);
results = listing.contents();
for( S3Object item : results ) {
if( matchName(s3Path.getKey(), item)) {
return item;
catch (S3Exception e) {
if( e.statusCode() != 404 ) {
throw e;
}
// 404 = object doesn't exist as a file, fall through to directory check
}
return null;
}

private boolean matchName(String fileName, S3Object summary) {
String foundKey = summary.key();

// they are different names return false
if( !foundKey.startsWith(fileName) ) {
return false;
/*
* Second: check if it's a "directory" by listing with trailing slash.
* S3 Express One Zone buckets require prefixes to end with a delimiter.
*/
String prefix = s3Path.getKey();
if( !prefix.endsWith("/") ) {
prefix = prefix + "/";
}

// when they are the same length, they are identical
if( foundKey.length() == fileName.length() )
return true;

return foundKey.charAt(fileName.length()) == '/';
}
ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(s3Path.getBucket())
.prefix(prefix)
.maxKeys(1) // only need to find one child to confirm directory exists
.build();

public HeadObjectResponse getS3ObjectMetadata(S3Path s3Path) {
S3Client client = s3Path.getFileSystem().getClient();
try {
return client.getObjectMetadata(s3Path.getBucket(), s3Path.getKey());
}
catch (S3Exception e){
if (e.statusCode() != 404){
throw e;
}
return null;
for( S3Object item : client.listObjectsV2Paginator(request).contents() ) {
// Found a child, so this "directory" exists - return a synthetic S3Object for it
return S3Object.builder()
.key(s3Path.getKey())
.build();
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import software.amazon.awssdk.services.s3.model.ObjectCannedACL
@CompileStatic
class S3BashLib extends BashFunLib<S3BashLib> {

private String storageClass = 'STANDARD'
private String storageClass = ''
private String storageEncryption = ''
private String storageKmsKeyId = ''
private String debug = ''
Expand Down Expand Up @@ -112,17 +112,18 @@ class S3BashLib extends BashFunLib<S3BashLib> {
* @return The Bash script implementing the S3 helper functions
*/
protected String s3Lib() {
final storageClassParam = storageClass ? "--storage-class $storageClass " : ""
"""
# aws helper
nxf_s3_upload() {
local name=\$1
local s3path=\$2
if [[ "\$name" == - ]]; then
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass - "\$s3path"
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}- "\$s3path"
elif [[ -d "\$name" ]]; then
$cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
$cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name" "\$s3path/\$name"
else
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name" "\$s3path/\$name"
fi
}

Expand All @@ -148,6 +149,7 @@ class S3BashLib extends BashFunLib<S3BashLib> {
*/
protected String s5cmdLib() {
final cli = s5cmdPath
final storageClassParam = storageClass ? "--storage-class $storageClass " : ""
"""
# aws helper for s5cmd
nxf_s3_upload() {
Expand All @@ -156,11 +158,11 @@ class S3BashLib extends BashFunLib<S3BashLib> {
if [[ "\$name" == - ]]; then
local tmp=\$(nxf_mktemp)
cp /dev/stdin \$tmp/\$name
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass \$tmp/\$name "\$s3path"
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}\$tmp/\$name "\$s3path"
elif [[ -d "\$name" ]]; then
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name/" "\$s3path/\$name/"
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name/" "\$s3path/\$name/"
else
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}${storageClassParam}"\$name" "\$s3path/\$name"
fi
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ class AwsBatchFileCopyStrategyTest extends Specification {
local name=$1
local s3path=$2
if [[ "$name" == - ]]; then
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
aws s3 cp --only-show-errors - "$s3path"
elif [[ -d "$name" ]]; then
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
else
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors "$name" "$s3path/$name"
fi
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ class AwsBatchScriptLauncherTest extends Specification {
local name=$1
local s3path=$2
if [[ "$name" == - ]]; then
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors - "$s3path"
elif [[ -d "$name" ]]; then
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
else
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
/conda/bin/aws --region eu-west-1 s3 cp --only-show-errors "$name" "$s3path/$name"
fi
}

Expand Down Expand Up @@ -303,11 +303,11 @@ class AwsBatchScriptLauncherTest extends Specification {
local name=$1
local s3path=$2
if [[ "$name" == - ]]; then
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
aws s3 cp --only-show-errors - "$s3path"
elif [[ -d "$name" ]]; then
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
else
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors "$name" "$s3path/$name"
fi
}

Expand Down Expand Up @@ -475,11 +475,11 @@ class AwsBatchScriptLauncherTest extends Specification {
local name=$1
local s3path=$2
if [[ "$name" == - ]]; then
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
aws s3 cp --only-show-errors - "$s3path"
elif [[ -d "$name" ]]; then
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
else
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors "$name" "$s3path/$name"
fi
}

Expand Down Expand Up @@ -591,11 +591,11 @@ class AwsBatchScriptLauncherTest extends Specification {
local name=$1
local s3path=$2
if [[ "$name" == - ]]; then
aws s3 cp --only-show-errors --storage-class STANDARD - "$s3path"
aws s3 cp --only-show-errors - "$s3path"
elif [[ -d "$name" ]]; then
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors --recursive "$name" "$s3path/$name"
else
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
aws s3 cp --only-show-errors "$name" "$s3path/$name"
fi
}

Expand Down
Loading
Loading