From 47f190abfd3c1c8a9edb160bd0b1a84b4af41548 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 18 Sep 2025 12:19:58 -0400 Subject: [PATCH] Add Spring Integration for S3 support * Copy all the S3 functionality from the Spring Integration AWS project * Document the feature * Add `spring-cloud-aws-starter-integration-s3` convenient module * Introduce `LocalstackContainerTest` in the tests, similar to the one in DynamoDB module. Reuse this abstraction for all the S3 integration tests to avoid unnecessary LocalStack container restarts while we running all the tests in module --- docs/src/main/asciidoc/s3.adoc | 146 ++++++ pom.xml | 1 + spring-cloud-aws-dependencies/pom.xml | 6 + spring-cloud-aws-s3/pom.xml | 10 + .../cloud/s3/integration/S3FileInfo.java | 88 ++++ .../S3InboundFileSynchronizer.java | 87 ++++ ...InboundFileSynchronizingMessageSource.java | 47 ++ .../s3/integration/S3MessageHandler.java | 454 ++++++++++++++++++ .../S3PersistentAcceptOnceFileListFilter.java | 56 +++ .../S3RegexPatternFileListFilter.java | 49 ++ .../s3/integration/S3RemoteFileTemplate.java | 67 +++ .../cloud/s3/integration/S3Session.java | 261 ++++++++++ .../s3/integration/S3SessionFactory.java | 65 +++ .../S3SimplePatternFileListFilter.java | 44 ++ .../integration/S3StreamingMessageSource.java | 73 +++ .../cloud/s3/LocalstackContainerTest.java | 72 +++ ...hMatchingResourcePatternResolverTests.java | 29 +- .../cloud/s3/S3ResourceIntegrationTests.java | 44 +- .../cloud/s3/S3TemplateIntegrationTests.java | 30 +- .../S3InboundChannelAdapterTests.java | 149 ++++++ .../s3/integration/S3MessageHandlerTests.java | 338 +++++++++++++ .../S3StreamingChannelAdapterTests.java | 120 +++++ .../pom.xml | 35 ++ 23 files changed, 2205 insertions(+), 66 deletions(-) create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java create mode 100644 spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java create mode 100644 spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java create mode 100644 spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java create mode 100644 spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java create mode 100644 spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java create mode 100644 spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml diff --git a/docs/src/main/asciidoc/s3.adoc b/docs/src/main/asciidoc/s3.adoc index 6a9fc318a..c69396d03 100644 --- a/docs/src/main/asciidoc/s3.adoc +++ b/docs/src/main/asciidoc/s3.adoc @@ -641,3 +641,149 @@ Sample IAM policy granting access to `spring-cloud-aws-demo` bucket: ] } ---- + +=== Spring Integration Support + +Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon SQS. + +The S3 Channel Adapters are based on the `S3Client` template and `S3TransferManager`. +See their specification and Javadocs for more information. + +The S3 Inbound Channel Adapter is represented by the `S3InboundFileSynchronizingMessageSource` and allows pulling S3 objects as files from the S3 bucket to the local directory for synchronization. +This adapter is fully similar to the Inbound Channel Adapters in the FTP and SFTP Spring Integration modules. +See more information in the https://docs.spring.io/spring-integration/reference/ftp.html[FTP/FTPS Adapters Chapter] for common options or `SessionFactory`, `RemoteFileTemplate` and `FileListFilter` abstractions. + +The Java Configuration is: + +[source,java] +---- +@SpringBootApplication +public static class MyConfiguration { + + @Autowired + private S3Client amazonS3; + + @Bean + public S3InboundFileSynchronizer s3InboundFileSynchronizer() { + S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.amazonS3); + synchronizer.setDeleteRemoteFiles(true); + synchronizer.setPreserveTimestamp(true); + synchronizer.setRemoteDirectory(S3_BUCKET); + synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$")); + Expression expression = PARSER.parseExpression("#this.toUpperCase() + '.a'"); + synchronizer.setLocalFilenameGeneratorExpression(expression); + return synchronizer; + } + + @Bean + @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100")) + public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() { + S3InboundFileSynchronizingMessageSource messageSource = + new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer()); + messageSource.setAutoCreateLocalDirectory(true); + messageSource.setLocalDirectory(LOCAL_FOLDER); + messageSource.setLocalFilter(new AcceptOnceFileListFilter()); + return messageSource; + } + + @Bean + public PollableChannel s3FilesChannel() { + return new QueueChannel(); + } +} +---- + +With this config you receive messages with `java.io.File` `payload` from the `s3FilesChannel` after periodic synchronization of content from the Amazon S3 bucket into the local directory. + +The `S3StreamingMessageSource` adapter produces messages with payloads of type `InputStream`, allowing S3 objects to be fetched without writing to the local file system. +Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. +The session is provided in the closeableResource header (`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`). +Standard framework components, such as the `FileSplitter` and `StreamTransformer` will automatically close the session. + +The following Spring Boot application provides an example of configuring the S3 inbound streaming adapter using Java configuration: + +[source,java] +---- +@SpringBootApplication +public class S3JavaApplication { + + public static void main(String[] args) { + new SpringApplicationBuilder(S3JavaApplication.class) + .web(false) + .run(args); + } + + @Autowired + private S3Client amazonS3; + + @Bean + @InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100")) + public MessageSource s3InboundStreamingMessageSource() { + S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template()); + messageSource.setRemoteDirectory(S3_BUCKET); + messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), + "streaming")); + return messageSource; + } + + @Bean + @Transformer(inputChannel = "s3Channel", outputChannel = "data") + public org.springframework.integration.transformer.Transformer transformer() { + return new StreamTransformer(); + } + + @Bean + public S3RemoteFileTemplate template() { + return new S3RemoteFileTemplate(new S3SessionFactory(this.amazonS3)); + } + + @Bean + public PollableChannel s3Channel() { + return new QueueChannel(); + } +} +---- + +> NOTE: Unlike the non-streaming inbound channel adapter, this adapter does not prevent duplicates by default. +> If you do not delete the remote file and wish to prevent the file being processed again, you can configure an `S3PersistentFileListFilter` in the `filter` attribute. +> If you don’t want to persist the state, an in-memory `SimpleMetadataStore` can be used with the filter. +> If you wish to use a filename pattern (or regex) as well, use a `CompositeFileListFilter`. + +The `S3MessageHandler` is an Outbound Channel Adapter and allows performing `upload`, `download` and `copy` (see `S3MessageHandler.Command` enum) operations in the provided S3 bucket. + +The Java Configuration is: + +[source,java] +---- +@SpringBootApplication +public static class MyConfiguration { + + @Autowired + private S3AsyncClient amazonS3; + + @Bean + @ServiceActivator(inputChannel = "s3UploadChannel") + public MessageHandler s3MessageHandler() { + return new S3MessageHandler(this.amazonS3, "my-bucket"); + } + +} +---- + +With this config you can send a message with the `java.io.File` as `payload` and the `transferManager.upload()` operation will be performed, where the file name is used as a S3 Object key. + +See more information in the `S3MessageHandler` JavaDocs. + +NOTE: The AWS SDK recommends to use `S3CrtAsyncClient` for `S3TransferManager`, therefore an `S3AsyncClient.crtBuilder()` has to be used to achieve respective upload and download requirements, what is done internally in the `S3MessageHandler` when `S3CrtAsyncClient`-based constructor is used. + +The `S3MessageHandler` can be used as an Outbound Gateway with the `produceReply = true` constructor argument for Java Configuration. + +The "request-reply" nature of this gateway is async and the `Transfer` result from the `TransferManager` operation is sent to the `outputChannel`, assuming the transfer progress observation in the downstream flow. + +The `TransferListener` can be supplied to the `S3MessageHandler` to track the transfer progress per requests. + +See more information in the `S3MessageHandler` Javadocs. + +The Spring Integration dependency (as well as `s3-transfer-manager` and `aws-crt-client`) in the `spring-cloud-aws-s3` module are `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used. +For convenience, a dedicated `spring-cloud-aws-starter-integration-s3` is provided managing all the required dependencies for Spring Integration support with Amazon S3. + diff --git a/pom.xml b/pom.xml index 96538ab75..9372e8c2a 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ spring-cloud-aws-starters/spring-cloud-aws-starter-metrics spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store spring-cloud-aws-starters/spring-cloud-aws-starter-s3 + spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3 spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager spring-cloud-aws-starters/spring-cloud-aws-starter-ses spring-cloud-aws-starters/spring-cloud-aws-starter-sns diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index df111f992..367af6de3 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -202,6 +202,12 @@ ${project.version} + + io.awspring.cloud + spring-cloud-aws-starter-integration-s3 + ${project.version} + + io.awspring.cloud spring-cloud-aws-starter-sns diff --git a/spring-cloud-aws-s3/pom.xml b/spring-cloud-aws-s3/pom.xml index d0db70219..7044eeacd 100644 --- a/spring-cloud-aws-s3/pom.xml +++ b/spring-cloud-aws-s3/pom.xml @@ -40,6 +40,11 @@ aws-crt-client true + + org.springframework.integration + spring-integration-file + true + com.fasterxml.jackson.core jackson-databind @@ -55,6 +60,11 @@ junit-jupiter test + + org.springframework.integration + spring-integration-test + test + diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java new file mode 100644 index 000000000..634f3a94a --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java @@ -0,0 +1,88 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.util.Date; +import org.springframework.integration.file.remote.AbstractFileInfo; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An Amazon S3 {@link org.springframework.integration.file.remote.FileInfo} implementation. + * + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3FileInfo extends AbstractFileInfo { + + private final S3Object s3Object; + + public S3FileInfo(S3Object s3Object) { + Assert.notNull(s3Object, "s3Object must not be null"); + this.s3Object = s3Object; + } + + @Override + public boolean isDirectory() { + return false; + } + + @Override + public boolean isLink() { + return false; + } + + @Override + public long getSize() { + return this.s3Object.size(); + } + + @Override + public long getModified() { + return this.s3Object.lastModified().getEpochSecond(); + } + + @Override + public String getFilename() { + return this.s3Object.key(); + } + + /** + * A permissions representation string. Throws {@link UnsupportedOperationException} to avoid extra + * {@link software.amazon.awssdk.services.s3.S3Client#getObjectAcl} REST call. The target application may choose to + * do that according to its logic. + * @return the permissions representation string. + */ + @Override + public String getPermissions() { + throw new UnsupportedOperationException("Use [AmazonS3.getObjectAcl()] to obtain permissions."); + } + + @Override + public S3Object getFileInfo() { + return this.s3Object; + } + + @Override + public String toString() { + return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink() + ", Size=" + getSize() + + ", ModifiedTime=" + new Date(getModified()) + ", Filename=" + getFilename() + ", RemoteDirectory=" + + getRemoteDirectory() + "]"; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java new file mode 100644 index 000000000..edc74afdc --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java @@ -0,0 +1,87 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.File; +import java.io.IOException; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.file.remote.session.Session; +import org.springframework.integration.file.remote.session.SessionFactory; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer; +import org.springframework.integration.metadata.SimpleMetadataStore; +import org.springframework.lang.Nullable; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An implementation of {@link AbstractInboundFileSynchronizer} for Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3InboundFileSynchronizer extends AbstractInboundFileSynchronizer { + + public S3InboundFileSynchronizer() { + this(new S3SessionFactory()); + } + + public S3InboundFileSynchronizer(S3Client amazonS3) { + this(new S3SessionFactory(amazonS3)); + } + + /** + * Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances. + * @param sessionFactory The session factory. + */ + @SuppressWarnings("this-escape") + public S3InboundFileSynchronizer(SessionFactory sessionFactory) { + super(sessionFactory); + doSetRemoteDirectoryExpression(new LiteralExpression(null)); + doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource")); + } + + @Override + protected boolean isFile(S3Object file) { + return true; + } + + @Override + protected String getFilename(S3Object file) { + return (file != null ? file.key() : null); + } + + @Override + protected long getModified(S3Object file) { + return file.lastModified().getEpochSecond(); + } + + @Override + protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, + @Nullable EvaluationContext localFileEvaluationContext, S3Object remoteFile, File localDirectory, + Session session) throws IOException { + + return super.copyFileToLocalDirectory(((S3Session) session).normalizeBucketName(remoteDirectoryPath), + localFileEvaluationContext, remoteFile, localDirectory, session); + } + + @Override + protected String protocol() { + return "s3"; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java new file mode 100644 index 000000000..cecc86a87 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.File; +import java.util.Comparator; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * A {@link org.springframework.integration.core.MessageSource} implementation for the Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3InboundFileSynchronizingMessageSource extends AbstractInboundFileSynchronizingMessageSource { + + public S3InboundFileSynchronizingMessageSource(AbstractInboundFileSynchronizer synchronizer) { + super(synchronizer); + } + + public S3InboundFileSynchronizingMessageSource(AbstractInboundFileSynchronizer synchronizer, + Comparator comparator) { + + super(synchronizer, comparator); + } + + public String getComponentType() { + return "aws:s3-inbound-channel-adapter"; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java new file mode 100644 index 000000000..32fa24d8a --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java @@ -0,0 +1,454 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletionException; +import java.util.function.BiConsumer; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.support.utils.IntegrationUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.util.Assert; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.internal.util.Mimetype; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CopyRequest; +import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest; +import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.model.Transfer; +import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; +import software.amazon.awssdk.transfer.s3.model.UploadRequest; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.IoUtils; +import software.amazon.awssdk.utils.Md5Utils; + +/** + * The {@link AbstractReplyProducingMessageHandler} implementation for the Amazon S3 services. + *

+ * The implementation is fully based on the {@link S3TransferManager} and support its {@code upload}, {@code download} + * and {@code copy} operations which can be determined by the provided or evaluated via SpEL expression at runtime + * {@link Command}. + *

+ * This {@link AbstractReplyProducingMessageHandler} can behave as a "one-way" (by default) or "request-reply" component + * according to the {@link #produceReply} constructor argument. + *

+ * The "one-way" behavior is also blocking, which is achieved with the {@link Transfer#completionFuture()} invocation. + * Consider to use an async upstream hand off if this blocking behavior isn't appropriate. + *

+ * The "request-reply" behavior is async and the {@link Transfer} result from the {@link S3TransferManager} operation is + * sent to the {@link #getOutputChannel()}, assuming the transfer progress observation in the downstream flow. + *

+ * The {@link TransferListener} can be provided to track the transfer progress. Also, see a {@link Transfer} API + * returned as a reply message from this handler. + *

+ * For the upload operation the {@link BiConsumer} callback can be supplied to populate options on a + * {@link PutObjectRequest.Builder} against request message. + *

+ * For download operation the {@code payload} must be a {@link File} instance, representing a single file for downloaded + * content or directory to download all files from the S3 virtual directory. + *

+ * An S3 Object {@code key} for upload and download can be determined by the provided {@link #keyExpression} or the + * {@link File#getName()} is used directly. The former has precedence. + *

+ * For copy operation all {@link #keyExpression}, {@link #destinationBucketExpression} and + * {@link #destinationKeyExpression} are required and must not evaluate to {@code null}. + *

+ * + * @author Artem Bilan + * @author John Logan + * + * @since 4.0 + * + * @see S3TransferManager + */ +public class S3MessageHandler extends AbstractReplyProducingMessageHandler { + + private final S3TransferManager transferManager; + + private final boolean produceReply; + + private final Expression bucketExpression; + + private EvaluationContext evaluationContext; + + private Expression keyExpression; + + private Expression destinationBucketExpression; + + private Expression destinationKeyExpression; + + private Expression commandExpression = new ValueExpression<>(Command.UPLOAD); + + private BiConsumer> uploadMetadataProvider = (builder, message) -> { + }; + + private TransferListener transferListener; + + public S3MessageHandler(S3AsyncClient amazonS3, String bucket) { + this(amazonS3, bucket, false); + } + + public S3MessageHandler(S3AsyncClient amazonS3, Expression bucketExpression) { + this(amazonS3, bucketExpression, false); + } + + public S3MessageHandler(S3AsyncClient amazonS3, String bucket, boolean produceReply) { + this(amazonS3, new LiteralExpression(bucket), produceReply); + Assert.notNull(bucket, "'bucket' must not be null"); + } + + public S3MessageHandler(S3AsyncClient amazonS3, Expression bucketExpression, boolean produceReply) { + this(S3TransferManager.builder().s3Client(amazonS3).build(), bucketExpression, produceReply); + Assert.notNull(amazonS3, "'amazonS3' must not be null"); + } + + public S3MessageHandler(S3TransferManager transferManager, String bucket) { + this(transferManager, bucket, false); + } + + public S3MessageHandler(S3TransferManager transferManager, Expression bucketExpression) { + this(transferManager, bucketExpression, false); + } + + public S3MessageHandler(S3TransferManager transferManager, String bucket, boolean produceReply) { + this(transferManager, new LiteralExpression(bucket), produceReply); + Assert.notNull(bucket, "'bucket' must not be null"); + } + + public S3MessageHandler(S3TransferManager transferManager, Expression bucketExpression, boolean produceReply) { + Assert.notNull(transferManager, "'transferManager' must not be null"); + Assert.notNull(bucketExpression, "'bucketExpression' must not be null"); + this.transferManager = transferManager; + this.bucketExpression = bucketExpression; + this.produceReply = produceReply; + } + + /** + * The SpEL expression to evaluate S3 object key at runtime against {@code requestMessage}. + * @param keyExpression the SpEL expression for S3 key. + */ + public void setKeyExpression(Expression keyExpression) { + this.keyExpression = keyExpression; + } + + /** + * Specify a {@link Command} to perform against {@link S3TransferManager}. + * @param command The {@link Command} to use. + * @see Command + */ + public void setCommand(Command command) { + Assert.notNull(command, "'command' must not be null"); + setCommandExpression(new ValueExpression<>(command)); + } + + /** + * The SpEL expression to evaluate the command to perform on {@link S3TransferManager}: {@code upload}, + * {@code download} or {@code copy}. + * @param commandExpression the SpEL expression to evaluate the {@link S3TransferManager} operation. + * @see Command + */ + public void setCommandExpression(Expression commandExpression) { + Assert.notNull(commandExpression, "'commandExpression' must not be null"); + this.commandExpression = commandExpression; + } + + /** + * The SpEL expression to evaluate the target S3 bucket for copy operation. + * @param destinationBucketExpression the SpEL expression for destination bucket. + * @see S3TransferManager#copy(CopyRequest) + */ + public void setDestinationBucketExpression(Expression destinationBucketExpression) { + this.destinationBucketExpression = destinationBucketExpression; + } + + /** + * The SpEL expression to evaluate the target S3 key for copy operation. + * @param destinationKeyExpression the SpEL expression for destination key. + * @see S3TransferManager#copy(CopyRequest) + */ + public void setDestinationKeyExpression(Expression destinationKeyExpression) { + this.destinationKeyExpression = destinationKeyExpression; + } + + /** + * Specify an {@link BiConsumer} callback to populate the metadata for upload operation, e.g. {@code Content-MD5}, + * {@code Content-Type} or any other required options. + * @param uploadMetadataProvider the {@link BiConsumer} to use for upload request option settings. + */ + public void setUploadMetadataProvider(BiConsumer> uploadMetadataProvider) { + Assert.notNull(uploadMetadataProvider, "'uploadMetadataProvider' must not be null"); + this.uploadMetadataProvider = uploadMetadataProvider; + } + + public void setTransferListener(TransferListener transferListener) { + this.transferListener = transferListener; + } + + @Override + public String getComponentType() { + return "aws:s3-outbound-channel-adapter"; + } + + @Override + protected void doInit() { + Assert.notNull(this.bucketExpression, "The 'bucketExpression' must not be null"); + super.doInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + + @Override + protected Object handleRequestMessage(Message requestMessage) { + Command command = this.commandExpression.getValue(this.evaluationContext, requestMessage, Command.class); + Assert.state(command != null, () -> "'commandExpression' [" + this.commandExpression.getExpressionString() + + "] cannot evaluate to null."); + + Transfer transfer = switch (command) { + case UPLOAD -> upload(requestMessage); + case DOWNLOAD -> download(requestMessage); + case COPY -> copy(requestMessage); + }; + + if (this.produceReply) { + return transfer; + } + else { + try { + transfer.completionFuture().join(); + } + catch (CompletionException ex) { + throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage, + () -> "Failed to transfer file", ex.getCause()); + } + return null; + } + } + + private Transfer upload(Message requestMessage) { + Object payload = requestMessage.getPayload(); + String bucketName = obtainBucket(requestMessage); + + String key = null; + if (this.keyExpression != null) { + key = this.keyExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + else if (payload instanceof File fileToUpload) { + key = fileToUpload.getName(); + } + + if (payload instanceof File fileToUpload && fileToUpload.isDirectory()) { + UploadDirectoryRequest.Builder uploadDirectoryRequest = UploadDirectoryRequest.builder().bucket(bucketName) + .source(fileToUpload.toPath()).s3Prefix(key); + + if (this.transferListener != null) { + uploadDirectoryRequest.uploadFileRequestTransformer( + (fileUpload) -> fileUpload.addTransferListener(this.transferListener)); + } + + return this.transferManager.uploadDirectory(uploadDirectoryRequest.build()); + } + else { + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() + .applyMutation((builder) -> this.uploadMetadataProvider.accept(builder, requestMessage)) + .bucket(bucketName).key(key); + + PutObjectRequest putObjectRequest = putObjectRequestBuilder.build(); + + AsyncRequestBody requestBody; + try { + if (payload instanceof InputStream inputStream) { + byte[] body = IoUtils.toByteArray(inputStream); + if (putObjectRequest.contentMD5() == null) { + putObjectRequestBuilder.contentMD5(Md5Utils.md5AsBase64(body)); + inputStream.reset(); + } + requestBody = AsyncRequestBody.fromBytes(body); + } + else if (payload instanceof File fileToUpload) { + if (putObjectRequest.contentMD5() == null) { + putObjectRequestBuilder.contentMD5(Md5Utils.md5AsBase64(fileToUpload)); + } + if (putObjectRequest.contentLength() == null) { + putObjectRequestBuilder.contentLength(fileToUpload.length()); + } + if (putObjectRequest.contentType() == null) { + putObjectRequestBuilder.contentType(Mimetype.getInstance().getMimetype(fileToUpload)); + } + requestBody = AsyncRequestBody.fromFile(fileToUpload); + } + else if (payload instanceof byte[] payloadBytes) { + if (putObjectRequest.contentMD5() == null) { + putObjectRequestBuilder.contentMD5(Md5Utils.md5AsBase64(payloadBytes)); + } + if (putObjectRequest.contentLength() == null) { + putObjectRequestBuilder.contentLength((long) payloadBytes.length); + } + requestBody = AsyncRequestBody.fromBytes(payloadBytes); + } + else { + throw new IllegalArgumentException("Unsupported payload type: [" + payload.getClass() + + "]. The only supported payloads for the upload request are " + + "java.io.File, java.io.InputStream, byte[] and PutObjectRequest."); + } + } + catch (IOException e) { + throw new MessageHandlingException(requestMessage, e); + } + + if (key == null) { + if (this.keyExpression != null) { + throw new IllegalStateException("The 'keyExpression' [" + this.keyExpression.getExpressionString() + + "] must not evaluate to null. Root object is: " + requestMessage); + } + else { + throw new IllegalStateException("Specify a 'keyExpression' for non-java.io.File payloads"); + } + } + + UploadRequest.Builder uploadRequest = UploadRequest.builder() + .putObjectRequest(putObjectRequestBuilder.build()).requestBody(requestBody); + + if (transferListener != null) { + uploadRequest.addTransferListener(transferListener); + } + + return this.transferManager.upload(uploadRequest.build()); + } + } + + private Transfer download(Message requestMessage) { + Object payload = requestMessage.getPayload(); + Assert.state(payload instanceof File, () -> "For the 'DOWNLOAD' operation the 'payload' must be of " + + "'java.io.File' type, but gotten: [" + payload.getClass() + ']'); + + File targetFile = (File) payload; + + String bucket = obtainBucket(requestMessage); + + String key = this.keyExpression != null + ? this.keyExpression.getValue(this.evaluationContext, requestMessage, String.class) + : null; + + if (targetFile.isDirectory()) { + DownloadDirectoryRequest.Builder downloadDirectoryRequest = DownloadDirectoryRequest.builder() + .bucket(bucket).destination(targetFile.toPath()) + .listObjectsV2RequestTransformer(filter -> filter.prefix(key)); + if (this.transferListener != null) { + downloadDirectoryRequest.downloadFileRequestTransformer( + (fileDownload) -> fileDownload.addTransferListener(this.transferListener)); + } + return this.transferManager.downloadDirectory(downloadDirectoryRequest.build()); + } + else { + DownloadFileRequest.Builder downloadFileRequest = DownloadFileRequest.builder().destination(targetFile) + .getObjectRequest(request -> request.bucket(bucket).key(key != null ? key : targetFile.getName())); + if (this.transferListener != null) { + downloadFileRequest.addTransferListener(this.transferListener); + } + return this.transferManager.downloadFile(downloadFileRequest.build()); + } + } + + private Transfer copy(Message requestMessage) { + String sourceBucketName = obtainBucket(requestMessage); + + String sourceKey = null; + if (this.keyExpression != null) { + sourceKey = this.keyExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + + Assert.state(sourceKey != null, () -> "The 'keyExpression' must not be null for 'copy' operation " + + "and 'keyExpression' can't evaluate to null. " + "Root object is: " + requestMessage); + + String destinationBucketName = null; + if (this.destinationBucketExpression != null) { + destinationBucketName = this.destinationBucketExpression.getValue(this.evaluationContext, requestMessage, + String.class); + } + + Assert.state(destinationBucketName != null, + () -> "The 'destinationBucketExpression' must not be null for 'copy' operation " + + "and can't evaluate to null. Root object is: " + requestMessage); + + String destinationKey = null; + if (this.destinationKeyExpression != null) { + destinationKey = this.destinationKeyExpression.getValue(this.evaluationContext, requestMessage, + String.class); + } + + Assert.state(destinationKey != null, + () -> "The 'destinationKeyExpression' must not be null for 'copy' operation " + + "and can't evaluate to null. Root object is: " + requestMessage); + + CopyObjectRequest.Builder copyObjectRequest = CopyObjectRequest.builder().sourceBucket(sourceBucketName) + .sourceKey(sourceKey).destinationBucket(destinationBucketName).destinationKey(destinationKey); + + CopyRequest.Builder copyRequest = CopyRequest.builder().copyObjectRequest(copyObjectRequest.build()); + if (this.transferListener != null) { + copyRequest.addTransferListener(this.transferListener); + } + return this.transferManager.copy(copyRequest.build()); + } + + private String obtainBucket(Message requestMessage) { + String bucketName; + if (this.bucketExpression instanceof LiteralExpression) { + bucketName = (String) this.bucketExpression.getValue(); + } + else { + bucketName = this.bucketExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + Assert.state(bucketName != null, () -> "The 'bucketExpression' [" + this.bucketExpression.getExpressionString() + + "] must not evaluate to null. Root object is: " + requestMessage); + + return bucketName; + } + + /** + * The {@link S3MessageHandler} mode. + * + * @see #setCommand + */ + public enum Command { + + /** + * The command to perform {@link S3TransferManager#upload} operation. + */ + UPLOAD, + + /** + * The command to perform {@link S3TransferManager#download} operation. + */ + DOWNLOAD, + + /** + * The command to perform {@link S3TransferManager#copy} operation. + */ + COPY + + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java new file mode 100644 index 000000000..60ea2e219 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter; +import org.springframework.integration.metadata.ConcurrentMetadataStore; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3PersistentAcceptOnceFileListFilter extends AbstractPersistentAcceptOnceFileListFilter { + + public S3PersistentAcceptOnceFileListFilter(ConcurrentMetadataStore store, String prefix) { + super(store, prefix); + } + + @Override + protected long modified(S3Object file) { + return (file != null) ? file.lastModified().getEpochSecond() : 0L; + } + + @Override + protected String fileName(S3Object file) { + return (file != null) ? file.key() : null; + } + + /** + * Always return false since no directory notion in S3. + * @param file the {@link S3Object} + * @return always false: S3 does not have a notion of directory + * @since 2.5 + */ + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java new file mode 100644 index 000000000..c4684c918 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.util.regex.Pattern; +import org.springframework.integration.file.filters.AbstractRegexPatternFileListFilter; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * Implementation of {@link AbstractRegexPatternFileListFilter} for Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3RegexPatternFileListFilter extends AbstractRegexPatternFileListFilter { + + public S3RegexPatternFileListFilter(String pattern) { + super(pattern); + } + + public S3RegexPatternFileListFilter(Pattern pattern) { + super(pattern); + } + + @Override + protected String getFilename(S3Object file) { + return (file != null) ? file.key() : null; + } + + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java new file mode 100644 index 000000000..ece9fb05f --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java @@ -0,0 +1,67 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.springframework.integration.file.remote.ClientCallback; +import org.springframework.integration.file.remote.RemoteFileTemplate; +import org.springframework.integration.file.remote.session.SessionFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An Amazon S3 specific {@link RemoteFileTemplate} extension. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3RemoteFileTemplate extends RemoteFileTemplate { + + public S3RemoteFileTemplate() { + this(new S3SessionFactory()); + } + + public S3RemoteFileTemplate(S3Client amazonS3) { + this(new S3SessionFactory(amazonS3)); + } + + /** + * Construct a {@link RemoteFileTemplate} with the supplied session factory. + * @param sessionFactory the session factory. + */ + public S3RemoteFileTemplate(SessionFactory sessionFactory) { + super(sessionFactory); + } + + @SuppressWarnings("unchecked") + @Override + public T executeWithClient(final ClientCallback callback) { + return callback.doWithClient((C) this.sessionFactory.getSession().getClientInstance()); + } + + @Override + public boolean exists(final String path) { + try { + return this.sessionFactory.getSession().exists(path); + } + catch (IOException ex) { + throw new UncheckedIOException("Failed to check the path " + path, ex); + } + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java new file mode 100644 index 000000000..2ee9a87f8 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java @@ -0,0 +1,261 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.integration.file.remote.session.Session; +import org.springframework.util.Assert; +import org.springframework.util.StreamUtils; +import org.springframework.util.StringUtils; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.utils.IoUtils; + +/** + * An Amazon S3 {@link Session} implementation. + * + * @author Artem Bilan + * @author Jim Krygowski + * @author Anwar Chirakkattil + * @author Xavier François + * @author Rogerio Lino + * + * @since 4.0 + */ +public class S3Session implements Session { + + private final S3Client amazonS3; + + private String endpoint; + + public S3Session(S3Client amazonS3) { + Assert.notNull(amazonS3, "'amazonS3' must not be null."); + this.amazonS3 = amazonS3; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + @Override + public S3Object[] list(String path) { + String[] bucketPrefix = splitPathToBucketAndKey(path, false); + + ListObjectsRequest.Builder listObjectsRequest = ListObjectsRequest.builder().bucket(bucketPrefix[0]); + if (bucketPrefix.length > 1) { + listObjectsRequest.prefix(bucketPrefix[1]); + } + + /* + * For listing objects, Amazon S3 returns up to 1,000 keys in the response. If you have more than 1,000 keys in + * your bucket, the response will be truncated. You should always check for if the response is truncated. + */ + ListObjectsResponse objectListing; + List objectSummaries = new ArrayList<>(); + do { + objectListing = this.amazonS3.listObjects(listObjectsRequest.build()); + List contents = objectListing.contents(); + objectSummaries.addAll(contents); + if (Boolean.TRUE.equals(objectListing.isTruncated())) { + listObjectsRequest.marker(contents.get(contents.size() - 1).key()); + } + } + while (Boolean.TRUE.equals(objectListing.isTruncated())); + + return objectSummaries.toArray(new S3Object[0]); + } + + @Override + public String[] listNames(String path) { + String[] bucketPrefix = splitPathToBucketAndKey(path, false); + + ListObjectsRequest.Builder listObjectsRequest = ListObjectsRequest.builder().bucket(bucketPrefix[0]); + if (bucketPrefix.length > 1) { + listObjectsRequest.prefix(bucketPrefix[1]); + } + + /* + * For listing objects, Amazon S3 returns up to 1,000 keys in the response. If you have more than 1,000 keys in + * your bucket, the response will be truncated. You should always check for if the response is truncated. + */ + ListObjectsResponse objectListing; + List names = new ArrayList<>(); + do { + objectListing = this.amazonS3.listObjects(listObjectsRequest.build()); + List contents = objectListing.contents(); + for (S3Object objectSummary : contents) { + names.add(objectSummary.key()); + } + if (Boolean.TRUE.equals(objectListing.isTruncated())) { + listObjectsRequest.marker(contents.get(contents.size() - 1).key()); + } + } + while (Boolean.TRUE.equals(objectListing.isTruncated())); + + return names.toArray(new String[0]); + } + + @Override + public boolean remove(String path) { + String[] bucketKey = splitPathToBucketAndKey(path, true); + this.amazonS3.deleteObject(request -> request.bucket(bucketKey[0]).key(bucketKey[1])); + return true; + } + + @Override + public void rename(String pathFrom, String pathTo) { + String[] bucketKeyFrom = splitPathToBucketAndKey(pathFrom, true); + String[] bucketKeyTo = splitPathToBucketAndKey(pathTo, true); + CopyObjectRequest.Builder copyRequest = CopyObjectRequest.builder().sourceBucket(bucketKeyFrom[0]) + .sourceKey(bucketKeyFrom[1]).destinationBucket(bucketKeyTo[0]).destinationKey(bucketKeyTo[1]); + this.amazonS3.copyObject(copyRequest.build()); + + // Delete the source + this.amazonS3.deleteObject(request -> request.bucket(bucketKeyFrom[0]).key(bucketKeyFrom[1])); + } + + @Override + public void read(String source, OutputStream outputStream) throws IOException { + String[] bucketKey = splitPathToBucketAndKey(source, true); + GetObjectRequest.Builder getObjectRequest = GetObjectRequest.builder().bucket(bucketKey[0]).key(bucketKey[1]); + try (InputStream inputStream = this.amazonS3.getObject(getObjectRequest.build())) { + StreamUtils.copy(inputStream, outputStream); + } + } + + @Override + public void write(InputStream inputStream, String destination) { + Assert.notNull(inputStream, "'inputStream' must not be null."); + String[] bucketKey = splitPathToBucketAndKey(destination, true); + PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder().bucket(bucketKey[0]).key(bucketKey[1]); + try { + this.amazonS3.putObject(putObjectRequest.build(), RequestBody.fromBytes(IoUtils.toByteArray(inputStream))); + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @Override + public void append(InputStream inputStream, String destination) { + throw new UnsupportedOperationException("The 'append' operation isn't supported by the Amazon S3 protocol."); + } + + @Override + public boolean mkdir(String directory) { + this.amazonS3.createBucket(request -> request.bucket(directory)); + return true; + } + + @Override + public boolean rmdir(String directory) { + this.amazonS3.deleteBucket(request -> request.bucket(directory)); + return true; + } + + @Override + public boolean exists(String path) { + String[] bucketKey = splitPathToBucketAndKey(path, true); + try { + this.amazonS3.getObjectAttributes(request -> request.bucket(bucketKey[0]).key(bucketKey[1])); + } + catch (NoSuchKeyException ex) { + return false; + } + return true; + } + + @Override + public InputStream readRaw(String source) { + String[] bucketKey = splitPathToBucketAndKey(source, true); + return this.amazonS3.getObject(request -> request.bucket(bucketKey[0]).key(bucketKey[1])); + } + + @Override + public void close() { + // No-op. This session is just direct wrapper for the AmazonS3 + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean finalizeRaw() { + return true; + } + + @Override + public Object getClientInstance() { + return this.amazonS3; + } + + @Override + public String getHostPort() { + if (this.endpoint != null) { + return this.endpoint; + } + else { + synchronized (this) { + if (this.endpoint != null) { + return this.endpoint; + } + DirectFieldAccessor dfa = new DirectFieldAccessor(this.amazonS3.utilities()); + Region region = (Region) dfa.getPropertyValue("region"); + this.endpoint = String.format("%s.%s:%d", S3Client.SERVICE_NAME, region, 443); + return this.endpoint; + } + } + } + + public String normalizeBucketName(String path) { + return splitPathToBucketAndKey(path, false)[0]; + } + + private String[] splitPathToBucketAndKey(String path, boolean requireKey) { + Assert.hasText(path, "'path' must not be empty String."); + + path = StringUtils.trimLeadingCharacter(path, '/'); + + String[] bucketKey = path.split("/", 2); + + if (requireKey) { + Assert.state(bucketKey.length == 2, "'path' must in pattern [BUCKET/KEY]."); + Assert.state(bucketKey[0].length() >= 3, "S3 bucket name must be at least 3 characters long."); + } + else { + Assert.state(bucketKey.length > 0 && bucketKey[0].length() >= 3, + "S3 bucket name must be at least 3 characters long."); + } + return bucketKey; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java new file mode 100644 index 000000000..f54356c9d --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java @@ -0,0 +1,65 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import org.springframework.integration.file.remote.session.SessionFactory; +import org.springframework.integration.file.remote.session.SharedSessionCapable; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An Amazon S3 specific {@link SessionFactory} implementation. Also, this class implements {@link SharedSessionCapable} + * around the single instance, since the {@link S3Session} is simple thread-safe wrapper for the {@link S3Client}. + * + * @author Artem Bilan + * @author Xavier François + * + * @since 4.0 + */ +public class S3SessionFactory implements SessionFactory, SharedSessionCapable { + + private final S3Session s3Session; + + public S3SessionFactory() { + this(S3Client.create()); + } + + public S3SessionFactory(S3Client amazonS3) { + Assert.notNull(amazonS3, "'amazonS3' must not be null."); + this.s3Session = new S3Session(amazonS3); + } + + @Override + public S3Session getSession() { + return this.s3Session; + } + + @Override + public boolean isSharedSession() { + return true; + } + + @Override + public void resetSharedSession() { + // No-op. The S3Session is stateless and can be used concurrently. + } + + public void setEndpoint(String endpoint) { + this.s3Session.setEndpoint(endpoint); + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java new file mode 100644 index 000000000..813b41294 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import org.springframework.integration.file.filters.AbstractSimplePatternFileListFilter; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * Implementation of {@link AbstractSimplePatternFileListFilter} for Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3SimplePatternFileListFilter extends AbstractSimplePatternFileListFilter { + + public S3SimplePatternFileListFilter(String pattern) { + super(pattern); + } + + @Override + protected String getFilename(S3Object file) { + return (file != null) ? file.key() : null; + } + + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java new file mode 100644 index 000000000..8942f50e9 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java @@ -0,0 +1,73 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.springframework.integration.file.remote.AbstractFileInfo; +import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; +import org.springframework.integration.file.remote.RemoteFileTemplate; +import org.springframework.integration.metadata.SimpleMetadataStore; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * A {@link AbstractRemoteFileStreamingMessageSource} implementation for the Amazon S3. + * + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3StreamingMessageSource extends AbstractRemoteFileStreamingMessageSource { + + public S3StreamingMessageSource(RemoteFileTemplate template) { + super(template, null); + } + + @SuppressWarnings("this-escape") + public S3StreamingMessageSource(RemoteFileTemplate template, Comparator comparator) { + super(template, comparator); + doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3StreamingMessageSource")); + } + + @Override + protected List> asFileInfoList(Collection collection) { + return collection.stream().map(S3FileInfo::new).collect(Collectors.toList()); + } + + @Override + public String getComponentType() { + return "aws:s3-inbound-streaming-channel-adapter"; + } + + @Override + protected AbstractFileInfo poll() { + AbstractFileInfo file = super.poll(); + if (file != null) { + S3Session s3Session = (S3Session) getRemoteFileTemplate().getSession(); + file.setRemoteDirectory(s3Session.normalizeBucketName(file.getRemoteDirectory())); + } + return file; + } + + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java new file mode 100644 index 000000000..637c0397b --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3; + +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * The base contract for JUnit tests based on the container for Localstack. The Testcontainers 'reuse' option must be + * disabled, so Ryuk container is started and will clean all the containers up from this test suite after JVM exit. + * Since the Localstack container instance is shared via static property, it is going to be started only once per JVM; + * therefore, the target Docker container is reused automatically. + * + * @author Artem Bilan + * + * @since 4.0 + */ +@Testcontainers(disabledWithoutDocker = true) +public interface LocalstackContainerTest { + + LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer( + DockerImageName.parse("localstack/localstack:4.4.0")).withEnv("S3_SKIP_SIGNATURE_VALIDATION", "0"); + + @BeforeAll + static void startContainer() { + LOCAL_STACK_CONTAINER.start(); + } + + static S3AsyncClient s3AsyncClient() { + return S3AsyncClient.crtBuilder().region(Region.of(LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(credentialsProvider()).endpointOverride(LOCAL_STACK_CONTAINER.getEndpoint()) + .build(); + } + + static S3Client s3Client() { + return applyAwsClientOptions(S3Client.builder()); + } + + static AwsCredentialsProvider credentialsProvider() { + return StaticCredentialsProvider.create( + AwsBasicCredentials.create(LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey())); + } + + private static , T> T applyAwsClientOptions(B clientBuilder) { + return clientBuilder.region(Region.of(LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(credentialsProvider()).endpointOverride(LOCAL_STACK_CONTAINER.getEndpoint()) + .build(); + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java index f1147df0b..ad0d7b15d 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2023 the original author or authors. + * Copyright 2013-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,35 +24,23 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; -@Testcontainers -class S3PathMatchingResourcePatternResolverTests { - private static final RequestBody requestBody = RequestBody.fromString("test-file-content"); +/** + * @author Tobias Soloschenko + * @author Artem Bilan + */ +class S3PathMatchingResourcePatternResolverTests implements LocalstackContainerTest { - @Container - static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")); + private static final RequestBody requestBody = RequestBody.fromString("test-file-content"); private static ResourcePatternResolver resourceLoader; @BeforeAll static void beforeAll() { - // region and credentials are irrelevant for test, but must be added to make - // test work on environments without AWS cli configured - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); - S3Client client = S3Client.builder().region(Region.of(localstack.getRegion())) - .credentialsProvider(credentialsProvider).endpointOverride(localstack.getEndpoint()).build(); + S3Client client = LocalstackContainerTest.s3Client(); // prepare buckets and objects for tests client.createBucket(request -> request.bucket("my-bucket")); @@ -134,4 +122,5 @@ private static ResourcePatternResolver getResourceLoader(S3Client s3Client) { loader.addProtocolResolver(new S3ProtocolResolver(s3Client)); return new S3PathMatchingResourcePatternResolver(s3Client, new PathMatchingResourcePatternResolver(loader)); } + } diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java index 74bcd77aa..80055b2fc 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java @@ -40,15 +40,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.shaded.com.google.common.io.Files; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -62,17 +55,16 @@ * * @author Maciej Walkowiak * @author Anton Perez + * @author Artem Bilan */ -@Testcontainers -class S3ResourceIntegrationTests { - private static final int DEFAULT_PART_SIZE = 5242880; +class S3ResourceIntegrationTests implements LocalstackContainerTest { - @Container - static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")); + private static final int DEFAULT_PART_SIZE = 5242880; private static S3Client client; + private static S3AsyncClient asyncClient; + private static S3TransferManager s3TransferManager; // Required for the @TestAvailableOutputStreamProviders annotation @@ -85,14 +77,8 @@ private static Stream availableS3OutputStreamProviders() @BeforeAll static void beforeAll() { - // region and credentials are irrelevant for test, but must be added to make - // test work on environments without AWS cli configured - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); - asyncClient = S3AsyncClient.builder().region(Region.of(localstack.getRegion())) - .credentialsProvider(credentialsProvider).endpointOverride(localstack.getEndpoint()).build(); - client = S3Client.builder().region(Region.of(localstack.getRegion())).credentialsProvider(credentialsProvider) - .endpointOverride(localstack.getEndpoint()).build(); + asyncClient = LocalstackContainerTest.s3AsyncClient(); + client = LocalstackContainerTest.s3Client(); s3TransferManager = S3TransferManager.builder().s3Client(asyncClient).build(); client.createBucket(request -> request.bucket("first-bucket")); } @@ -148,8 +134,8 @@ void contentLengthThrowsWhenResourceDoesNotExist(S3OutputStreamProvider s3Output @TestAvailableOutputStreamProviders void returnsResourceUrl(S3OutputStreamProvider s3OutputStreamProvider) throws IOException { S3Resource resource = s3Resource("s3://first-bucket/a-file.txt", s3OutputStreamProvider); - assertThat(resource.getURL().toString()) - .isEqualTo("http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/a-file.txt"); + assertThat(resource.getURL().toString()).isEqualTo("http://127.0.0.1:" + + LocalstackContainerTest.LOCAL_STACK_CONTAINER.getFirstMappedPort() + "/first-bucket/a-file.txt"); } @TestAvailableOutputStreamProviders @@ -162,10 +148,12 @@ void returnsEmptyUrlToBucketWhenObjectIsEmpty(S3OutputStreamProvider s3OutputStr void returnsEncodedResourceUrlAndUri(S3OutputStreamProvider s3OutputStreamProvider) throws IOException, URISyntaxException { S3Resource resource = s3Resource("s3://first-bucket/some/[objectName]", s3OutputStreamProvider); - assertThat(resource.getURL().toString()).isEqualTo( - "http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/some/%5BobjectName%5D"); + assertThat(resource.getURL().toString()) + .isEqualTo("http://127.0.0.1:" + LocalstackContainerTest.LOCAL_STACK_CONTAINER.getFirstMappedPort() + + "/first-bucket/some/%5BobjectName%5D"); assertThat(resource.getURI()).isEqualTo( - new URI("http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/some/%5BobjectName%5D")); + new URI("http://127.0.0.1:" + LocalstackContainerTest.LOCAL_STACK_CONTAINER.getFirstMappedPort() + + "/first-bucket/some/%5BobjectName%5D")); } @TestAvailableOutputStreamProviders @@ -292,6 +280,7 @@ private String retrieveContent(S3Resource resource) throws IOException { } static class Person { + private String name; public String getName() { @@ -306,6 +295,7 @@ public void setName(String name) { public String toString() { return "Person{" + "name='" + name + '\'' + '}'; } + } @Target(ElementType.METHOD) @@ -313,5 +303,7 @@ public String toString() { @ParameterizedTest @MethodSource("availableS3OutputStreamProviders") @interface TestAvailableOutputStreamProviders { + } + } diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java index cde964ad2..f3b127b07 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java @@ -42,12 +42,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.util.StreamUtils; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.HttpStatusCode; @@ -67,31 +61,24 @@ * @author Yuki Yoshida * @author Ziemowit Stolarczyk * @author Hardik Singh Behl + * @author Artem Bilan */ -@Testcontainers -class S3TemplateIntegrationTests { +class S3TemplateIntegrationTests implements LocalstackContainerTest { private static final String BUCKET_NAME = "test-bucket"; - @Container - static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")).withEnv("S3_SKIP_SIGNATURE_VALIDATION", "0"); - private static S3Client client; private static S3Presigner presigner; + private S3Template s3Template; @BeforeAll static void beforeAll() { - // region and credentials are irrelevant for test, but must be added to make - // test work on environments without AWS cli configured - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); - client = S3Client.builder().region(Region.of(localstack.getRegion())).credentialsProvider(credentialsProvider) - .endpointOverride(localstack.getEndpoint()).build(); - presigner = S3Presigner.builder().region(Region.of(localstack.getRegion())) - .credentialsProvider(credentialsProvider).endpointOverride(localstack.getEndpoint()).build(); + client = LocalstackContainerTest.s3Client(); + presigner = S3Presigner.builder().region(Region.of(LocalstackContainerTest.LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(LocalstackContainerTest.credentialsProvider()) + .endpointOverride(LocalstackContainerTest.LOCAL_STACK_CONTAINER.getEndpoint()).build(); } @BeforeEach @@ -363,7 +350,9 @@ private String calculateContentMD5(String content) { } static class Person { + private String firstName; + private String lastName; public Person() { @@ -389,6 +378,7 @@ public String getFirstName() { public void setFirstName(String firstName) { this.firstName = firstName; } + } } diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java new file mode 100644 index 000000000..c5e4bbdbc --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java @@ -0,0 +1,149 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.s3.LocalstackContainerTest; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Path; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.file.FileHeaders; +import org.springframework.integration.file.filters.AcceptOnceFileListFilter; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.FileCopyUtils; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * @author Artem Bilan + * @author Jim Krygowski + * @author Xavier François + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class S3InboundChannelAdapterTests implements LocalstackContainerTest { + + private static final ExpressionParser PARSER = new SpelExpressionParser(); + + private static final String S3_BUCKET = "s3-bucket"; + + private static S3Client S3; + + @TempDir + static Path TEMPORARY_FOLDER; + + private static File LOCAL_FOLDER; + + @Autowired + private PollableChannel s3FilesChannel; + + @BeforeAll + static void setup() { + S3 = LocalstackContainerTest.s3Client(); + S3.createBucket(request -> request.bucket(S3_BUCKET)); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/a.test"), RequestBody.fromString("Hello")); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/b.test"), RequestBody.fromString("Bye")); + + LOCAL_FOLDER = TEMPORARY_FOLDER.resolve("local").toFile(); + } + + @Test + void s3InboundChannelAdapter() throws IOException { + Message message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(File.class); + File localFile = (File) message.getPayload(); + assertThat(localFile).hasName("A.TEST.a"); + + String content = FileCopyUtils.copyToString(new FileReader(localFile)); + assertThat(content).isEqualTo("Hello"); + + message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(File.class); + localFile = (File) message.getPayload(); + assertThat(localFile).hasName("B.TEST.a"); + + content = FileCopyUtils.copyToString(new FileReader(localFile)); + assertThat(content).isEqualTo("Bye"); + + assertThat(message.getHeaders()).containsKeys(FileHeaders.REMOTE_DIRECTORY, FileHeaders.REMOTE_HOST_PORT, + FileHeaders.REMOTE_FILE); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public S3SessionFactory s3SessionFactory() { + S3SessionFactory s3SessionFactory = new S3SessionFactory(S3); + s3SessionFactory.setEndpoint("s3-url.com:8000"); + return s3SessionFactory; + } + + @Bean + public S3InboundFileSynchronizer s3InboundFileSynchronizer() { + S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory()); + synchronizer.setDeleteRemoteFiles(true); + synchronizer.setPreserveTimestamp(true); + synchronizer.setRemoteDirectory(S3_BUCKET); + synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$")); + Expression expression = PARSER.parseExpression( + "(#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this).toUpperCase() + '.a'"); + synchronizer.setLocalFilenameGeneratorExpression(expression); + return synchronizer; + } + + @Bean + @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100")) + public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() { + S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource( + s3InboundFileSynchronizer()); + messageSource.setAutoCreateLocalDirectory(true); + messageSource.setLocalDirectory(LOCAL_FOLDER); + messageSource.setLocalFilter(new AcceptOnceFileListFilter<>()); + return messageSource; + } + + @Bean + public PollableChannel s3FilesChannel() { + return new QueueChannel(); + } + + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java new file mode 100644 index 000000000..ab3bb582f --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java @@ -0,0 +1,338 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.awspring.cloud.s3.LocalstackContainerTest; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.FileCopyUtils; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.transfer.s3.model.Copy; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.StringInputStream; + +/** + * @author Artem Bilan + * @author John Logan + * @author Jim Krygowski + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class S3MessageHandlerTests implements LocalstackContainerTest { + + static S3AsyncClient S3; + + // define the bucket and file names used throughout the test + static final String S3_BUCKET_NAME = "my-bucket"; + + static final String S3_FILE_KEY_BAR = "subdir/bar"; + + static final String S3_FILE_KEY_FOO = "subdir/foo"; + + static final SpelExpressionParser PARSER = new SpelExpressionParser(); + + @TempDir + static Path temporaryFolder; + + @Autowired + ContextConfiguration contextConfiguration; + + @Autowired + MessageChannel s3SendChannel; + + @Autowired + MessageChannel s3ProcessChannel; + + @Autowired + PollableChannel s3ReplyChannel; + + @Autowired + @Qualifier("s3MessageHandler") + S3MessageHandler s3MessageHandler; + + @BeforeAll + static void setup() { + S3 = LocalstackContainerTest.s3AsyncClient(); + S3.createBucket(request -> request.bucket(S3_BUCKET_NAME)).join(); + } + + @BeforeEach + void prepareBucket() { + S3.listObjects(request -> request.bucket(S3_BUCKET_NAME)).thenCompose(result -> { + if (result.hasContents()) { + return S3.deleteObjects(request -> request.bucket(S3_BUCKET_NAME) + .delete(delete -> delete.objects(result.contents().stream().map(S3Object::key) + .map(key -> ObjectIdentifier.builder().key(key).build()).toList()))); + } + else { + return CompletableFuture.completedFuture(null); + } + }).join(); + + this.contextConfiguration.transferCompletedLatch = new CountDownLatch(1); + } + + @Test + void uploadFile() throws IOException, InterruptedException { + File file = new File(temporaryFolder.toFile(), "foo.mp3"); + file.createNewFile(); + byte[] testData = "test data".getBytes(); + FileCopyUtils.copy(testData, file); + Message message = MessageBuilder.withPayload(file) + .setHeader("s3Command", S3MessageHandler.Command.UPLOAD.name()).build(); + + this.s3SendChannel.send(message); + assertThat(this.contextConfiguration.transferCompletedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile1"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket(S3_BUCKET_NAME).key("foo.mp3"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(testData.length); + assertThat(getObjectResponse.contentType()).isEqualTo("audio/mpeg"); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(testData); + } + + @Test + void uploadInputStream() throws IOException, InterruptedException { + Expression actualKeyExpression = TestUtils.getPropertyValue(this.s3MessageHandler, "keyExpression", + Expression.class); + + this.s3MessageHandler.setKeyExpression(null); + + String testData = "a"; + + InputStream payload = new StringInputStream(testData); + Message message = MessageBuilder.withPayload(payload) + .setHeader("s3Command", S3MessageHandler.Command.UPLOAD.name()).setHeader("key", "myStream").build(); + + assertThatThrownBy(() -> this.s3SendChannel.send(message)) + .hasCauseExactlyInstanceOf(IllegalStateException.class) + .hasStackTraceContaining("Specify a 'keyExpression' for non-java.io.File payloads"); + + this.s3MessageHandler.setKeyExpression(actualKeyExpression); + + this.s3SendChannel.send(message); + + assertThat(this.contextConfiguration.transferCompletedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile2"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket(S3_BUCKET_NAME).key("myStream"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(testData.length()); + assertThat(getObjectResponse.contentType()).isEqualTo("application/json"); + assertThat(getObjectResponse.contentDisposition()).isEqualTo("test.json"); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(testData.getBytes()); + } + + @Test + void uploadByteArray() throws InterruptedException, IOException { + byte[] payload = "b".getBytes(StandardCharsets.UTF_8); + Message message = MessageBuilder.withPayload(payload) + .setHeader("s3Command", S3MessageHandler.Command.UPLOAD.name()).setHeader("key", "myStream").build(); + + this.s3SendChannel.send(message); + + assertThat(this.contextConfiguration.transferCompletedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile3"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket(S3_BUCKET_NAME).key("myStream"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(payload.length); + assertThat(getObjectResponse.contentType()).isEqualTo("application/json"); + assertThat(getObjectResponse.contentDisposition()).isEqualTo("test.json"); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(payload); + } + + @Test + void downloadDirectory() throws IOException { + CompletableFuture bb = S3.putObject( + request -> request.bucket(S3_BUCKET_NAME).key(S3_FILE_KEY_BAR), AsyncRequestBody.fromString("bb")); + CompletableFuture f = S3.putObject( + request -> request.bucket(S3_BUCKET_NAME).key(S3_FILE_KEY_FOO), AsyncRequestBody.fromString("f")); + + CompletableFuture.allOf(bb, f).join(); + + File directoryForDownload = new File(temporaryFolder.toFile(), "myFolder"); + directoryForDownload.mkdir(); + Message message = MessageBuilder.withPayload(directoryForDownload) + .setHeader("s3Command", S3MessageHandler.Command.DOWNLOAD).build(); + + this.s3SendChannel.send(message); + + // get the "root" directory + File[] directoryArray = directoryForDownload.listFiles(); + assertThat(directoryArray).isNotNull(); + assertThat(directoryArray.length).isEqualTo(1); + + File subDirectory = directoryArray[0]; + assertThat(subDirectory).hasName("subdir"); + + // get the files we downloaded + File[] fileArray = subDirectory.listFiles(); + assertThat(fileArray).isNotNull(); + assertThat(fileArray.length).isEqualTo(2); + + List files = Arrays.asList(fileArray); + files.sort(Comparator.comparing(File::getName)); + + File file1 = files.get(0); + assertThat(file1).hasName(S3_FILE_KEY_BAR.split("/", 2)[1]); + assertThat(FileCopyUtils.copyToString(new FileReader(file1))).isEqualTo("bb"); + + File file2 = files.get(1); + assertThat(file2).hasName(S3_FILE_KEY_FOO.split("/", 2)[1]); + assertThat(FileCopyUtils.copyToString(new FileReader(file2))).isEqualTo("f"); + } + + @Test + void copy() throws IOException { + byte[] testData = "ff".getBytes(); + CompletableFuture mySource = S3.putObject( + request -> request.bucket(S3_BUCKET_NAME).key("mySource"), AsyncRequestBody.fromBytes(testData)); + CompletableFuture theirBucket = S3 + .createBucket(request -> request.bucket("their-bucket")); + + CompletableFuture.allOf(mySource, theirBucket).join(); + Map payload = new HashMap<>(); + payload.put("key", "mySource"); + payload.put("destination", "their-bucket"); + payload.put("destinationKey", "theirTarget"); + this.s3ProcessChannel.send(new GenericMessage<>(payload)); + + Message receive = this.s3ReplyChannel.receive(10000); + assertThat(receive).isNotNull(); + + assertThat(receive.getPayload()).isInstanceOf(Copy.class); + Copy copy = (Copy) receive.getPayload(); + + copy.completionFuture().join(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile4"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket("their-bucket").key("theirTarget"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(testData.length); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(testData); + } + + @Configuration + @EnableIntegration + public static class ContextConfiguration { + + volatile CountDownLatch transferCompletedLatch; + + @Bean + @ServiceActivator(inputChannel = "s3SendChannel") + public MessageHandler s3MessageHandler() { + S3MessageHandler s3MessageHandler = new S3MessageHandler(S3, S3_BUCKET_NAME); + s3MessageHandler.setCommandExpression(PARSER.parseExpression("headers.s3Command")); + Expression keyExpression = PARSER.parseExpression( + "payload instanceof T(java.io.File) and !payload.directory ? payload.name : headers[key]"); + s3MessageHandler.setKeyExpression(keyExpression); + s3MessageHandler.setUploadMetadataProvider((metadata, message) -> { + if (message.getPayload() instanceof InputStream || message.getPayload() instanceof byte[]) { + metadata.contentLength(1L).contentType("application/json").contentDisposition("test.json") + .acl(ObjectCannedACL.PUBLIC_READ_WRITE); + } + }); + s3MessageHandler.setTransferListener(new TransferListener() { + + @Override + public void transferComplete(Context.TransferComplete context) { + transferCompletedLatch.countDown(); + } + + }); + return s3MessageHandler; + } + + @Bean + public PollableChannel s3ReplyChannel() { + return new QueueChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "s3ProcessChannel") + public MessageHandler s3ProcessMessageHandler() { + S3MessageHandler s3MessageHandler = new S3MessageHandler(S3, S3_BUCKET_NAME, true); + s3MessageHandler.setOutputChannel(s3ReplyChannel()); + s3MessageHandler.setCommand(S3MessageHandler.Command.COPY); + s3MessageHandler.setKeyExpression(PARSER.parseExpression("payload.key")); + s3MessageHandler.setDestinationBucketExpression(PARSER.parseExpression("payload.destination")); + s3MessageHandler.setDestinationKeyExpression(PARSER.parseExpression("payload.destinationKey")); + return s3MessageHandler; + } + + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java new file mode 100644 index 000000000..843a85f4c --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.s3.LocalstackContainerTest; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Comparator; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.file.FileHeaders; +import org.springframework.integration.metadata.SimpleMetadataStore; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class S3StreamingChannelAdapterTests implements LocalstackContainerTest { + + private static final String S3_BUCKET = "s3-bucket"; + + private static S3Client S3; + + @Autowired + private PollableChannel s3FilesChannel; + + @BeforeAll + static void setup() { + S3 = LocalstackContainerTest.s3Client(); + S3.createBucket(request -> request.bucket(S3_BUCKET)); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/a.test"), RequestBody.fromString("Hello")); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/b.test"), RequestBody.fromString("Bye")); + } + + @Test + void s3InboundStreamingChannelAdapter() throws IOException { + Message message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(InputStream.class); + assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("subdir/a.test"); + + InputStream inputStreamA = (InputStream) message.getPayload(); + assertThat(inputStreamA).isNotNull(); + assertThat(IOUtils.toString(inputStreamA, Charset.defaultCharset())).isEqualTo("Hello"); + inputStreamA.close(); + + message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(InputStream.class); + assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("subdir/b.test"); + assertThat(message.getHeaders()).containsKeys(FileHeaders.REMOTE_DIRECTORY, FileHeaders.REMOTE_HOST_PORT, + FileHeaders.REMOTE_FILE); + InputStream inputStreamB = (InputStream) message.getPayload(); + assertThat(IOUtils.toString(inputStreamB, Charset.defaultCharset())).isEqualTo("Bye"); + + assertThat(this.s3FilesChannel.receive(10)).isNull(); + + inputStreamB.close(); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100")) + public S3StreamingMessageSource s3InboundStreamingMessageSource() { + S3SessionFactory s3SessionFactory = new S3SessionFactory(S3); + S3RemoteFileTemplate s3FileTemplate = new S3RemoteFileTemplate(s3SessionFactory); + S3StreamingMessageSource s3MessageSource = new S3StreamingMessageSource(s3FileTemplate, + Comparator.comparing(S3Object::key)); + s3MessageSource.setRemoteDirectory("/" + S3_BUCKET + "/subdir"); + s3MessageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "streaming")); + + return s3MessageSource; + } + + @Bean + public PollableChannel s3FilesChannel() { + return new QueueChannel(); + } + + } + +} diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml new file mode 100644 index 000000000..4c8cea3ba --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml @@ -0,0 +1,35 @@ + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + Spring Cloud AWS Starter for Spring Integration with S3 + spring-cloud-aws-starter-integration-s3 + + + + io.awspring.cloud + spring-cloud-aws-starter-s3 + + + software.amazon.awssdk + s3-transfer-manager + + + software.amazon.awssdk + aws-crt-client + + + org.springframework.integration + spring-integration-file + + + +