diff --git a/docs/src/main/asciidoc/s3.adoc b/docs/src/main/asciidoc/s3.adoc index 6a9fc318a..c69396d03 100644 --- a/docs/src/main/asciidoc/s3.adoc +++ b/docs/src/main/asciidoc/s3.adoc @@ -641,3 +641,149 @@ Sample IAM policy granting access to `spring-cloud-aws-demo` bucket: ] } ---- + +=== Spring Integration Support + +Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon S3. + +The S3 Channel Adapters are based on the `S3Client` template and `S3TransferManager`. +See their specification and Javadocs for more information. + +The S3 Inbound Channel Adapter is represented by the `S3InboundFileSynchronizingMessageSource` and allows pulling S3 objects as files from the S3 bucket to the local directory for synchronization. +This adapter is fully similar to the Inbound Channel Adapters in the FTP and SFTP Spring Integration modules. +See more information in the https://docs.spring.io/spring-integration/reference/ftp.html[FTP/FTPS Adapters Chapter] for common options and the `SessionFactory`, `RemoteFileTemplate` and `FileListFilter` abstractions. 
+ +The Java Configuration is: + +[source,java] +---- +@SpringBootApplication +public static class MyConfiguration { + + @Autowired + private S3Client amazonS3; + + @Bean + public S3InboundFileSynchronizer s3InboundFileSynchronizer() { + S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.amazonS3); + synchronizer.setDeleteRemoteFiles(true); + synchronizer.setPreserveTimestamp(true); + synchronizer.setRemoteDirectory(S3_BUCKET); + synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$")); + Expression expression = PARSER.parseExpression("#this.toUpperCase() + '.a'"); + synchronizer.setLocalFilenameGeneratorExpression(expression); + return synchronizer; + } + + @Bean + @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100")) + public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() { + S3InboundFileSynchronizingMessageSource messageSource = + new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer()); + messageSource.setAutoCreateLocalDirectory(true); + messageSource.setLocalDirectory(LOCAL_FOLDER); + messageSource.setLocalFilter(new AcceptOnceFileListFilter()); + return messageSource; + } + + @Bean + public PollableChannel s3FilesChannel() { + return new QueueChannel(); + } +} +---- + +With this config you receive messages with `java.io.File` `payload` from the `s3FilesChannel` after periodic synchronization of content from the Amazon S3 bucket into the local directory. + +The `S3StreamingMessageSource` adapter produces messages with payloads of type `InputStream`, allowing S3 objects to be fetched without writing to the local file system. +Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. +The session is provided in the closeableResource header (`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`). 
+Standard framework components, such as the `FileSplitter` and `StreamTransformer` will automatically close the session. + +The following Spring Boot application provides an example of configuring the S3 inbound streaming adapter using Java configuration: + +[source,java] +---- +@SpringBootApplication +public class S3JavaApplication { + + public static void main(String[] args) { + new SpringApplicationBuilder(S3JavaApplication.class) + .web(false) + .run(args); + } + + @Autowired + private S3Client amazonS3; + + @Bean + @InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100")) + public MessageSource s3InboundStreamingMessageSource() { + S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template()); + messageSource.setRemoteDirectory(S3_BUCKET); + messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), + "streaming")); + return messageSource; + } + + @Bean + @Transformer(inputChannel = "s3Channel", outputChannel = "data") + public org.springframework.integration.transformer.Transformer transformer() { + return new StreamTransformer(); + } + + @Bean + public S3RemoteFileTemplate template() { + return new S3RemoteFileTemplate(new S3SessionFactory(this.amazonS3)); + } + + @Bean + public PollableChannel s3Channel() { + return new QueueChannel(); + } +} +---- + +> NOTE: Unlike the non-streaming inbound channel adapter, this adapter does not prevent duplicates by default. +> If you do not delete the remote file and wish to prevent the file being processed again, you can configure an `S3PersistentFileListFilter` in the `filter` attribute. +> If you don’t want to persist the state, an in-memory `SimpleMetadataStore` can be used with the filter. +> If you wish to use a filename pattern (or regex) as well, use a `CompositeFileListFilter`. 
+ +The `S3MessageHandler` is an Outbound Channel Adapter and allows performing `upload`, `download` and `copy` (see `S3MessageHandler.Command` enum) operations in the provided S3 bucket. + +The Java Configuration is: + +[source,java] +---- +@SpringBootApplication +public static class MyConfiguration { + + @Autowired + private S3AsyncClient amazonS3; + + @Bean + @ServiceActivator(inputChannel = "s3UploadChannel") + public MessageHandler s3MessageHandler() { + return new S3MessageHandler(this.amazonS3, "my-bucket"); + } + +} +---- + +With this config you can send a message with the `java.io.File` as `payload` and the `transferManager.upload()` operation will be performed, where the file name is used as a S3 Object key. + +See more information in the `S3MessageHandler` JavaDocs. + +NOTE: The AWS SDK recommends to use `S3CrtAsyncClient` for `S3TransferManager`, therefore an `S3AsyncClient.crtBuilder()` has to be used to achieve respective upload and download requirements, what is done internally in the `S3MessageHandler` when `S3CrtAsyncClient`-based constructor is used. + +The `S3MessageHandler` can be used as an Outbound Gateway with the `produceReply = true` constructor argument for Java Configuration. + +The "request-reply" nature of this gateway is async and the `Transfer` result from the `TransferManager` operation is sent to the `outputChannel`, assuming the transfer progress observation in the downstream flow. + +The `TransferListener` can be supplied to the `S3MessageHandler` to track the transfer progress per requests. + +See more information in the `S3MessageHandler` Javadocs. + +The Spring Integration dependency (as well as `s3-transfer-manager` and `aws-crt-client`) in the `spring-cloud-aws-s3` module are `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used. 
+For convenience, a dedicated `spring-cloud-aws-starter-integration-s3` is provided managing all the required dependencies for Spring Integration support with Amazon S3. + diff --git a/pom.xml b/pom.xml index 96538ab75..9372e8c2a 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ spring-cloud-aws-starters/spring-cloud-aws-starter-metrics spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store spring-cloud-aws-starters/spring-cloud-aws-starter-s3 + spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3 spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager spring-cloud-aws-starters/spring-cloud-aws-starter-ses spring-cloud-aws-starters/spring-cloud-aws-starter-sns diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index df111f992..367af6de3 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -202,6 +202,12 @@ ${project.version} + + io.awspring.cloud + spring-cloud-aws-starter-integration-s3 + ${project.version} + + io.awspring.cloud spring-cloud-aws-starter-sns diff --git a/spring-cloud-aws-s3/pom.xml b/spring-cloud-aws-s3/pom.xml index d0db70219..7044eeacd 100644 --- a/spring-cloud-aws-s3/pom.xml +++ b/spring-cloud-aws-s3/pom.xml @@ -40,6 +40,11 @@ aws-crt-client true + + org.springframework.integration + spring-integration-file + true + com.fasterxml.jackson.core jackson-databind @@ -55,6 +60,11 @@ junit-jupiter test + + org.springframework.integration + spring-integration-test + test + diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java new file mode 100644 index 000000000..634f3a94a --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3FileInfo.java @@ -0,0 +1,88 @@ +/* + * Copyright 2013-2025 the original author or authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.util.Date; +import org.springframework.integration.file.remote.AbstractFileInfo; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An Amazon S3 {@link org.springframework.integration.file.remote.FileInfo} implementation. + * + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3FileInfo extends AbstractFileInfo { + + private final S3Object s3Object; + + public S3FileInfo(S3Object s3Object) { + Assert.notNull(s3Object, "s3Object must not be null"); + this.s3Object = s3Object; + } + + @Override + public boolean isDirectory() { + return false; + } + + @Override + public boolean isLink() { + return false; + } + + @Override + public long getSize() { + return this.s3Object.size(); + } + + @Override + public long getModified() { + return this.s3Object.lastModified().getEpochSecond(); + } + + @Override + public String getFilename() { + return this.s3Object.key(); + } + + /** + * A permissions representation string. Throws {@link UnsupportedOperationException} to avoid extra + * {@link software.amazon.awssdk.services.s3.S3Client#getObjectAcl} REST call. The target application may choose to + * do that according to its logic. + * @return the permissions representation string. 
+ */ + @Override + public String getPermissions() { + throw new UnsupportedOperationException("Use [AmazonS3.getObjectAcl()] to obtain permissions."); + } + + @Override + public S3Object getFileInfo() { + return this.s3Object; + } + + @Override + public String toString() { + return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink() + ", Size=" + getSize() + + ", ModifiedTime=" + new Date(getModified()) + ", Filename=" + getFilename() + ", RemoteDirectory=" + + getRemoteDirectory() + "]"; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java new file mode 100644 index 000000000..edc74afdc --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizer.java @@ -0,0 +1,87 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.awspring.cloud.s3.integration; + +import java.io.File; +import java.io.IOException; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.file.remote.session.Session; +import org.springframework.integration.file.remote.session.SessionFactory; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer; +import org.springframework.integration.metadata.SimpleMetadataStore; +import org.springframework.lang.Nullable; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An implementation of {@link AbstractInboundFileSynchronizer} for Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3InboundFileSynchronizer extends AbstractInboundFileSynchronizer { + + public S3InboundFileSynchronizer() { + this(new S3SessionFactory()); + } + + public S3InboundFileSynchronizer(S3Client amazonS3) { + this(new S3SessionFactory(amazonS3)); + } + + /** + * Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances. + * @param sessionFactory The session factory. + */ + @SuppressWarnings("this-escape") + public S3InboundFileSynchronizer(SessionFactory sessionFactory) { + super(sessionFactory); + doSetRemoteDirectoryExpression(new LiteralExpression(null)); + doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource")); + } + + @Override + protected boolean isFile(S3Object file) { + return true; + } + + @Override + protected String getFilename(S3Object file) { + return (file != null ? 
file.key() : null); + } + + @Override + protected long getModified(S3Object file) { + return file.lastModified().getEpochSecond(); + } + + @Override + protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, + @Nullable EvaluationContext localFileEvaluationContext, S3Object remoteFile, File localDirectory, + Session session) throws IOException { + + return super.copyFileToLocalDirectory(((S3Session) session).normalizeBucketName(remoteDirectoryPath), + localFileEvaluationContext, remoteFile, localDirectory, session); + } + + @Override + protected String protocol() { + return "s3"; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java new file mode 100644 index 000000000..cecc86a87 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3InboundFileSynchronizingMessageSource.java @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.awspring.cloud.s3.integration; + +import java.io.File; +import java.util.Comparator; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * A {@link org.springframework.integration.core.MessageSource} implementation for the Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3InboundFileSynchronizingMessageSource extends AbstractInboundFileSynchronizingMessageSource { + + public S3InboundFileSynchronizingMessageSource(AbstractInboundFileSynchronizer synchronizer) { + super(synchronizer); + } + + public S3InboundFileSynchronizingMessageSource(AbstractInboundFileSynchronizer synchronizer, + Comparator comparator) { + + super(synchronizer, comparator); + } + + public String getComponentType() { + return "aws:s3-inbound-channel-adapter"; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java new file mode 100644 index 000000000..32fa24d8a --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3MessageHandler.java @@ -0,0 +1,454 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletionException; +import java.util.function.BiConsumer; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.support.utils.IntegrationUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.util.Assert; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.internal.util.Mimetype; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CopyRequest; +import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest; +import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.model.Transfer; +import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; +import software.amazon.awssdk.transfer.s3.model.UploadRequest; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.IoUtils; +import software.amazon.awssdk.utils.Md5Utils; + +/** + * The {@link AbstractReplyProducingMessageHandler} implementation for the Amazon S3 services. + *

+ * The implementation is fully based on the {@link S3TransferManager} and support its {@code upload}, {@code download} + * and {@code copy} operations which can be determined by the provided or evaluated via SpEL expression at runtime + * {@link Command}. + *

+ * This {@link AbstractReplyProducingMessageHandler} can behave as a "one-way" (by default) or "request-reply" component + * according to the {@link #produceReply} constructor argument. + *

+ * The "one-way" behavior is also blocking, which is achieved with the {@link Transfer#completionFuture()} invocation. + * Consider to use an async upstream hand off if this blocking behavior isn't appropriate. + *

+ * The "request-reply" behavior is async and the {@link Transfer} result from the {@link S3TransferManager} operation is + * sent to the {@link #getOutputChannel()}, assuming the transfer progress observation in the downstream flow. + *

+ * The {@link TransferListener} can be provided to track the transfer progress. Also, see a {@link Transfer} API + * returned as a reply message from this handler. + *

+ * For the upload operation the {@link BiConsumer} callback can be supplied to populate options on a + * {@link PutObjectRequest.Builder} against request message. + *

+ * For download operation the {@code payload} must be a {@link File} instance, representing a single file for downloaded + * content or directory to download all files from the S3 virtual directory. + *

+ * An S3 Object {@code key} for upload and download can be determined by the provided {@link #keyExpression} or the + * {@link File#getName()} is used directly. The former has precedence. + *

+ * For copy operation all {@link #keyExpression}, {@link #destinationBucketExpression} and + * {@link #destinationKeyExpression} are required and must not evaluate to {@code null}. + *

+ * + * @author Artem Bilan + * @author John Logan + * + * @since 4.0 + * + * @see S3TransferManager + */ +public class S3MessageHandler extends AbstractReplyProducingMessageHandler { + + private final S3TransferManager transferManager; + + private final boolean produceReply; + + private final Expression bucketExpression; + + private EvaluationContext evaluationContext; + + private Expression keyExpression; + + private Expression destinationBucketExpression; + + private Expression destinationKeyExpression; + + private Expression commandExpression = new ValueExpression<>(Command.UPLOAD); + + private BiConsumer> uploadMetadataProvider = (builder, message) -> { + }; + + private TransferListener transferListener; + + public S3MessageHandler(S3AsyncClient amazonS3, String bucket) { + this(amazonS3, bucket, false); + } + + public S3MessageHandler(S3AsyncClient amazonS3, Expression bucketExpression) { + this(amazonS3, bucketExpression, false); + } + + public S3MessageHandler(S3AsyncClient amazonS3, String bucket, boolean produceReply) { + this(amazonS3, new LiteralExpression(bucket), produceReply); + Assert.notNull(bucket, "'bucket' must not be null"); + } + + public S3MessageHandler(S3AsyncClient amazonS3, Expression bucketExpression, boolean produceReply) { + this(S3TransferManager.builder().s3Client(amazonS3).build(), bucketExpression, produceReply); + Assert.notNull(amazonS3, "'amazonS3' must not be null"); + } + + public S3MessageHandler(S3TransferManager transferManager, String bucket) { + this(transferManager, bucket, false); + } + + public S3MessageHandler(S3TransferManager transferManager, Expression bucketExpression) { + this(transferManager, bucketExpression, false); + } + + public S3MessageHandler(S3TransferManager transferManager, String bucket, boolean produceReply) { + this(transferManager, new LiteralExpression(bucket), produceReply); + Assert.notNull(bucket, "'bucket' must not be null"); + } + + public S3MessageHandler(S3TransferManager transferManager, 
Expression bucketExpression, boolean produceReply) { + Assert.notNull(transferManager, "'transferManager' must not be null"); + Assert.notNull(bucketExpression, "'bucketExpression' must not be null"); + this.transferManager = transferManager; + this.bucketExpression = bucketExpression; + this.produceReply = produceReply; + } + + /** + * The SpEL expression to evaluate S3 object key at runtime against {@code requestMessage}. + * @param keyExpression the SpEL expression for S3 key. + */ + public void setKeyExpression(Expression keyExpression) { + this.keyExpression = keyExpression; + } + + /** + * Specify a {@link Command} to perform against {@link S3TransferManager}. + * @param command The {@link Command} to use. + * @see Command + */ + public void setCommand(Command command) { + Assert.notNull(command, "'command' must not be null"); + setCommandExpression(new ValueExpression<>(command)); + } + + /** + * The SpEL expression to evaluate the command to perform on {@link S3TransferManager}: {@code upload}, + * {@code download} or {@code copy}. + * @param commandExpression the SpEL expression to evaluate the {@link S3TransferManager} operation. + * @see Command + */ + public void setCommandExpression(Expression commandExpression) { + Assert.notNull(commandExpression, "'commandExpression' must not be null"); + this.commandExpression = commandExpression; + } + + /** + * The SpEL expression to evaluate the target S3 bucket for copy operation. + * @param destinationBucketExpression the SpEL expression for destination bucket. + * @see S3TransferManager#copy(CopyRequest) + */ + public void setDestinationBucketExpression(Expression destinationBucketExpression) { + this.destinationBucketExpression = destinationBucketExpression; + } + + /** + * The SpEL expression to evaluate the target S3 key for copy operation. + * @param destinationKeyExpression the SpEL expression for destination key. 
+ * @see S3TransferManager#copy(CopyRequest) + */ + public void setDestinationKeyExpression(Expression destinationKeyExpression) { + this.destinationKeyExpression = destinationKeyExpression; + } + + /** + * Specify an {@link BiConsumer} callback to populate the metadata for upload operation, e.g. {@code Content-MD5}, + * {@code Content-Type} or any other required options. + * @param uploadMetadataProvider the {@link BiConsumer} to use for upload request option settings. + */ + public void setUploadMetadataProvider(BiConsumer> uploadMetadataProvider) { + Assert.notNull(uploadMetadataProvider, "'uploadMetadataProvider' must not be null"); + this.uploadMetadataProvider = uploadMetadataProvider; + } + + public void setTransferListener(TransferListener transferListener) { + this.transferListener = transferListener; + } + + @Override + public String getComponentType() { + return "aws:s3-outbound-channel-adapter"; + } + + @Override + protected void doInit() { + Assert.notNull(this.bucketExpression, "The 'bucketExpression' must not be null"); + super.doInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + + @Override + protected Object handleRequestMessage(Message requestMessage) { + Command command = this.commandExpression.getValue(this.evaluationContext, requestMessage, Command.class); + Assert.state(command != null, () -> "'commandExpression' [" + this.commandExpression.getExpressionString() + + "] cannot evaluate to null."); + + Transfer transfer = switch (command) { + case UPLOAD -> upload(requestMessage); + case DOWNLOAD -> download(requestMessage); + case COPY -> copy(requestMessage); + }; + + if (this.produceReply) { + return transfer; + } + else { + try { + transfer.completionFuture().join(); + } + catch (CompletionException ex) { + throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage, + () -> "Failed to transfer file", ex.getCause()); + } + return null; + } + } + + private Transfer 
upload(Message requestMessage) { + Object payload = requestMessage.getPayload(); + String bucketName = obtainBucket(requestMessage); + + String key = null; + if (this.keyExpression != null) { + key = this.keyExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + else if (payload instanceof File fileToUpload) { + key = fileToUpload.getName(); + } + + if (payload instanceof File fileToUpload && fileToUpload.isDirectory()) { + UploadDirectoryRequest.Builder uploadDirectoryRequest = UploadDirectoryRequest.builder().bucket(bucketName) + .source(fileToUpload.toPath()).s3Prefix(key); + + if (this.transferListener != null) { + uploadDirectoryRequest.uploadFileRequestTransformer( + (fileUpload) -> fileUpload.addTransferListener(this.transferListener)); + } + + return this.transferManager.uploadDirectory(uploadDirectoryRequest.build()); + } + else { + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() + .applyMutation((builder) -> this.uploadMetadataProvider.accept(builder, requestMessage)) + .bucket(bucketName).key(key); + + PutObjectRequest putObjectRequest = putObjectRequestBuilder.build(); + + AsyncRequestBody requestBody; + try { + if (payload instanceof InputStream inputStream) { + byte[] body = IoUtils.toByteArray(inputStream); + if (putObjectRequest.contentMD5() == null) { + putObjectRequestBuilder.contentMD5(Md5Utils.md5AsBase64(body)); + inputStream.reset(); + } + requestBody = AsyncRequestBody.fromBytes(body); + } + else if (payload instanceof File fileToUpload) { + if (putObjectRequest.contentMD5() == null) { + putObjectRequestBuilder.contentMD5(Md5Utils.md5AsBase64(fileToUpload)); + } + if (putObjectRequest.contentLength() == null) { + putObjectRequestBuilder.contentLength(fileToUpload.length()); + } + if (putObjectRequest.contentType() == null) { + putObjectRequestBuilder.contentType(Mimetype.getInstance().getMimetype(fileToUpload)); + } + requestBody = AsyncRequestBody.fromFile(fileToUpload); + } + else if 
(payload instanceof byte[] payloadBytes) { + if (putObjectRequest.contentMD5() == null) { + putObjectRequestBuilder.contentMD5(Md5Utils.md5AsBase64(payloadBytes)); + } + if (putObjectRequest.contentLength() == null) { + putObjectRequestBuilder.contentLength((long) payloadBytes.length); + } + requestBody = AsyncRequestBody.fromBytes(payloadBytes); + } + else { + throw new IllegalArgumentException("Unsupported payload type: [" + payload.getClass() + + "]. The only supported payloads for the upload request are " + + "java.io.File, java.io.InputStream, byte[] and PutObjectRequest."); + } + } + catch (IOException e) { + throw new MessageHandlingException(requestMessage, e); + } + + if (key == null) { + if (this.keyExpression != null) { + throw new IllegalStateException("The 'keyExpression' [" + this.keyExpression.getExpressionString() + + "] must not evaluate to null. Root object is: " + requestMessage); + } + else { + throw new IllegalStateException("Specify a 'keyExpression' for non-java.io.File payloads"); + } + } + + UploadRequest.Builder uploadRequest = UploadRequest.builder() + .putObjectRequest(putObjectRequestBuilder.build()).requestBody(requestBody); + + if (transferListener != null) { + uploadRequest.addTransferListener(transferListener); + } + + return this.transferManager.upload(uploadRequest.build()); + } + } + + private Transfer download(Message requestMessage) { + Object payload = requestMessage.getPayload(); + Assert.state(payload instanceof File, () -> "For the 'DOWNLOAD' operation the 'payload' must be of " + + "'java.io.File' type, but gotten: [" + payload.getClass() + ']'); + + File targetFile = (File) payload; + + String bucket = obtainBucket(requestMessage); + + String key = this.keyExpression != null + ? 
this.keyExpression.getValue(this.evaluationContext, requestMessage, String.class) + : null; + + if (targetFile.isDirectory()) { + DownloadDirectoryRequest.Builder downloadDirectoryRequest = DownloadDirectoryRequest.builder() + .bucket(bucket).destination(targetFile.toPath()) + .listObjectsV2RequestTransformer(filter -> filter.prefix(key)); + if (this.transferListener != null) { + downloadDirectoryRequest.downloadFileRequestTransformer( + (fileDownload) -> fileDownload.addTransferListener(this.transferListener)); + } + return this.transferManager.downloadDirectory(downloadDirectoryRequest.build()); + } + else { + DownloadFileRequest.Builder downloadFileRequest = DownloadFileRequest.builder().destination(targetFile) + .getObjectRequest(request -> request.bucket(bucket).key(key != null ? key : targetFile.getName())); + if (this.transferListener != null) { + downloadFileRequest.addTransferListener(this.transferListener); + } + return this.transferManager.downloadFile(downloadFileRequest.build()); + } + } + + private Transfer copy(Message requestMessage) { + String sourceBucketName = obtainBucket(requestMessage); + + String sourceKey = null; + if (this.keyExpression != null) { + sourceKey = this.keyExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + + Assert.state(sourceKey != null, () -> "The 'keyExpression' must not be null for 'copy' operation " + + "and 'keyExpression' can't evaluate to null. " + "Root object is: " + requestMessage); + + String destinationBucketName = null; + if (this.destinationBucketExpression != null) { + destinationBucketName = this.destinationBucketExpression.getValue(this.evaluationContext, requestMessage, + String.class); + } + + Assert.state(destinationBucketName != null, + () -> "The 'destinationBucketExpression' must not be null for 'copy' operation " + + "and can't evaluate to null. 
Root object is: " + requestMessage); + + String destinationKey = null; + if (this.destinationKeyExpression != null) { + destinationKey = this.destinationKeyExpression.getValue(this.evaluationContext, requestMessage, + String.class); + } + + Assert.state(destinationKey != null, + () -> "The 'destinationKeyExpression' must not be null for 'copy' operation " + + "and can't evaluate to null. Root object is: " + requestMessage); + + CopyObjectRequest.Builder copyObjectRequest = CopyObjectRequest.builder().sourceBucket(sourceBucketName) + .sourceKey(sourceKey).destinationBucket(destinationBucketName).destinationKey(destinationKey); + + CopyRequest.Builder copyRequest = CopyRequest.builder().copyObjectRequest(copyObjectRequest.build()); + if (this.transferListener != null) { + copyRequest.addTransferListener(this.transferListener); + } + return this.transferManager.copy(copyRequest.build()); + } + + private String obtainBucket(Message requestMessage) { + String bucketName; + if (this.bucketExpression instanceof LiteralExpression) { + bucketName = (String) this.bucketExpression.getValue(); + } + else { + bucketName = this.bucketExpression.getValue(this.evaluationContext, requestMessage, String.class); + } + Assert.state(bucketName != null, () -> "The 'bucketExpression' [" + this.bucketExpression.getExpressionString() + + "] must not evaluate to null. Root object is: " + requestMessage); + + return bucketName; + } + + /** + * The {@link S3MessageHandler} mode. + * + * @see #setCommand + */ + public enum Command { + + /** + * The command to perform {@link S3TransferManager#upload} operation. + */ + UPLOAD, + + /** + * The command to perform {@link S3TransferManager#download} operation. + */ + DOWNLOAD, + + /** + * The command to perform {@link S3TransferManager#copy} operation. 
+ */ + COPY + + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java new file mode 100644 index 000000000..60ea2e219 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3PersistentAcceptOnceFileListFilter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter; +import org.springframework.integration.metadata.ConcurrentMetadataStore; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3PersistentAcceptOnceFileListFilter extends AbstractPersistentAcceptOnceFileListFilter { + + public S3PersistentAcceptOnceFileListFilter(ConcurrentMetadataStore store, String prefix) { + super(store, prefix); + } + + @Override + protected long modified(S3Object file) { + return (file != null) ? file.lastModified().getEpochSecond() : 0L; + } + + @Override + protected String fileName(S3Object file) { + return (file != null) ? 
file.key() : null; + } + + /** + * Always return false since no directory notion in S3. + * @param file the {@link S3Object} + * @return always false: S3 does not have a notion of directory + * @since 4.0 + */ + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java new file mode 100644 index 000000000..c4684c918 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RegexPatternFileListFilter.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.util.regex.Pattern; +import org.springframework.integration.file.filters.AbstractRegexPatternFileListFilter; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * Implementation of {@link AbstractRegexPatternFileListFilter} for Amazon S3. 
+ * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3RegexPatternFileListFilter extends AbstractRegexPatternFileListFilter { + + public S3RegexPatternFileListFilter(String pattern) { + super(pattern); + } + + public S3RegexPatternFileListFilter(Pattern pattern) { + super(pattern); + } + + @Override + protected String getFilename(S3Object file) { + return (file != null) ? file.key() : null; + } + + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java new file mode 100644 index 000000000..ece9fb05f --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3RemoteFileTemplate.java @@ -0,0 +1,67 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.awspring.cloud.s3.integration; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.springframework.integration.file.remote.ClientCallback; +import org.springframework.integration.file.remote.RemoteFileTemplate; +import org.springframework.integration.file.remote.session.SessionFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An Amazon S3 specific {@link RemoteFileTemplate} extension. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3RemoteFileTemplate extends RemoteFileTemplate { + + public S3RemoteFileTemplate() { + this(new S3SessionFactory()); + } + + public S3RemoteFileTemplate(S3Client amazonS3) { + this(new S3SessionFactory(amazonS3)); + } + + /** + * Construct a {@link RemoteFileTemplate} with the supplied session factory. + * @param sessionFactory the session factory. + */ + public S3RemoteFileTemplate(SessionFactory sessionFactory) { + super(sessionFactory); + } + + @SuppressWarnings("unchecked") + @Override + public T executeWithClient(final ClientCallback callback) { + return callback.doWithClient((C) this.sessionFactory.getSession().getClientInstance()); + } + + @Override + public boolean exists(final String path) { + try { + return this.sessionFactory.getSession().exists(path); + } + catch (IOException ex) { + throw new UncheckedIOException("Failed to check the path " + path, ex); + } + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java new file mode 100644 index 000000000..2ee9a87f8 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3Session.java @@ -0,0 +1,261 @@ +/* + * Copyright 2013-2025 the original author or authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.integration.file.remote.session.Session; +import org.springframework.util.Assert; +import org.springframework.util.StreamUtils; +import org.springframework.util.StringUtils; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.utils.IoUtils; + +/** + * An Amazon S3 {@link Session} implementation. 
+ * + * @author Artem Bilan + * @author Jim Krygowski + * @author Anwar Chirakkattil + * @author Xavier François + * @author Rogerio Lino + * + * @since 4.0 + */ +public class S3Session implements Session { + + private final S3Client amazonS3; + + private String endpoint; + + public S3Session(S3Client amazonS3) { + Assert.notNull(amazonS3, "'amazonS3' must not be null."); + this.amazonS3 = amazonS3; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + @Override + public S3Object[] list(String path) { + String[] bucketPrefix = splitPathToBucketAndKey(path, false); + + ListObjectsRequest.Builder listObjectsRequest = ListObjectsRequest.builder().bucket(bucketPrefix[0]); + if (bucketPrefix.length > 1) { + listObjectsRequest.prefix(bucketPrefix[1]); + } + + /* + * For listing objects, Amazon S3 returns up to 1,000 keys in the response. If you have more than 1,000 keys in + * your bucket, the response will be truncated. You should always check if the response is truncated. + */ + ListObjectsResponse objectListing; + List objectSummaries = new ArrayList<>(); + do { + objectListing = this.amazonS3.listObjects(listObjectsRequest.build()); + List contents = objectListing.contents(); + objectSummaries.addAll(contents); + if (Boolean.TRUE.equals(objectListing.isTruncated())) { + listObjectsRequest.marker(contents.get(contents.size() - 1).key()); + } + } + while (Boolean.TRUE.equals(objectListing.isTruncated())); + + return objectSummaries.toArray(new S3Object[0]); + } + + @Override + public String[] listNames(String path) { + String[] bucketPrefix = splitPathToBucketAndKey(path, false); + + ListObjectsRequest.Builder listObjectsRequest = ListObjectsRequest.builder().bucket(bucketPrefix[0]); + if (bucketPrefix.length > 1) { + listObjectsRequest.prefix(bucketPrefix[1]); + } + + /* + * For listing objects, Amazon S3 returns up to 1,000 keys in the response. If you have more than 1,000 keys in + * your bucket, the response will be truncated. 
You should always check if the response is truncated. + */ + ListObjectsResponse objectListing; + List names = new ArrayList<>(); + do { + objectListing = this.amazonS3.listObjects(listObjectsRequest.build()); + List contents = objectListing.contents(); + for (S3Object objectSummary : contents) { + names.add(objectSummary.key()); + } + if (Boolean.TRUE.equals(objectListing.isTruncated())) { + listObjectsRequest.marker(contents.get(contents.size() - 1).key()); + } + } + while (Boolean.TRUE.equals(objectListing.isTruncated())); + + return names.toArray(new String[0]); + } + + @Override + public boolean remove(String path) { + String[] bucketKey = splitPathToBucketAndKey(path, true); + this.amazonS3.deleteObject(request -> request.bucket(bucketKey[0]).key(bucketKey[1])); + return true; + } + + @Override + public void rename(String pathFrom, String pathTo) { + String[] bucketKeyFrom = splitPathToBucketAndKey(pathFrom, true); + String[] bucketKeyTo = splitPathToBucketAndKey(pathTo, true); + CopyObjectRequest.Builder copyRequest = CopyObjectRequest.builder().sourceBucket(bucketKeyFrom[0]) + .sourceKey(bucketKeyFrom[1]).destinationBucket(bucketKeyTo[0]).destinationKey(bucketKeyTo[1]); + this.amazonS3.copyObject(copyRequest.build()); + + // Delete the source + this.amazonS3.deleteObject(request -> request.bucket(bucketKeyFrom[0]).key(bucketKeyFrom[1])); + } + + @Override + public void read(String source, OutputStream outputStream) throws IOException { + String[] bucketKey = splitPathToBucketAndKey(source, true); + GetObjectRequest.Builder getObjectRequest = GetObjectRequest.builder().bucket(bucketKey[0]).key(bucketKey[1]); + try (InputStream inputStream = this.amazonS3.getObject(getObjectRequest.build())) { + StreamUtils.copy(inputStream, outputStream); + } + } + + @Override + public void write(InputStream inputStream, String destination) { + Assert.notNull(inputStream, "'inputStream' must not be null."); + String[] bucketKey = splitPathToBucketAndKey(destination, 
true); + PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder().bucket(bucketKey[0]).key(bucketKey[1]); + try { + this.amazonS3.putObject(putObjectRequest.build(), RequestBody.fromBytes(IoUtils.toByteArray(inputStream))); + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @Override + public void append(InputStream inputStream, String destination) { + throw new UnsupportedOperationException("The 'append' operation isn't supported by the Amazon S3 protocol."); + } + + @Override + public boolean mkdir(String directory) { + this.amazonS3.createBucket(request -> request.bucket(directory)); + return true; + } + + @Override + public boolean rmdir(String directory) { + this.amazonS3.deleteBucket(request -> request.bucket(directory)); + return true; + } + + @Override + public boolean exists(String path) { + String[] bucketKey = splitPathToBucketAndKey(path, true); + try { + this.amazonS3.getObjectAttributes(request -> request.bucket(bucketKey[0]).key(bucketKey[1])); + } + catch (NoSuchKeyException ex) { + return false; + } + return true; + } + + @Override + public InputStream readRaw(String source) { + String[] bucketKey = splitPathToBucketAndKey(source, true); + return this.amazonS3.getObject(request -> request.bucket(bucketKey[0]).key(bucketKey[1])); + } + + @Override + public void close() { + // No-op. 
This session is just a direct wrapper for the S3Client + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean finalizeRaw() { + return true; + } + + @Override + public Object getClientInstance() { + return this.amazonS3; + } + + @Override + public String getHostPort() { + if (this.endpoint != null) { + return this.endpoint; + } + else { + synchronized (this) { + if (this.endpoint != null) { + return this.endpoint; + } + DirectFieldAccessor dfa = new DirectFieldAccessor(this.amazonS3.utilities()); + Region region = (Region) dfa.getPropertyValue("region"); + this.endpoint = String.format("%s.%s:%d", S3Client.SERVICE_NAME, region, 443); + return this.endpoint; + } + } + } + + public String normalizeBucketName(String path) { + return splitPathToBucketAndKey(path, false)[0]; + } + + private String[] splitPathToBucketAndKey(String path, boolean requireKey) { + Assert.hasText(path, "'path' must not be empty String."); + + path = StringUtils.trimLeadingCharacter(path, '/'); + + String[] bucketKey = path.split("/", 2); + + if (requireKey) { + Assert.state(bucketKey.length == 2, "'path' must in pattern [BUCKET/KEY]."); + Assert.state(bucketKey[0].length() >= 3, "S3 bucket name must be at least 3 characters long."); + } + else { + Assert.state(bucketKey.length > 0 && bucketKey[0].length() >= 3, + "S3 bucket name must be at least 3 characters long."); + } + return bucketKey; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java new file mode 100644 index 000000000..f54356c9d --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SessionFactory.java @@ -0,0 +1,65 @@ +/* + * Copyright 2013-2025 the original author or authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import org.springframework.integration.file.remote.session.SessionFactory; +import org.springframework.integration.file.remote.session.SharedSessionCapable; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * An Amazon S3 specific {@link SessionFactory} implementation. Also, this class implements {@link SharedSessionCapable} + * around the single instance, since the {@link S3Session} is simple thread-safe wrapper for the {@link S3Client}. + * + * @author Artem Bilan + * @author Xavier François + * + * @since 4.0 + */ +public class S3SessionFactory implements SessionFactory, SharedSessionCapable { + + private final S3Session s3Session; + + public S3SessionFactory() { + this(S3Client.create()); + } + + public S3SessionFactory(S3Client amazonS3) { + Assert.notNull(amazonS3, "'amazonS3' must not be null."); + this.s3Session = new S3Session(amazonS3); + } + + @Override + public S3Session getSession() { + return this.s3Session; + } + + @Override + public boolean isSharedSession() { + return true; + } + + @Override + public void resetSharedSession() { + // No-op. The S3Session is stateless and can be used concurrently. 
+ } + + public void setEndpoint(String endpoint) { + this.s3Session.setEndpoint(endpoint); + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java new file mode 100644 index 000000000..813b41294 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3SimplePatternFileListFilter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import org.springframework.integration.file.filters.AbstractSimplePatternFileListFilter; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * Implementation of {@link AbstractSimplePatternFileListFilter} for Amazon S3. + * + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3SimplePatternFileListFilter extends AbstractSimplePatternFileListFilter { + + public S3SimplePatternFileListFilter(String pattern) { + super(pattern); + } + + @Override + protected String getFilename(S3Object file) { + return (file != null) ? 
file.key() : null; + } + + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java new file mode 100644 index 000000000..8942f50e9 --- /dev/null +++ b/spring-cloud-aws-s3/src/main/java/io/awspring/cloud/s3/integration/S3StreamingMessageSource.java @@ -0,0 +1,73 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.springframework.integration.file.remote.AbstractFileInfo; +import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; +import org.springframework.integration.file.remote.RemoteFileTemplate; +import org.springframework.integration.metadata.SimpleMetadataStore; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * A {@link AbstractRemoteFileStreamingMessageSource} implementation for the Amazon S3. 
+ * + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 4.0 + */ +public class S3StreamingMessageSource extends AbstractRemoteFileStreamingMessageSource { + + public S3StreamingMessageSource(RemoteFileTemplate template) { + super(template, null); + } + + @SuppressWarnings("this-escape") + public S3StreamingMessageSource(RemoteFileTemplate template, Comparator comparator) { + super(template, comparator); + doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3StreamingMessageSource")); + } + + @Override + protected List> asFileInfoList(Collection collection) { + return collection.stream().map(S3FileInfo::new).collect(Collectors.toList()); + } + + @Override + public String getComponentType() { + return "aws:s3-inbound-streaming-channel-adapter"; + } + + @Override + protected AbstractFileInfo poll() { + AbstractFileInfo file = super.poll(); + if (file != null) { + S3Session s3Session = (S3Session) getRemoteFileTemplate().getSession(); + file.setRemoteDirectory(s3Session.normalizeBucketName(file.getRemoteDirectory())); + } + return file; + } + + @Override + protected boolean isDirectory(S3Object file) { + return false; + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java new file mode 100644 index 000000000..637c0397b --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/LocalstackContainerTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3; + +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * The base contract for JUnit tests based on the container for Localstack. The Testcontainers 'reuse' option must be + * disabled, so Ryuk container is started and will clean all the containers up from this test suite after JVM exit. + * Since the Localstack container instance is shared via static property, it is going to be started only once per JVM; + * therefore, the target Docker container is reused automatically. 
+ * + * @author Artem Bilan + * + * @since 4.0 + */ +@Testcontainers(disabledWithoutDocker = true) +public interface LocalstackContainerTest { + + LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer( + DockerImageName.parse("localstack/localstack:4.4.0")).withEnv("S3_SKIP_SIGNATURE_VALIDATION", "0"); + + @BeforeAll + static void startContainer() { + LOCAL_STACK_CONTAINER.start(); + } + + static S3AsyncClient s3AsyncClient() { + return S3AsyncClient.crtBuilder().region(Region.of(LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(credentialsProvider()).endpointOverride(LOCAL_STACK_CONTAINER.getEndpoint()) + .build(); + } + + static S3Client s3Client() { + return applyAwsClientOptions(S3Client.builder()); + } + + static AwsCredentialsProvider credentialsProvider() { + return StaticCredentialsProvider.create( + AwsBasicCredentials.create(LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey())); + } + + private static , T> T applyAwsClientOptions(B clientBuilder) { + return clientBuilder.region(Region.of(LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(credentialsProvider()).endpointOverride(LOCAL_STACK_CONTAINER.getEndpoint()) + .build(); + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java index f1147df0b..ad0d7b15d 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3PathMatchingResourcePatternResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2023 the original author or authors. + * Copyright 2013-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -24,35 +24,23 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; -@Testcontainers -class S3PathMatchingResourcePatternResolverTests { - private static final RequestBody requestBody = RequestBody.fromString("test-file-content"); +/** + * @author Tobias Soloschenko + * @author Artem Bilan + */ +class S3PathMatchingResourcePatternResolverTests implements LocalstackContainerTest { - @Container - static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")); + private static final RequestBody requestBody = RequestBody.fromString("test-file-content"); private static ResourcePatternResolver resourceLoader; @BeforeAll static void beforeAll() { - // region and credentials are irrelevant for test, but must be added to make - // test work on environments without AWS cli configured - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); - S3Client client = S3Client.builder().region(Region.of(localstack.getRegion())) - .credentialsProvider(credentialsProvider).endpointOverride(localstack.getEndpoint()).build(); + S3Client client = LocalstackContainerTest.s3Client(); // prepare 
buckets and objects for tests client.createBucket(request -> request.bucket("my-bucket")); @@ -134,4 +122,5 @@ private static ResourcePatternResolver getResourceLoader(S3Client s3Client) { loader.addProtocolResolver(new S3ProtocolResolver(s3Client)); return new S3PathMatchingResourcePatternResolver(s3Client, new PathMatchingResourcePatternResolver(loader)); } + } diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java index 74bcd77aa..80055b2fc 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3ResourceIntegrationTests.java @@ -40,15 +40,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.shaded.com.google.common.io.Files; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -62,17 +55,16 @@ * * @author Maciej Walkowiak * @author Anton Perez + * @author Artem Bilan */ -@Testcontainers -class S3ResourceIntegrationTests { - private static final int DEFAULT_PART_SIZE = 5242880; +class S3ResourceIntegrationTests implements LocalstackContainerTest { - @Container - static LocalStackContainer localstack = new 
LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")); + private static final int DEFAULT_PART_SIZE = 5242880; private static S3Client client; + private static S3AsyncClient asyncClient; + private static S3TransferManager s3TransferManager; // Required for the @TestAvailableOutputStreamProviders annotation @@ -85,14 +77,8 @@ private static Stream availableS3OutputStreamProviders() @BeforeAll static void beforeAll() { - // region and credentials are irrelevant for test, but must be added to make - // test work on environments without AWS cli configured - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); - asyncClient = S3AsyncClient.builder().region(Region.of(localstack.getRegion())) - .credentialsProvider(credentialsProvider).endpointOverride(localstack.getEndpoint()).build(); - client = S3Client.builder().region(Region.of(localstack.getRegion())).credentialsProvider(credentialsProvider) - .endpointOverride(localstack.getEndpoint()).build(); + asyncClient = LocalstackContainerTest.s3AsyncClient(); + client = LocalstackContainerTest.s3Client(); s3TransferManager = S3TransferManager.builder().s3Client(asyncClient).build(); client.createBucket(request -> request.bucket("first-bucket")); } @@ -148,8 +134,8 @@ void contentLengthThrowsWhenResourceDoesNotExist(S3OutputStreamProvider s3Output @TestAvailableOutputStreamProviders void returnsResourceUrl(S3OutputStreamProvider s3OutputStreamProvider) throws IOException { S3Resource resource = s3Resource("s3://first-bucket/a-file.txt", s3OutputStreamProvider); - assertThat(resource.getURL().toString()) - .isEqualTo("http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/a-file.txt"); + assertThat(resource.getURL().toString()).isEqualTo("http://127.0.0.1:" + + LocalstackContainerTest.LOCAL_STACK_CONTAINER.getFirstMappedPort() + "/first-bucket/a-file.txt"); } 
@TestAvailableOutputStreamProviders @@ -162,10 +148,12 @@ void returnsEmptyUrlToBucketWhenObjectIsEmpty(S3OutputStreamProvider s3OutputStr void returnsEncodedResourceUrlAndUri(S3OutputStreamProvider s3OutputStreamProvider) throws IOException, URISyntaxException { S3Resource resource = s3Resource("s3://first-bucket/some/[objectName]", s3OutputStreamProvider); - assertThat(resource.getURL().toString()).isEqualTo( - "http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/some/%5BobjectName%5D"); + assertThat(resource.getURL().toString()) + .isEqualTo("http://127.0.0.1:" + LocalstackContainerTest.LOCAL_STACK_CONTAINER.getFirstMappedPort() + + "/first-bucket/some/%5BobjectName%5D"); assertThat(resource.getURI()).isEqualTo( - new URI("http://127.0.0.1:" + localstack.getFirstMappedPort() + "/first-bucket/some/%5BobjectName%5D")); + new URI("http://127.0.0.1:" + LocalstackContainerTest.LOCAL_STACK_CONTAINER.getFirstMappedPort() + + "/first-bucket/some/%5BobjectName%5D")); } @TestAvailableOutputStreamProviders @@ -292,6 +280,7 @@ private String retrieveContent(S3Resource resource) throws IOException { } static class Person { + private String name; public String getName() { @@ -306,6 +295,7 @@ public void setName(String name) { public String toString() { return "Person{" + "name='" + name + '\'' + '}'; } + } @Target(ElementType.METHOD) @@ -313,5 +303,7 @@ public String toString() { @ParameterizedTest @MethodSource("availableS3OutputStreamProviders") @interface TestAvailableOutputStreamProviders { + } + } diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java index cde964ad2..f3b127b07 100644 --- a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/S3TemplateIntegrationTests.java @@ -42,12 +42,6 @@ import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.util.StreamUtils; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.HttpStatusCode; @@ -67,31 +61,24 @@ * @author Yuki Yoshida * @author Ziemowit Stolarczyk * @author Hardik Singh Behl + * @author Artem Bilan */ -@Testcontainers -class S3TemplateIntegrationTests { +class S3TemplateIntegrationTests implements LocalstackContainerTest { private static final String BUCKET_NAME = "test-bucket"; - @Container - static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")).withEnv("S3_SKIP_SIGNATURE_VALIDATION", "0"); - private static S3Client client; private static S3Presigner presigner; + private S3Template s3Template; @BeforeAll static void beforeAll() { - // region and credentials are irrelevant for test, but must be added to make - // test work on environments without AWS cli configured - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())); - client = S3Client.builder().region(Region.of(localstack.getRegion())).credentialsProvider(credentialsProvider) - .endpointOverride(localstack.getEndpoint()).build(); - presigner = S3Presigner.builder().region(Region.of(localstack.getRegion())) - .credentialsProvider(credentialsProvider).endpointOverride(localstack.getEndpoint()).build(); + client = LocalstackContainerTest.s3Client(); + presigner = 
S3Presigner.builder().region(Region.of(LocalstackContainerTest.LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(LocalstackContainerTest.credentialsProvider()) + .endpointOverride(LocalstackContainerTest.LOCAL_STACK_CONTAINER.getEndpoint()).build(); } @BeforeEach @@ -363,7 +350,9 @@ private String calculateContentMD5(String content) { } static class Person { + private String firstName; + private String lastName; public Person() { @@ -389,6 +378,7 @@ public String getFirstName() { public void setFirstName(String firstName) { this.firstName = firstName; } + } } diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java new file mode 100644 index 000000000..c5e4bbdbc --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3InboundChannelAdapterTests.java @@ -0,0 +1,149 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.awspring.cloud.s3.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.s3.LocalstackContainerTest; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Path; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.file.FileHeaders; +import org.springframework.integration.file.filters.AcceptOnceFileListFilter; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.FileCopyUtils; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * @author Artem Bilan + * @author Jim Krygowski + * @author Xavier François + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class S3InboundChannelAdapterTests implements LocalstackContainerTest { + + private static final ExpressionParser PARSER = new SpelExpressionParser(); + + private static final String S3_BUCKET = "s3-bucket"; + + private static S3Client S3; + + @TempDir + static Path TEMPORARY_FOLDER; + + private static File 
LOCAL_FOLDER; + + @Autowired + private PollableChannel s3FilesChannel; + + @BeforeAll + static void setup() { + S3 = LocalstackContainerTest.s3Client(); + S3.createBucket(request -> request.bucket(S3_BUCKET)); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/a.test"), RequestBody.fromString("Hello")); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/b.test"), RequestBody.fromString("Bye")); + + LOCAL_FOLDER = TEMPORARY_FOLDER.resolve("local").toFile(); + } + + @Test + void s3InboundChannelAdapter() throws IOException { + Message message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(File.class); + File localFile = (File) message.getPayload(); + assertThat(localFile).hasName("A.TEST.a"); + + String content = FileCopyUtils.copyToString(new FileReader(localFile)); + assertThat(content).isEqualTo("Hello"); + + message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(File.class); + localFile = (File) message.getPayload(); + assertThat(localFile).hasName("B.TEST.a"); + + content = FileCopyUtils.copyToString(new FileReader(localFile)); + assertThat(content).isEqualTo("Bye"); + + assertThat(message.getHeaders()).containsKeys(FileHeaders.REMOTE_DIRECTORY, FileHeaders.REMOTE_HOST_PORT, + FileHeaders.REMOTE_FILE); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public S3SessionFactory s3SessionFactory() { + S3SessionFactory s3SessionFactory = new S3SessionFactory(S3); + s3SessionFactory.setEndpoint("s3-url.com:8000"); + return s3SessionFactory; + } + + @Bean + public S3InboundFileSynchronizer s3InboundFileSynchronizer() { + S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory()); + synchronizer.setDeleteRemoteFiles(true); + synchronizer.setPreserveTimestamp(true); + synchronizer.setRemoteDirectory(S3_BUCKET); + 
synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$")); + Expression expression = PARSER.parseExpression( + "(#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this).toUpperCase() + '.a'"); + synchronizer.setLocalFilenameGeneratorExpression(expression); + return synchronizer; + } + + @Bean + @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100")) + public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() { + S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource( + s3InboundFileSynchronizer()); + messageSource.setAutoCreateLocalDirectory(true); + messageSource.setLocalDirectory(LOCAL_FOLDER); + messageSource.setLocalFilter(new AcceptOnceFileListFilter<>()); + return messageSource; + } + + @Bean + public PollableChannel s3FilesChannel() { + return new QueueChannel(); + } + + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java new file mode 100644 index 000000000..ab3bb582f --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3MessageHandlerTests.java @@ -0,0 +1,338 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.awspring.cloud.s3.integration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.awspring.cloud.s3.LocalstackContainerTest; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.FileCopyUtils; +import 
software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.transfer.s3.model.Copy; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.StringInputStream; + +/** + * @author Artem Bilan + * @author John Logan + * @author Jim Krygowski + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class S3MessageHandlerTests implements LocalstackContainerTest { + + static S3AsyncClient S3; + + // define the bucket and file names used throughout the test + static final String S3_BUCKET_NAME = "my-bucket"; + + static final String S3_FILE_KEY_BAR = "subdir/bar"; + + static final String S3_FILE_KEY_FOO = "subdir/foo"; + + static final SpelExpressionParser PARSER = new SpelExpressionParser(); + + @TempDir + static Path temporaryFolder; + + @Autowired + ContextConfiguration contextConfiguration; + + @Autowired + MessageChannel s3SendChannel; + + @Autowired + MessageChannel s3ProcessChannel; + + @Autowired + PollableChannel s3ReplyChannel; + + @Autowired + @Qualifier("s3MessageHandler") + S3MessageHandler s3MessageHandler; + + @BeforeAll + static void setup() { + S3 = LocalstackContainerTest.s3AsyncClient(); + S3.createBucket(request -> request.bucket(S3_BUCKET_NAME)).join(); + } + + @BeforeEach + void prepareBucket() { + S3.listObjects(request -> request.bucket(S3_BUCKET_NAME)).thenCompose(result -> { + if (result.hasContents()) { + return S3.deleteObjects(request -> request.bucket(S3_BUCKET_NAME) + .delete(delete -> 
delete.objects(result.contents().stream().map(S3Object::key) + .map(key -> ObjectIdentifier.builder().key(key).build()).toList()))); + } + else { + return CompletableFuture.completedFuture(null); + } + }).join(); + + this.contextConfiguration.transferCompletedLatch = new CountDownLatch(1); + } + + @Test + void uploadFile() throws IOException, InterruptedException { + File file = new File(temporaryFolder.toFile(), "foo.mp3"); + file.createNewFile(); + byte[] testData = "test data".getBytes(); + FileCopyUtils.copy(testData, file); + Message message = MessageBuilder.withPayload(file) + .setHeader("s3Command", S3MessageHandler.Command.UPLOAD.name()).build(); + + this.s3SendChannel.send(message); + assertThat(this.contextConfiguration.transferCompletedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile1"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket(S3_BUCKET_NAME).key("foo.mp3"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(testData.length); + assertThat(getObjectResponse.contentType()).isEqualTo("audio/mpeg"); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(testData); + } + + @Test + void uploadInputStream() throws IOException, InterruptedException { + Expression actualKeyExpression = TestUtils.getPropertyValue(this.s3MessageHandler, "keyExpression", + Expression.class); + + this.s3MessageHandler.setKeyExpression(null); + + String testData = "a"; + + InputStream payload = new StringInputStream(testData); + Message message = MessageBuilder.withPayload(payload) + .setHeader("s3Command", S3MessageHandler.Command.UPLOAD.name()).setHeader("key", "myStream").build(); + + assertThatThrownBy(() -> this.s3SendChannel.send(message)) + .hasCauseExactlyInstanceOf(IllegalStateException.class) + .hasStackTraceContaining("Specify a 'keyExpression' for non-java.io.File payloads"); + + 
this.s3MessageHandler.setKeyExpression(actualKeyExpression); + + this.s3SendChannel.send(message); + + assertThat(this.contextConfiguration.transferCompletedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile2"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket(S3_BUCKET_NAME).key("myStream"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(testData.length()); + assertThat(getObjectResponse.contentType()).isEqualTo("application/json"); + assertThat(getObjectResponse.contentDisposition()).isEqualTo("test.json"); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(testData.getBytes()); + } + + @Test + void uploadByteArray() throws InterruptedException, IOException { + byte[] payload = "b".getBytes(StandardCharsets.UTF_8); + Message message = MessageBuilder.withPayload(payload) + .setHeader("s3Command", S3MessageHandler.Command.UPLOAD.name()).setHeader("key", "myStream").build(); + + this.s3SendChannel.send(message); + + assertThat(this.contextConfiguration.transferCompletedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile3"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket(S3_BUCKET_NAME).key("myStream"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(payload.length); + assertThat(getObjectResponse.contentType()).isEqualTo("application/json"); + assertThat(getObjectResponse.contentDisposition()).isEqualTo("test.json"); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(payload); + } + + @Test + void downloadDirectory() throws IOException { + CompletableFuture bb = S3.putObject( + request -> request.bucket(S3_BUCKET_NAME).key(S3_FILE_KEY_BAR), AsyncRequestBody.fromString("bb")); + CompletableFuture f = S3.putObject( + request -> 
request.bucket(S3_BUCKET_NAME).key(S3_FILE_KEY_FOO), AsyncRequestBody.fromString("f")); + + CompletableFuture.allOf(bb, f).join(); + + File directoryForDownload = new File(temporaryFolder.toFile(), "myFolder"); + directoryForDownload.mkdir(); + Message message = MessageBuilder.withPayload(directoryForDownload) + .setHeader("s3Command", S3MessageHandler.Command.DOWNLOAD).build(); + + this.s3SendChannel.send(message); + + // get the "root" directory + File[] directoryArray = directoryForDownload.listFiles(); + assertThat(directoryArray).isNotNull(); + assertThat(directoryArray.length).isEqualTo(1); + + File subDirectory = directoryArray[0]; + assertThat(subDirectory).hasName("subdir"); + + // get the files we downloaded + File[] fileArray = subDirectory.listFiles(); + assertThat(fileArray).isNotNull(); + assertThat(fileArray.length).isEqualTo(2); + + List files = Arrays.asList(fileArray); + files.sort(Comparator.comparing(File::getName)); + + File file1 = files.get(0); + assertThat(file1).hasName(S3_FILE_KEY_BAR.split("/", 2)[1]); + assertThat(FileCopyUtils.copyToString(new FileReader(file1))).isEqualTo("bb"); + + File file2 = files.get(1); + assertThat(file2).hasName(S3_FILE_KEY_FOO.split("/", 2)[1]); + assertThat(FileCopyUtils.copyToString(new FileReader(file2))).isEqualTo("f"); + } + + @Test + void copy() throws IOException { + byte[] testData = "ff".getBytes(); + CompletableFuture mySource = S3.putObject( + request -> request.bucket(S3_BUCKET_NAME).key("mySource"), AsyncRequestBody.fromBytes(testData)); + CompletableFuture theirBucket = S3 + .createBucket(request -> request.bucket("their-bucket")); + + CompletableFuture.allOf(mySource, theirBucket).join(); + Map payload = new HashMap<>(); + payload.put("key", "mySource"); + payload.put("destination", "their-bucket"); + payload.put("destinationKey", "theirTarget"); + this.s3ProcessChannel.send(new GenericMessage<>(payload)); + + Message receive = this.s3ReplyChannel.receive(10000); + 
assertThat(receive).isNotNull(); + + assertThat(receive.getPayload()).isInstanceOf(Copy.class); + Copy copy = (Copy) receive.getPayload(); + + copy.completionFuture().join(); + + File outputFile = new File(temporaryFolder.toFile(), "outputFile4"); + + GetObjectResponse getObjectResponse = S3 + .getObject(request -> request.bucket("their-bucket").key("theirTarget"), outputFile.toPath()).join(); + + assertThat(getObjectResponse.contentLength()).isEqualTo(testData.length); + + assertThat(FileCopyUtils.copyToByteArray(outputFile)).isEqualTo(testData); + } + + @Configuration + @EnableIntegration + public static class ContextConfiguration { + + volatile CountDownLatch transferCompletedLatch; + + @Bean + @ServiceActivator(inputChannel = "s3SendChannel") + public MessageHandler s3MessageHandler() { + S3MessageHandler s3MessageHandler = new S3MessageHandler(S3, S3_BUCKET_NAME); + s3MessageHandler.setCommandExpression(PARSER.parseExpression("headers.s3Command")); + Expression keyExpression = PARSER.parseExpression( + "payload instanceof T(java.io.File) and !payload.directory ? 
payload.name : headers[key]"); + s3MessageHandler.setKeyExpression(keyExpression); + s3MessageHandler.setUploadMetadataProvider((metadata, message) -> { + if (message.getPayload() instanceof InputStream || message.getPayload() instanceof byte[]) { + metadata.contentLength(1L).contentType("application/json").contentDisposition("test.json") + .acl(ObjectCannedACL.PUBLIC_READ_WRITE); + } + }); + s3MessageHandler.setTransferListener(new TransferListener() { + + @Override + public void transferComplete(Context.TransferComplete context) { + transferCompletedLatch.countDown(); + } + + }); + return s3MessageHandler; + } + + @Bean + public PollableChannel s3ReplyChannel() { + return new QueueChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "s3ProcessChannel") + public MessageHandler s3ProcessMessageHandler() { + S3MessageHandler s3MessageHandler = new S3MessageHandler(S3, S3_BUCKET_NAME, true); + s3MessageHandler.setOutputChannel(s3ReplyChannel()); + s3MessageHandler.setCommand(S3MessageHandler.Command.COPY); + s3MessageHandler.setKeyExpression(PARSER.parseExpression("payload.key")); + s3MessageHandler.setDestinationBucketExpression(PARSER.parseExpression("payload.destination")); + s3MessageHandler.setDestinationKeyExpression(PARSER.parseExpression("payload.destinationKey")); + return s3MessageHandler; + } + + } + +} diff --git a/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java new file mode 100644 index 000000000..843a85f4c --- /dev/null +++ b/spring-cloud-aws-s3/src/test/java/io/awspring/cloud/s3/integration/S3StreamingChannelAdapterTests.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.s3.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.s3.LocalstackContainerTest; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Comparator; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.file.FileHeaders; +import org.springframework.integration.metadata.SimpleMetadataStore; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.S3Object; + +/** + * @author Christian Tzolov + * @author Artem Bilan + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class S3StreamingChannelAdapterTests implements LocalstackContainerTest { + + 
private static final String S3_BUCKET = "s3-bucket"; + + private static S3Client S3; + + @Autowired + private PollableChannel s3FilesChannel; + + @BeforeAll + static void setup() { + S3 = LocalstackContainerTest.s3Client(); + S3.createBucket(request -> request.bucket(S3_BUCKET)); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/a.test"), RequestBody.fromString("Hello")); + S3.putObject(request -> request.bucket(S3_BUCKET).key("subdir/b.test"), RequestBody.fromString("Bye")); + } + + @Test + void s3InboundStreamingChannelAdapter() throws IOException { + Message message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(InputStream.class); + assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("subdir/a.test"); + + InputStream inputStreamA = (InputStream) message.getPayload(); + assertThat(inputStreamA).isNotNull(); + assertThat(IOUtils.toString(inputStreamA, Charset.defaultCharset())).isEqualTo("Hello"); + inputStreamA.close(); + + message = this.s3FilesChannel.receive(10000); + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isInstanceOf(InputStream.class); + assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("subdir/b.test"); + assertThat(message.getHeaders()).containsKeys(FileHeaders.REMOTE_DIRECTORY, FileHeaders.REMOTE_HOST_PORT, + FileHeaders.REMOTE_FILE); + InputStream inputStreamB = (InputStream) message.getPayload(); + assertThat(IOUtils.toString(inputStreamB, Charset.defaultCharset())).isEqualTo("Bye"); + + assertThat(this.s3FilesChannel.receive(10)).isNull(); + + inputStreamB.close(); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100")) + public S3StreamingMessageSource s3InboundStreamingMessageSource() { + S3SessionFactory s3SessionFactory = new S3SessionFactory(S3); + S3RemoteFileTemplate 
s3FileTemplate = new S3RemoteFileTemplate(s3SessionFactory); + S3StreamingMessageSource s3MessageSource = new S3StreamingMessageSource(s3FileTemplate, + Comparator.comparing(S3Object::key)); + s3MessageSource.setRemoteDirectory("/" + S3_BUCKET + "/subdir"); + s3MessageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "streaming")); + + return s3MessageSource; + } + + @Bean + public PollableChannel s3FilesChannel() { + return new QueueChannel(); + } + + } + +} diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml new file mode 100644 index 000000000..4c8cea3ba --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3/pom.xml @@ -0,0 +1,35 @@ + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + Spring Cloud AWS Starter for Spring Integration with S3 + spring-cloud-aws-starter-integration-s3 + + + + io.awspring.cloud + spring-cloud-aws-starter-s3 + + + software.amazon.awssdk + s3-transfer-manager + + + software.amazon.awssdk + aws-crt-client + + + org.springframework.integration + spring-integration-file + + + +