diff --git a/.buildkite/pipelines/pull-request/s3.yml b/.buildkite/pipelines/pull-request/s3.yml new file mode 100644 index 0000000000000..66fa688e94b5f --- /dev/null +++ b/.buildkite/pipelines/pull-request/s3.yml @@ -0,0 +1,15 @@ +steps: + - label: third-party / s3 + command: | + export amazon_s3_bucket=elasticsearch-ci.us-west-2 + export amazon_s3_base_path=$BUILDKITE_BRANCH + + .ci/scripts/run-gradle.sh s3ThirdPartyTest + env: + USE_3RD_PARTY_S3_CREDENTIALS: "true" + timeout_in_minutes: 30 + agents: + provider: gcp + image: family/elasticsearch-ubuntu-2004 + machineType: n2-standard-8 + buildDirectory: /dev/shm/bk diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index 4cebedebfba07..f512eac9fd891 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -61,12 +61,13 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("tests.use.fixture", "true")); @ClassRule - public static MinioTestContainer minio = new MinioTestContainer( - USE_FIXTURE, - System.getProperty("test.s3.account"), - System.getProperty("test.s3.key"), - System.getProperty("test.s3.bucket") - ); + // public static MinioTestContainer minio = new MinioTestContainer( + // USE_FIXTURE, + // System.getProperty("test.s3.account"), + // System.getProperty("test.s3.key"), + // System.getProperty("test.s3.bucket") + // ); + public static MinioTestContainer minio = null; @Override protected Collection> getPlugins() { @@ -82,6 +83,7 @@ protected SecureSettings credentials() { MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setString("s3.client.default.access_key", System.getProperty("test.s3.account")); secureSettings.setString("s3.client.default.secret_key", System.getProperty("test.s3.key")); + secureSettings.setString("s3.client.default.session_token", System.getenv("S3_SESSION")); return secureSettings; } @@ -109,7 +111,8 @@ protected Settings nodeSettings() { protected void createRepository(String repoName) { Settings.Builder settings = Settings.builder() .put("bucket", System.getProperty("test.s3.bucket")) - .put("base_path", System.getProperty("test.s3.base", "testpath")); + .put("base_path", System.getProperty("test.s3.base", "testpath")) + .put("buffer_size", S3Repository.MIN_PART_SIZE_USING_MULTIPART); final String endpoint = USE_FIXTURE ? minio.getAddress() : System.getProperty("test.s3.endpoint"); if (endpoint != null) { settings.put("endpoint", endpoint); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index ea13657964016..c78be999a4aa4 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -69,8 +69,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.util.ArrayList; +import java.util.Base64; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -189,13 +192,18 @@ private void flushBuffer(boolean lastPart) throws IOException { } } assert lastPart == false || successful : "must only write last part if successful"; + final var byteStream = buffer.bytes().streamInput(); + byteStream.mark(0); + final var md5 = md5DigestOfInputStream(byteStream, Long.MAX_VALUE); + byteStream.reset(); final UploadPartRequest uploadRequest = createPartUploadRequest( purpose, - buffer.bytes().streamInput(), + byteStream, uploadId.get(), parts.size() + 1, absoluteBlobKey, buffer.size(), + md5, lastPart ); final UploadPartResult uploadResponse; @@ -260,6 +268,7 @@ private UploadPartRequest createPartUploadRequest( int number, String blobName, long size, + byte[] partMd5, boolean lastPart ) { final UploadPartRequest uploadRequest = new UploadPartRequest(); @@ -270,6 +279,7 @@ private UploadPartRequest createPartUploadRequest( uploadRequest.setInputStream(stream); S3BlobStore.configureRequestForMetrics(uploadRequest, blobStore, Operation.PUT_MULTIPART_OBJECT, purpose); uploadRequest.setPartSize(size); + uploadRequest.setMd5Digest(Base64.getEncoder().encodeToString(partMd5)); uploadRequest.setLastPart(lastPart); return uploadRequest; } @@ -457,8 +467,18 @@ void executeSingleUpload( if (blobSize > s3BlobStore.bufferSizeInBytes()) { throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size"); } + // required to reset the stream for MD5 calculation + if (input.markSupported() == false) { + throw new IllegalArgumentException("input stream mark not supported"); + } final ObjectMetadata md = new ObjectMetadata(); + + input.mark(0); + final byte[] md5 = md5DigestOfInputStream(input, Long.MAX_VALUE); + input.reset(); + md.setContentMD5(Base64.getEncoder().encodeToString(md5)); + md.setContentLength(blobSize); if (s3BlobStore.serverSideEncryption()) { md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); @@ -485,7 +505,6 @@ void executeMultipartUpload( final InputStream input, final long blobSize ) throws IOException { - ensureMultiPartUploadSize(blobSize); final long partSize = s3BlobStore.bufferSizeInBytes(); final Tuple multiparts = numberOfMultiparts(blobSize, partSize); @@ -498,6 +517,19 @@ void executeMultipartUpload( final long lastPartSize = multiparts.v2(); assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes"; + // required to reset the stream for MD5 calculation + if (input.markSupported() == false) { + throw new IllegalArgumentException("input stream mark not supported"); + } + + final byte[][] md5s = new byte[nbParts][]; + input.mark(0); + for (int i = 0; i < nbParts - 1; i++) { + md5s[i] = md5DigestOfInputStream(input, partSize); + } + md5s[nbParts - 1] = md5DigestOfInputStream(input, lastPartSize); + input.reset(); + final SetOnce uploadId = new SetOnce<>(); final String bucketName = s3BlobStore.bucket(); boolean success = false; @@ -525,6 +557,7 @@ void executeMultipartUpload( i, blobName, lastPart ? lastPartSize : partSize, + md5s[i - 1], lastPart ); bytesCount += uploadRequest.getPartSize(); @@ -564,6 +597,28 @@ void executeMultipartUpload( } } + // Calculate the MD5 of up to remaining bytes of the given InputStream + private byte[] md5DigestOfInputStream(final InputStream inputStream, long remaining) throws IOException { + try { + final MessageDigest md5 = MessageDigest.getInstance("MD5"); + // update in chunks to bound memory usage while amortizing read cost + byte[] buffer = new byte[65536]; + int bytesRead; + do { + final int toRead = (int) Math.min(remaining, buffer.length); + bytesRead = inputStream.read(buffer, 0, toRead); + if (bytesRead > 0) { + md5.update(buffer, 0, bytesRead); + remaining -= bytesRead; + } + } while (bytesRead > 0); + + return md5.digest(); + } catch (NoSuchAlgorithmException e) { + throw new IOException(e); + } + } + // non-static, package private for testing void ensureMultiPartUploadSize(final long blobSize) { if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) { diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java index 58bb11874fbe6..32e2202e868df 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java @@ -213,7 +213,7 @@ public void testExecuteMultipartUpload() throws IOException { final ArgumentCaptor compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult()); - final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[1]); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index b9f2e797f71dd..3ff7117d38600 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -290,9 +290,12 @@ public void testIndexLatest() throws Exception { public void testReadFromPositionWithLength() { final var blobName = randomIdentifier(); - final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000)); + // forcing multipart temporarily + final var blobBytes = randomBytesReference(randomIntBetween(25 * 1024 * 1024 + 100, 25 * 1024 * 1024 + 2_000)); final var repository = getRepository(); + logger.info("---> uploading blob of size {}", blobBytes.length()); + logger.info("repository buffer size: {}", repository.getReadBufferSizeInBytes()); executeOnBlobStore(repository, blobStore -> { blobStore.writeBlob(randomPurpose(), blobName, blobBytes, true); return null;