Skip to content

Commit bd07b5d

Browse files
authored
Add Spring Integration for S3 support (#1476)
* Copy all the S3 functionality from the Spring Integration AWS project * Document the feature * Add `spring-cloud-aws-starter-integration-s3` convenient module * Introduce `LocalstackContainerTest` in the tests, similar to the one in DynamoDB module. Reuse this abstraction for all the S3 integration tests to avoid unnecessary LocalStack container restarts while we running all the tests in module
1 parent b3562aa commit bd07b5d

23 files changed

+2205
-66
lines changed

docs/src/main/asciidoc/s3.adoc

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,3 +641,149 @@ Sample IAM policy granting access to `spring-cloud-aws-demo` bucket:
641641
]
642642
}
643643
----
644+
645+
=== Spring Integration Support
646+
647+
Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon SQS.
648+
649+
The S3 Channel Adapters are based on the `S3Client` template and `S3TransferManager`.
650+
See their specification and Javadocs for more information.
651+
652+
The S3 Inbound Channel Adapter is represented by the `S3InboundFileSynchronizingMessageSource` and allows pulling S3 objects as files from the S3 bucket to the local directory for synchronization.
653+
This adapter is fully similar to the Inbound Channel Adapters in the FTP and SFTP Spring Integration modules.
654+
See more information in the https://docs.spring.io/spring-integration/reference/ftp.html[FTP/FTPS Adapters Chapter] for common options or `SessionFactory`, `RemoteFileTemplate` and `FileListFilter` abstractions.
655+
656+
The Java Configuration is:
657+
658+
[source,java]
659+
----
660+
@SpringBootApplication
661+
public static class MyConfiguration {
662+
663+
@Autowired
664+
private S3Client amazonS3;
665+
666+
@Bean
667+
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
668+
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.amazonS3);
669+
synchronizer.setDeleteRemoteFiles(true);
670+
synchronizer.setPreserveTimestamp(true);
671+
synchronizer.setRemoteDirectory(S3_BUCKET);
672+
synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$"));
673+
Expression expression = PARSER.parseExpression("#this.toUpperCase() + '.a'");
674+
synchronizer.setLocalFilenameGeneratorExpression(expression);
675+
return synchronizer;
676+
}
677+
678+
@Bean
679+
@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100"))
680+
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
681+
S3InboundFileSynchronizingMessageSource messageSource =
682+
new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
683+
messageSource.setAutoCreateLocalDirectory(true);
684+
messageSource.setLocalDirectory(LOCAL_FOLDER);
685+
messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
686+
return messageSource;
687+
}
688+
689+
@Bean
690+
public PollableChannel s3FilesChannel() {
691+
return new QueueChannel();
692+
}
693+
}
694+
----
695+
696+
With this config you receive messages with `java.io.File` `payload` from the `s3FilesChannel` after periodic synchronization of content from the Amazon S3 bucket into the local directory.
697+
698+
The `S3StreamingMessageSource` adapter produces messages with payloads of type `InputStream`, allowing S3 objects to be fetched without writing to the local file system.
699+
Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed.
700+
The session is provided in the closeableResource header (`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`).
701+
Standard framework components, such as the `FileSplitter` and `StreamTransformer` will automatically close the session.
702+
703+
The following Spring Boot application provides an example of configuring the S3 inbound streaming adapter using Java configuration:
704+
705+
[source,java]
706+
----
707+
@SpringBootApplication
708+
public class S3JavaApplication {
709+
710+
public static void main(String[] args) {
711+
new SpringApplicationBuilder(S3JavaApplication.class)
712+
.web(false)
713+
.run(args);
714+
}
715+
716+
@Autowired
717+
private S3Client amazonS3;
718+
719+
@Bean
720+
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
721+
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
722+
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
723+
messageSource.setRemoteDirectory(S3_BUCKET);
724+
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
725+
"streaming"));
726+
return messageSource;
727+
}
728+
729+
@Bean
730+
@Transformer(inputChannel = "s3Channel", outputChannel = "data")
731+
public org.springframework.integration.transformer.Transformer transformer() {
732+
return new StreamTransformer();
733+
}
734+
735+
@Bean
736+
public S3RemoteFileTemplate template() {
737+
return new S3RemoteFileTemplate(new S3SessionFactory(this.amazonS3));
738+
}
739+
740+
@Bean
741+
public PollableChannel s3Channel() {
742+
return new QueueChannel();
743+
}
744+
}
745+
----
746+
747+
> NOTE: Unlike the non-streaming inbound channel adapter, this adapter does not prevent duplicates by default.
748+
> If you do not delete the remote file and wish to prevent the file being processed again, you can configure an `S3PersistentFileListFilter` in the `filter` attribute.
749+
> If you don’t want to persist the state, an in-memory `SimpleMetadataStore` can be used with the filter.
750+
> If you wish to use a filename pattern (or regex) as well, use a `CompositeFileListFilter`.
751+
752+
The `S3MessageHandler` is an Outbound Channel Adapter and allows performing `upload`, `download` and `copy` (see `S3MessageHandler.Command` enum) operations in the provided S3 bucket.
753+
754+
The Java Configuration is:
755+
756+
[source,java]
757+
----
758+
@SpringBootApplication
759+
public static class MyConfiguration {
760+
761+
@Autowired
762+
private S3AsyncClient amazonS3;
763+
764+
@Bean
765+
@ServiceActivator(inputChannel = "s3UploadChannel")
766+
public MessageHandler s3MessageHandler() {
767+
return new S3MessageHandler(this.amazonS3, "my-bucket");
768+
}
769+
770+
}
771+
----
772+
773+
With this config you can send a message with the `java.io.File` as `payload` and the `transferManager.upload()` operation will be performed, where the file name is used as a S3 Object key.
774+
775+
See more information in the `S3MessageHandler` JavaDocs.
776+
777+
NOTE: The AWS SDK recommends to use `S3CrtAsyncClient` for `S3TransferManager`, therefore an `S3AsyncClient.crtBuilder()` has to be used to achieve respective upload and download requirements, what is done internally in the `S3MessageHandler` when `S3CrtAsyncClient`-based constructor is used.
778+
779+
The `S3MessageHandler` can be used as an Outbound Gateway with the `produceReply = true` constructor argument for Java Configuration.
780+
781+
The "request-reply" nature of this gateway is async and the `Transfer` result from the `TransferManager` operation is sent to the `outputChannel`, assuming the transfer progress observation in the downstream flow.
782+
783+
The `TransferListener` can be supplied to the `S3MessageHandler` to track the transfer progress per requests.
784+
785+
See more information in the `S3MessageHandler` Javadocs.
786+
787+
The Spring Integration dependency (as well as `s3-transfer-manager` and `aws-crt-client`) in the `spring-cloud-aws-s3` module are `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
788+
For convenience, a dedicated `spring-cloud-aws-starter-integration-s3` is provided managing all the required dependencies for Spring Integration support with Amazon S3.
789+

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-metrics</module>
5353
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store</module>
5454
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-s3</module>
55+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-s3</module>
5556
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager</module>
5657
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-ses</module>
5758
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-sns</module>

spring-cloud-aws-dependencies/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@
202202
<version>${project.version}</version>
203203
</dependency>
204204

205+
<dependency>
206+
<groupId>io.awspring.cloud</groupId>
207+
<artifactId>spring-cloud-aws-starter-integration-s3</artifactId>
208+
<version>${project.version}</version>
209+
</dependency>
210+
205211
<dependency>
206212
<groupId>io.awspring.cloud</groupId>
207213
<artifactId>spring-cloud-aws-starter-sns</artifactId>

spring-cloud-aws-s3/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040
<artifactId>aws-crt-client</artifactId>
4141
<optional>true</optional>
4242
</dependency>
43+
<dependency>
44+
<groupId>org.springframework.integration</groupId>
45+
<artifactId>spring-integration-file</artifactId>
46+
<optional>true</optional>
47+
</dependency>
4348
<dependency>
4449
<groupId>com.fasterxml.jackson.core</groupId>
4550
<artifactId>jackson-databind</artifactId>
@@ -55,6 +60,11 @@
5560
<artifactId>junit-jupiter</artifactId>
5661
<scope>test</scope>
5762
</dependency>
63+
<dependency>
64+
<groupId>org.springframework.integration</groupId>
65+
<artifactId>spring-integration-test</artifactId>
66+
<scope>test</scope>
67+
</dependency>
5868

5969
</dependencies>
6070
</project>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.s3.integration;
17+
18+
import java.util.Date;
19+
import org.springframework.integration.file.remote.AbstractFileInfo;
20+
import org.springframework.util.Assert;
21+
import software.amazon.awssdk.services.s3.model.S3Object;
22+
23+
/**
24+
* An Amazon S3 {@link org.springframework.integration.file.remote.FileInfo} implementation.
25+
*
26+
* @author Christian Tzolov
27+
* @author Artem Bilan
28+
*
29+
* @since 4.0
30+
*/
31+
public class S3FileInfo extends AbstractFileInfo<S3Object> {
32+
33+
private final S3Object s3Object;
34+
35+
public S3FileInfo(S3Object s3Object) {
36+
Assert.notNull(s3Object, "s3Object must not be null");
37+
this.s3Object = s3Object;
38+
}
39+
40+
@Override
41+
public boolean isDirectory() {
42+
return false;
43+
}
44+
45+
@Override
46+
public boolean isLink() {
47+
return false;
48+
}
49+
50+
@Override
51+
public long getSize() {
52+
return this.s3Object.size();
53+
}
54+
55+
@Override
56+
public long getModified() {
57+
return this.s3Object.lastModified().getEpochSecond();
58+
}
59+
60+
@Override
61+
public String getFilename() {
62+
return this.s3Object.key();
63+
}
64+
65+
/**
66+
* A permissions representation string. Throws {@link UnsupportedOperationException} to avoid extra
67+
* {@link software.amazon.awssdk.services.s3.S3Client#getObjectAcl} REST call. The target application may choose to
68+
* do that according to its logic.
69+
* @return the permissions representation string.
70+
*/
71+
@Override
72+
public String getPermissions() {
73+
throw new UnsupportedOperationException("Use [AmazonS3.getObjectAcl()] to obtain permissions.");
74+
}
75+
76+
@Override
77+
public S3Object getFileInfo() {
78+
return this.s3Object;
79+
}
80+
81+
@Override
82+
public String toString() {
83+
return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink() + ", Size=" + getSize()
84+
+ ", ModifiedTime=" + new Date(getModified()) + ", Filename=" + getFilename() + ", RemoteDirectory="
85+
+ getRemoteDirectory() + "]";
86+
}
87+
88+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.s3.integration;
17+
18+
import java.io.File;
19+
import java.io.IOException;
20+
import org.springframework.expression.EvaluationContext;
21+
import org.springframework.expression.common.LiteralExpression;
22+
import org.springframework.integration.file.remote.session.Session;
23+
import org.springframework.integration.file.remote.session.SessionFactory;
24+
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
25+
import org.springframework.integration.metadata.SimpleMetadataStore;
26+
import org.springframework.lang.Nullable;
27+
import software.amazon.awssdk.services.s3.S3Client;
28+
import software.amazon.awssdk.services.s3.model.S3Object;
29+
30+
/**
31+
* An implementation of {@link AbstractInboundFileSynchronizer} for Amazon S3.
32+
*
33+
* @author Artem Bilan
34+
*
35+
* @since 4.0
36+
*/
37+
public class S3InboundFileSynchronizer extends AbstractInboundFileSynchronizer<S3Object> {
38+
39+
public S3InboundFileSynchronizer() {
40+
this(new S3SessionFactory());
41+
}
42+
43+
public S3InboundFileSynchronizer(S3Client amazonS3) {
44+
this(new S3SessionFactory(amazonS3));
45+
}
46+
47+
/**
48+
* Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances.
49+
* @param sessionFactory The session factory.
50+
*/
51+
@SuppressWarnings("this-escape")
52+
public S3InboundFileSynchronizer(SessionFactory<S3Object> sessionFactory) {
53+
super(sessionFactory);
54+
doSetRemoteDirectoryExpression(new LiteralExpression(null));
55+
doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource"));
56+
}
57+
58+
@Override
59+
protected boolean isFile(S3Object file) {
60+
return true;
61+
}
62+
63+
@Override
64+
protected String getFilename(S3Object file) {
65+
return (file != null ? file.key() : null);
66+
}
67+
68+
@Override
69+
protected long getModified(S3Object file) {
70+
return file.lastModified().getEpochSecond();
71+
}
72+
73+
@Override
74+
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
75+
@Nullable EvaluationContext localFileEvaluationContext, S3Object remoteFile, File localDirectory,
76+
Session<S3Object> session) throws IOException {
77+
78+
return super.copyFileToLocalDirectory(((S3Session) session).normalizeBucketName(remoteDirectoryPath),
79+
localFileEvaluationContext, remoteFile, localDirectory, session);
80+
}
81+
82+
@Override
83+
protected String protocol() {
84+
return "s3";
85+
}
86+
87+
}

0 commit comments

Comments
 (0)