Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions docs/src/main/asciidoc/s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,149 @@ Sample IAM policy granting access to `spring-cloud-aws-demo` bucket:
]
}
----

=== Spring Integration Support

Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon SQS.

The S3 Channel Adapters are based on the `S3Client` template and `S3TransferManager`.
See their specification and Javadocs for more information.

The S3 Inbound Channel Adapter is represented by the `S3InboundFileSynchronizingMessageSource` and allows pulling S3 objects as files from the S3 bucket to the local directory for synchronization.
This adapter is fully similar to the Inbound Channel Adapters in the FTP and SFTP Spring Integration modules.
See more information in the https://docs.spring.io/spring-integration/reference/ftp.html[FTP/FTPS Adapters Chapter] for common options or `SessionFactory`, `RemoteFileTemplate` and `FileListFilter` abstractions.

The Java Configuration is:

[source,java]
----
@SpringBootApplication
public static class MyConfiguration {

@Autowired
private S3Client amazonS3;

@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.amazonS3);
synchronizer.setDeleteRemoteFiles(true);
synchronizer.setPreserveTimestamp(true);
synchronizer.setRemoteDirectory(S3_BUCKET);
synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$"));
Expression expression = PARSER.parseExpression("#this.toUpperCase() + '.a'");
synchronizer.setLocalFilenameGeneratorExpression(expression);
return synchronizer;
}

@Bean
@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
S3InboundFileSynchronizingMessageSource messageSource =
new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(LOCAL_FOLDER);
messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
return messageSource;
}

@Bean
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
}
----

With this config you receive messages with `java.io.File` `payload` from the `s3FilesChannel` after periodic synchronization of content from the Amazon S3 bucket into the local directory.

The `S3StreamingMessageSource` adapter produces messages with payloads of type `InputStream`, allowing S3 objects to be fetched without writing to the local file system.
Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed.
The session is provided in the closeableResource header (`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`).
Standard framework components, such as the `FileSplitter` and `StreamTransformer` will automatically close the session.

The following Spring Boot application provides an example of configuring the S3 inbound streaming adapter using Java configuration:

[source,java]
----
@SpringBootApplication
public class S3JavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(S3JavaApplication.class)
.web(false)
.run(args);
}

@Autowired
private S3Client amazonS3;

@Bean
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(S3_BUCKET);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}

@Bean
@Transformer(inputChannel = "s3Channel", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer();
}

@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new S3SessionFactory(this.amazonS3));
}

@Bean
public PollableChannel s3Channel() {
return new QueueChannel();
}
}
----

> NOTE: Unlike the non-streaming inbound channel adapter, this adapter does not prevent duplicates by default.
> If you do not delete the remote file and wish to prevent the file being processed again, you can configure an `S3PersistentFileListFilter` in the `filter` attribute.
> If you don’t want to persist the state, an in-memory `SimpleMetadataStore` can be used with the filter.
> If you wish to use a filename pattern (or regex) as well, use a `CompositeFileListFilter`.

The `S3MessageHandler` is an Outbound Channel Adapter and allows performing `upload`, `download` and `copy` (see `S3MessageHandler.Command` enum) operations in the provided S3 bucket.

The Java Configuration is:

[source,java]
----
@SpringBootApplication
public static class MyConfiguration {

@Autowired
private S3AsyncClient amazonS3;

@Bean
@ServiceActivator(inputChannel = "s3UploadChannel")
public MessageHandler s3MessageHandler() {
return new S3MessageHandler(this.amazonS3, "my-bucket");
}

}
----

With this config you can send a message with the `java.io.File` as `payload` and the `transferManager.upload()` operation will be performed, where the file name is used as a S3 Object key.

See more information in the `S3MessageHandler` JavaDocs.

NOTE: The AWS SDK recommends to use `S3CrtAsyncClient` for `S3TransferManager`, therefore an `S3AsyncClient.crtBuilder()` has to be used to achieve respective upload and download requirements, what is done internally in the `S3MessageHandler` when `S3CrtAsyncClient`-based constructor is used.

The `S3MessageHandler` can be used as an Outbound Gateway with the `produceReply = true` constructor argument for Java Configuration.

The "request-reply" nature of this gateway is async and the `Transfer` result from the `TransferManager` operation is sent to the `outputChannel`, assuming the transfer progress observation in the downstream flow.

The `TransferListener` can be supplied to the `S3MessageHandler` to track the transfer progress per requests.

See more information in the `S3MessageHandler` Javadocs.

The Spring Integration dependency (as well as `s3-transfer-manager` and `aws-crt-client`) in the `spring-cloud-aws-s3` module are `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
For convenience, a dedicated `spring-cloud-aws-starter-integration-s3` is provided managing all the required dependencies for Spring Integration support with Amazon S3.

1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-metrics</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-s3</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-ses</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-sns</module>
Expand Down
6 changes: 6 additions & 0 deletions spring-cloud-aws-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-integration-s3</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sns</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions spring-cloud-aws-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<artifactId>aws-crt-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand All @@ -55,6 +60,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.s3.integration;

import java.util.Date;
import org.springframework.integration.file.remote.AbstractFileInfo;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.s3.model.S3Object;

/**
* An Amazon S3 {@link org.springframework.integration.file.remote.FileInfo} implementation.
*
* @author Christian Tzolov
* @author Artem Bilan
*
* @since 4.0
*/
public class S3FileInfo extends AbstractFileInfo<S3Object> {

private final S3Object s3Object;

public S3FileInfo(S3Object s3Object) {
Assert.notNull(s3Object, "s3Object must not be null");
this.s3Object = s3Object;
}

@Override
public boolean isDirectory() {
return false;
}

@Override
public boolean isLink() {
return false;
}

@Override
public long getSize() {
return this.s3Object.size();
}

@Override
public long getModified() {
return this.s3Object.lastModified().getEpochSecond();
}

@Override
public String getFilename() {
return this.s3Object.key();
}

/**
* A permissions representation string. Throws {@link UnsupportedOperationException} to avoid extra
* {@link software.amazon.awssdk.services.s3.S3Client#getObjectAcl} REST call. The target application may choose to
* do that according to its logic.
* @return the permissions representation string.
*/
@Override
public String getPermissions() {
throw new UnsupportedOperationException("Use [AmazonS3.getObjectAcl()] to obtain permissions.");
}

@Override
public S3Object getFileInfo() {
return this.s3Object;
}

@Override
public String toString() {
return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink() + ", Size=" + getSize()
+ ", ModifiedTime=" + new Date(getModified()) + ", Filename=" + getFilename() + ", RemoteDirectory="
+ getRemoteDirectory() + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.s3.integration;

import java.io.File;
import java.io.IOException;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.lang.Nullable;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Object;

/**
* An implementation of {@link AbstractInboundFileSynchronizer} for Amazon S3.
*
* @author Artem Bilan
*
* @since 4.0
*/
public class S3InboundFileSynchronizer extends AbstractInboundFileSynchronizer<S3Object> {

public S3InboundFileSynchronizer() {
this(new S3SessionFactory());
}

public S3InboundFileSynchronizer(S3Client amazonS3) {
this(new S3SessionFactory(amazonS3));
}

/**
* Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances.
* @param sessionFactory The session factory.
*/
@SuppressWarnings("this-escape")
public S3InboundFileSynchronizer(SessionFactory<S3Object> sessionFactory) {
super(sessionFactory);
doSetRemoteDirectoryExpression(new LiteralExpression(null));
doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource"));
}

@Override
protected boolean isFile(S3Object file) {
return true;
}

@Override
protected String getFilename(S3Object file) {
return (file != null ? file.key() : null);
}

@Override
protected long getModified(S3Object file) {
return file.lastModified().getEpochSecond();
}

@Override
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
@Nullable EvaluationContext localFileEvaluationContext, S3Object remoteFile, File localDirectory,
Session<S3Object> session) throws IOException {

return super.copyFileToLocalDirectory(((S3Session) session).normalizeBucketName(remoteDirectoryPath),
localFileEvaluationContext, remoteFile, localDirectory, session);
}

@Override
protected String protocol() {
return "s3";
}

}
Loading