diff --git a/docs/src/main/asciidoc/index.adoc b/docs/src/main/asciidoc/index.adoc
index f2fb9d107..95df4eb93 100644
--- a/docs/src/main/asciidoc/index.adoc
+++ b/docs/src/main/asciidoc/index.adoc
@@ -140,6 +140,8 @@ include::core.adoc[]
include::dynamodb.adoc[]
+include::kinesis.adoc[]
+
include::s3.adoc[]
include::ses.adoc[]
diff --git a/docs/src/main/asciidoc/kinesis.adoc b/docs/src/main/asciidoc/kinesis.adoc
new file mode 100644
index 000000000..80f7919de
--- /dev/null
+++ b/docs/src/main/asciidoc/kinesis.adoc
@@ -0,0 +1,50 @@
+[#spring-cloud-aws-kinesis]
+== Kinesis Integration
+
+The https://aws.amazon.com/kinesis/[Kinesis] is a platform for streaming data on AWS, making it easy to load and analyze streaming data and also providing the ability for you to build custom streaming data applications for specialized needs.
+
+// TODO: auto-configuration
+
+=== Spring Integration Support
+
+Also, starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon Kinesis.
+
+The `KinesisMessageHandler` is an `AbstractMessageHandler` to perform put record(s) to the Kinesis stream.
+The stream, partition key (or explicit hash key) and sequence number can be determined against a request message via evaluation provided expressions or can be specified statically.
+They also can be specified as `KinesisHeaders.STREAM`, `KinesisHeaders.PARTITION_KEY` and `KinesisHeaders.SEQUENCE_NUMBER` respectively.
+
+The `KinesisMessageHandler` can be configured with the `outputChannel` for sending a `Message` on successful put operation.
+The payload is the original request and additional `KinesisHeaders.SHARD` and `KinesisHeaders.SEQUENCE_NUMBER` headers are populated from the `PutRecordResposne`.
+If the request payload is a `PutRecordsRequest`, the full `PutRecordsResponse` is populated in the `KinesisHeaders.SERVICE_RESULT` header instead.
+
+When an async failure is happened on the put operation, the `ErrorMessage` is sent to the `errorChannel` header or global one.
+The payload is an `MessageHandlingException`.
+
+The `payload` of request message can be:
+
+- `PutRecordsRequest` to perform `KinesisAsyncClient.putRecords`
+- `PutRecordRequest` to perform `KinesisAsyncClient.putRecord`
+- `ByteBuffer` to represent data of the `PutRecordRequest`
+- `byte[]` which is wrapped to the `ByteBuffer`
+- any other type that is converted to the `byte[]` by the provided `Converter`; the `SerializingConverter` is used by default.
+
+The Java Configuration for the message handler is:
+
+[source,java]
+----
+@Bean
+@ServiceActivator(inputChannel = "kinesisSendChannel")
+public MessageHandler kinesisMessageHandler(KinesisAsyncClient amazonKinesis,
+ MessageChannel channel) {
+ KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
+ kinesisMessageHandler.setPartitionKey("1");
+ kinesisMessageHandler.setOutputChannel(channel);
+ return kinesisMessageHandler;
+}
+----
+
+The Kinesis service does not provide a "headers"(attributes) abstraction, so the `KinesisMessageHandler` can be configured with the `OutboundMessageMapper` to embed message headers into the record data alongside the payload.
+See `EmbeddedHeadersJsonMessageMapper` implementation for more information.
+
+The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
+For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
diff --git a/pom.xml b/pom.xml
index 96538ab75..3da0bfbe6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,8 @@
spring-cloud-aws-sns
spring-cloud-aws-sqs
spring-cloud-aws-dynamodb
+ spring-cloud-aws-kinesis
+ spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis
spring-cloud-aws-s3
spring-cloud-aws-testcontainers
spring-cloud-aws-starters/spring-cloud-aws-starter
@@ -60,7 +62,7 @@
spring-cloud-aws-test
spring-cloud-aws-modulith
docs
-
+
diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml
index df111f992..0de60bd2f 100644
--- a/spring-cloud-aws-dependencies/pom.xml
+++ b/spring-cloud-aws-dependencies/pom.xml
@@ -25,6 +25,8 @@
2.31.0
2.32.28
+ 3.1.2
+ 1.0.4
2.0.5
3.3.5
1.6
@@ -111,6 +113,16 @@
spring-cloud-aws-dynamodb
${project.version}
+
+ io.awspring.cloud
+ spring-cloud-aws-kinesis
+ ${project.version}
+
+
+ io.awspring.cloud
+ spring-cloud-aws-starter-integration-kinesis
+ ${project.version}
+
io.awspring.cloud
@@ -237,6 +249,17 @@
jakarta.mail
${eclipse.jakarta.mail.version}
+
+ software.amazon.kinesis
+ amazon-kinesis-client
+ ${kcl.version}
+
+
+ software.amazon.kinesis
+ amazon-kinesis-producer
+ ${kpl.version}
+
+
diff --git a/spring-cloud-aws-kinesis/pom.xml b/spring-cloud-aws-kinesis/pom.xml
new file mode 100644
index 000000000..ffee8bff3
--- /dev/null
+++ b/spring-cloud-aws-kinesis/pom.xml
@@ -0,0 +1,55 @@
+
+
+ 4.0.0
+
+ io.awspring.cloud
+ spring-cloud-aws
+ 4.0.0-SNAPSHOT
+
+
+ spring-cloud-aws-kinesis
+ Spring Cloud AWS Kinesis Integration
+
+
+
+ software.amazon.awssdk
+ kinesis
+ true
+
+
+ software.amazon.kinesis
+ amazon-kinesis-client
+ true
+
+
+ software.amazon.kinesis
+ amazon-kinesis-producer
+ true
+
+
+ org.springframework.integration
+ spring-integration-core
+ true
+
+
+ org.testcontainers
+ localstack
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.springframework.integration
+ spring-integration-test
+ test
+
+
+
+
+
+
diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/ConvertingFromMessageConverter.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/ConvertingFromMessageConverter.java
new file mode 100644
index 000000000..78c73ca63
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/ConvertingFromMessageConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import org.springframework.core.convert.converter.Converter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConverter;
+
+/**
+ * A simple {@link MessageConverter} that delegates to a {@link Converter}.
+ *
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+record ConvertingFromMessageConverter(Converter delegate) implements MessageConverter {
+
+ @Override
+ public Object fromMessage(Message> message, Class> targetClass) {
+ return this.delegate.convert(message.getPayload());
+ }
+
+ @Override
+ public Message> toMessage(Object payload, MessageHeaders headers) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KinesisHeaders.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KinesisHeaders.java
new file mode 100644
index 000000000..ac4faff69
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KinesisHeaders.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+/**
+ * Constants for Kinesis message headers.
+ *
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+public final class KinesisHeaders {
+
+ /**
+ * Kinesis headers prefix to be used by all headers added by the framework.
+ */
+ public static final String PREFIX = "Kinesis_";
+
+ /**
+ * The {@value STREAM} header for sending data to Kinesis.
+ */
+ public static final String STREAM = PREFIX + "stream";
+
+ /**
+ * The {@value RECEIVED_STREAM} header for receiving data from Kinesis.
+ */
+ public static final String RECEIVED_STREAM = PREFIX + "receivedStream";
+
+ /**
+ * The {@value PARTITION_KEY} header for sending data to Kinesis.
+ */
+ public static final String PARTITION_KEY = PREFIX + "partitionKey";
+
+ /**
+ * The {@value SEQUENCE_NUMBER} header for sending data to Kinesis.
+ */
+ public static final String SEQUENCE_NUMBER = PREFIX + "sequenceNumber";
+
+ /**
+ * The {@value SHARD} header to represent Kinesis shardId.
+ */
+ public static final String SHARD = PREFIX + "shard";
+
+ /**
+ * The {@value SERVICE_RESULT} header represents a
+ * {@link software.amazon.awssdk.services.kinesis.model.KinesisResponse}.
+ */
+ public static final String SERVICE_RESULT = PREFIX + "serviceResult";
+
+ /**
+ * The {@value RECEIVED_PARTITION_KEY} header for receiving data from Kinesis.
+ */
+ public static final String RECEIVED_PARTITION_KEY = PREFIX + "receivedPartitionKey";
+
+ /**
+ * The {@value RECEIVED_SEQUENCE_NUMBER} header for receiving data from Kinesis.
+ */
+ public static final String RECEIVED_SEQUENCE_NUMBER = PREFIX + "receivedSequenceNumber";
+
+ /**
+ * The {@value CHECKPOINTER} header for checkpoint the shard sequenceNumber.
+ */
+ public static final String CHECKPOINTER = PREFIX + "checkpointer";
+
+ /**
+ * The {@value RAW_RECORD} header represents received Kinesis record(s).
+ */
+ public static final String RAW_RECORD = PREFIX + "rawRecord";
+
+ private KinesisHeaders() {
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandler.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandler.java
new file mode 100644
index 000000000..294c1afef
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandler.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.springframework.core.serializer.support.SerializingConverter;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.Expression;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.integration.MessageTimeoutException;
+import org.springframework.integration.expression.ExpressionUtils;
+import org.springframework.integration.expression.ValueExpression;
+import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.integration.handler.AbstractMessageProducingHandler;
+import org.springframework.integration.mapping.OutboundMessageMapper;
+import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHandlingException;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConversionException;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.KinesisRequest;
+import software.amazon.awssdk.services.kinesis.model.KinesisResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+
+/**
+ * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis {@code putRecord(s)}.
+ *
+ * @author Artem Bilan
+ * @author Jacob Severson
+ *
+ * @since 4.0
+ *
+ * @see KinesisAsyncClient#putRecord(PutRecordRequest)
+ * @see KinesisAsyncClient#putRecords(PutRecordsRequest)
+ */
+public class KinesisMessageHandler extends AbstractMessageProducingHandler {
+
+ private static final long DEFAULT_SEND_TIMEOUT = 10000;
+
+ private final KinesisAsyncClient amazonKinesis;
+
+ private MessageConverter messageConverter = new ConvertingFromMessageConverter(new SerializingConverter());
+
+ private Expression streamExpression;
+
+ private Expression partitionKeyExpression;
+
+ private Expression explicitHashKeyExpression;
+
+ private Expression sequenceNumberExpression;
+
+ private OutboundMessageMapper embeddedHeadersMapper;
+
+ private EvaluationContext evaluationContext;
+
+ private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
+
+ public KinesisMessageHandler(KinesisAsyncClient amazonKinesis) {
+ Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
+ this.amazonKinesis = amazonKinesis;
+ }
+
+ /**
+ * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record.
+ * @param messageConverter the {@link MessageConverter} to use.
+ */
+ public void setMessageConverter(MessageConverter messageConverter) {
+ Assert.notNull(messageConverter, "'messageConverter' must not be null.");
+ this.messageConverter = messageConverter;
+ }
+
+ public void setStream(String stream) {
+ setStreamExpression(new LiteralExpression(stream));
+ }
+
+ public void setStreamExpressionString(String streamExpression) {
+ setStreamExpression(EXPRESSION_PARSER.parseExpression(streamExpression));
+ }
+
+ public void setStreamExpression(Expression streamExpression) {
+ this.streamExpression = streamExpression;
+ }
+
+ public void setPartitionKey(String partitionKey) {
+ setPartitionKeyExpression(new LiteralExpression(partitionKey));
+ }
+
+ public void setPartitionKeyExpressionString(String partitionKeyExpression) {
+ setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(partitionKeyExpression));
+ }
+
+ public void setPartitionKeyExpression(Expression partitionKeyExpression) {
+ this.partitionKeyExpression = partitionKeyExpression;
+ }
+
+ public void setExplicitHashKey(String explicitHashKey) {
+ setExplicitHashKeyExpression(new LiteralExpression(explicitHashKey));
+ }
+
+ public void setExplicitHashKeyExpressionString(String explicitHashKeyExpression) {
+ setExplicitHashKeyExpression(EXPRESSION_PARSER.parseExpression(explicitHashKeyExpression));
+ }
+
+ public void setExplicitHashKeyExpression(Expression explicitHashKeyExpression) {
+ this.explicitHashKeyExpression = explicitHashKeyExpression;
+ }
+
+ public void setSequenceNumberExpressionString(String sequenceNumberExpression) {
+ setSequenceNumberExpression(EXPRESSION_PARSER.parseExpression(sequenceNumberExpression));
+ }
+
+ public void setSequenceNumberExpression(Expression sequenceNumberExpression) {
+ this.sequenceNumberExpression = sequenceNumberExpression;
+ }
+
+ /**
+ * Specify a {@link OutboundMessageMapper} for embedding message headers into the record data together with payload.
+ * @param embeddedHeadersMapper the {@link OutboundMessageMapper} to embed headers into the record data.
+ * @see org.springframework.integration.support.json.EmbeddedHeadersJsonMessageMapper
+ */
+ public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeadersMapper) {
+ this.embeddedHeadersMapper = embeddedHeadersMapper;
+ }
+
+ public void setSendTimeout(long sendTimeout) {
+ setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
+ }
+
+ public void setSendTimeoutExpressionString(String sendTimeoutExpression) {
+ setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression));
+ }
+
+ public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
+ Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
+ this.sendTimeoutExpression = sendTimeoutExpression;
+ }
+
+ @Override
+ protected void onInit() {
+ super.onInit();
+ this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
+ }
+
+ @Override
+ protected boolean shouldCopyRequestHeaders() {
+ return false;
+ }
+
+ @Override
+ protected void handleMessageInternal(Message> message) {
+ KinesisRequest request = messageToAwsRequest(message);
+ CompletableFuture> resultFuture = handleMessageToAws(request)
+ .handle((response, ex) -> handleResponse(message, request, response, ex));
+
+ if (isAsync()) {
+ sendOutputs(resultFuture, message);
+ return;
+ }
+
+ Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
+
+ try {
+ if (sendTimeout == null || sendTimeout < 0) {
+ resultFuture.get();
+ }
+ else {
+ resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ catch (TimeoutException te) {
+ throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te);
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(ex);
+ }
+ catch (ExecutionException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ protected Message> handleResponse(Message> message, KinesisRequest request, KinesisResponse response,
+ Throwable cause) {
+
+ if (cause != null) {
+ throw new MessageHandlingException(message, cause);
+ }
+ return getMessageBuilderFactory().fromMessage(message)
+ .copyHeadersIfAbsent(additionalOnSuccessHeaders(request, response)).build();
+ }
+
+ private KinesisRequest messageToAwsRequest(Message> message) {
+ if (message.getPayload() instanceof PutRecordsRequest putRecordsRequest) {
+ return putRecordsRequest;
+ }
+ else {
+ return message.getPayload() instanceof PutRecordRequest putRecordRequest ? putRecordRequest
+ : buildPutRecordRequest(message);
+ }
+ }
+
+ private PutRecordRequest buildPutRecordRequest(Message> message) {
+ MessageHeaders messageHeaders = message.getHeaders();
+ String stream = messageHeaders.get(KinesisHeaders.STREAM, String.class);
+ if (!StringUtils.hasText(stream) && this.streamExpression != null) {
+ stream = this.streamExpression.getValue(this.evaluationContext, message, String.class);
+ }
+ Assert.state(stream != null,
+ "'stream' must not be null for sending a Kinesis record. "
+ + "Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an "
+ + "'aws_stream' message header.");
+
+ String partitionKey = messageHeaders.get(KinesisHeaders.PARTITION_KEY, String.class);
+ if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
+ partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
+ }
+ Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. "
+ + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') or supply an "
+ + "'aws_partitionKey' message header.");
+
+ String explicitHashKey = (this.explicitHashKeyExpression != null
+ ? this.explicitHashKeyExpression.getValue(this.evaluationContext, message, String.class)
+ : null);
+
+ String sequenceNumber = messageHeaders.get(KinesisHeaders.SEQUENCE_NUMBER, String.class);
+ if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
+ sequenceNumber = this.sequenceNumberExpression.getValue(this.evaluationContext, message, String.class);
+ }
+
+ Object payload = message.getPayload();
+
+ SdkBytes data = null;
+
+ Message> messageToEmbed = null;
+
+ if (payload instanceof ByteBuffer byteBuffer) {
+ data = SdkBytes.fromByteBuffer(byteBuffer);
+ if (this.embeddedHeadersMapper != null) {
+ messageToEmbed = new MutableMessage<>(data.asByteArray(), messageHeaders);
+ }
+ }
+ else {
+ byte[] bytes = (byte[]) (payload instanceof byte[] ? payload
+ : this.messageConverter.fromMessage(message, byte[].class));
+ Assert.notNull(bytes, "payload cannot be null");
+ if (this.embeddedHeadersMapper != null) {
+ messageToEmbed = new MutableMessage<>(bytes, messageHeaders);
+ }
+ else {
+ data = SdkBytes.fromByteArray(bytes);
+ }
+ }
+
+ if (messageToEmbed != null) {
+ try {
+ byte[] bytes = this.embeddedHeadersMapper.fromMessage(messageToEmbed);
+ Assert.notNull(bytes, "payload cannot be null");
+ data = SdkBytes.fromByteArray(bytes);
+ }
+ catch (Exception ex) {
+ throw new MessageConversionException(message, "Cannot embedded headers to payload", ex);
+ }
+ }
+
+ return PutRecordRequest.builder().streamName(stream).partitionKey(partitionKey).explicitHashKey(explicitHashKey)
+ .sequenceNumberForOrdering(sequenceNumber).data(data).build();
+ }
+
+ private CompletableFuture extends KinesisResponse> handleMessageToAws(KinesisRequest request) {
+ if (request instanceof PutRecordsRequest putRecordsRequest) {
+ return this.amazonKinesis.putRecords(putRecordsRequest);
+ }
+ else {
+ return this.amazonKinesis.putRecord((PutRecordRequest) request);
+ }
+ }
+
+ protected Map additionalOnSuccessHeaders(KinesisRequest request, KinesisResponse response) {
+ if (response instanceof PutRecordResponse putRecordResponse) {
+ return Map.of(KinesisHeaders.SHARD, putRecordResponse.shardId(), KinesisHeaders.SEQUENCE_NUMBER,
+ putRecordResponse.sequenceNumber(), KinesisHeaders.SERVICE_RESULT, response);
+ }
+ return Map.of(KinesisHeaders.SERVICE_RESULT, response);
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/LocalstackContainerTest.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/LocalstackContainerTest.java
new file mode 100644
index 000000000..445957bdb
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/LocalstackContainerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * The base contract for JUnit tests based on the container for Localstack. The Testcontainers 'reuse' option must be
+ * disabled, so, Ryuk container is started and will clean all the containers up from this test suite after JVM exit.
+ * Since the Localstack container instance is shared via static property, it is going to be started only once per JVM;
+ * therefore, the target Docker container is reused automatically.
+ *
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+@Testcontainers(disabledWithoutDocker = true)
+public interface LocalstackContainerTest {
+
+ LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(
+ DockerImageName.parse("localstack/localstack:4.4.0"));
+
+ @BeforeAll
+ static void startContainer() {
+ LOCAL_STACK_CONTAINER.start();
+ }
+
+ static KinesisAsyncClient kinesisClient() {
+ return applyAwsClientOptions(KinesisAsyncClient.builder().httpClientBuilder(NettyNioAsyncHttpClient.builder()));
+ }
+
+ static AwsCredentialsProvider credentialsProvider() {
+ return StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey()));
+ }
+
+ private static , T> T applyAwsClientOptions(B clientBuilder) {
+ return clientBuilder.region(Region.of(LOCAL_STACK_CONTAINER.getRegion()))
+ .credentialsProvider(credentialsProvider()).endpointOverride(LOCAL_STACK_CONTAINER.getEndpoint())
+ .build();
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandlerIntegrationTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandlerIntegrationTests.java
new file mode 100644
index 000000000..49b9198e2
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandlerIntegrationTests.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import io.awspring.cloud.kinesis.LocalstackContainerTest;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import software.amazon.awssdk.core.waiters.WaiterResponse;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
+
+/**
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+@SpringJUnitConfig
+@DirtiesContext
+class KinesisMessageHandlerIntegrationTests implements LocalstackContainerTest {
+
+ private static final String TEST_STREAM = "TestStream";
+
+ private static KinesisAsyncClient AMAZON_KINESIS;
+
+ private static String TEST_STREAM_SHARD_ID;
+
+ @Autowired
+ MessageChannel kinesisSendChannel;
+
+ @BeforeAll
+ static void setup() {
+ AMAZON_KINESIS = LocalstackContainerTest.kinesisClient();
+ WaiterResponse describeStreamResponse = AMAZON_KINESIS
+ .createStream(request -> request.streamName(TEST_STREAM).shardCount(1))
+ .thenCompose(result -> AMAZON_KINESIS.waiter()
+ .waitUntilStreamExists(request -> request.streamName(TEST_STREAM)))
+ .join();
+
+ StreamDescription streamDescription = describeStreamResponse.matched().response().get().streamDescription();
+ TEST_STREAM_SHARD_ID = streamDescription.shards().get(0).shardId();
+ }
+
+ @Test
+ void kinesisMessageHandler() {
+ String shardIterator = AMAZON_KINESIS
+ .getShardIterator(builder -> builder.shardId(TEST_STREAM_SHARD_ID).streamName(TEST_STREAM)
+ .shardIteratorType(ShardIteratorType.LATEST))
+ .thenApply(GetShardIteratorResponse::shardIterator).join();
+
+ Message> message = MessageBuilder.withPayload("message").setHeader(KinesisHeaders.PARTITION_KEY, "testKey")
+ .build();
+
+ this.kinesisSendChannel.send(message);
+
+ AtomicReference recordReference = new AtomicReference<>();
+ AtomicReference nextShardIteratorReference = new AtomicReference<>(shardIterator);
+
+ await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
+ GetRecordsResponse getRecordsResponse = AMAZON_KINESIS
+ .getRecords(builder -> builder.shardIterator(nextShardIteratorReference.get())).join();
+ assertThat(getRecordsResponse).isNotNull();
+ List records = getRecordsResponse.records();
+ try {
+ assertThat(records).isNotEmpty();
+ }
+ catch (AssertionError ex) {
+ nextShardIteratorReference.set(getRecordsResponse.nextShardIterator());
+ throw ex;
+ }
+ recordReference.set(records.get(0));
+ });
+
+ assertThat(recordReference.get().partitionKey()).isEqualTo("testKey");
+ assertThat(recordReference.get().data().asUtf8String()).isEqualTo("message");
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ @EnableIntegration
+ public static class ContextConfiguration {
+
+ @Bean
+ @ServiceActivator(inputChannel = "kinesisSendChannel")
+ public MessageHandler kinesisMessageHandler() {
+ KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(AMAZON_KINESIS);
+ kinesisMessageHandler.setStream(TEST_STREAM);
+ kinesisMessageHandler.setMessageConverter(
+ new ConvertingFromMessageConverter(source -> source.toString().getBytes(StandardCharsets.UTF_8)));
+ return kinesisMessageHandler;
+ }
+
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandlerTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandlerTests.java
new file mode 100644
index 000000000..dfaf3a12d
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisMessageHandlerTests.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.entry;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.serializer.support.SerializingConverter;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.expression.FunctionExpression;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessageHandlingException;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+/**
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+@SpringJUnitConfig
+@DirtiesContext
+class KinesisMessageHandlerTests {
+
+ @Autowired
+ KinesisAsyncClient amazonKinesis;
+
+ @Autowired
+ MessageChannel kinesisSendChannel;
+
+ @Autowired
+ KinesisMessageHandler kinesisMessageHandler;
+
+ @Test
+ @SuppressWarnings({ "unchecked", "removal" })
+ void kinesisMessageHandler() {
+ final Message> message = MessageBuilder.withPayload("message").build();
+
+ assertThatExceptionOfType(MessageHandlingException.class)
+ .isThrownBy(() -> this.kinesisSendChannel.send(message))
+ .withCauseInstanceOf(IllegalStateException.class)
+ .withStackTraceContaining("'stream' must not be null for sending a Kinesis record");
+
+ this.kinesisMessageHandler.setStream("testStream");
+
+ assertThatExceptionOfType(MessageHandlingException.class)
+ .isThrownBy(() -> this.kinesisSendChannel.send(message))
+ .withCauseInstanceOf(IllegalStateException.class)
+ .withStackTraceContaining("'partitionKey' must not be null for sending a Kinesis record");
+
+ Message> message2 = MessageBuilder.fromMessage(message).setHeader("partitionKey", "testKey")
+ .setHeader("sequenceNumber", "10").setHeader("testHeader", "testValue").build();
+
+ this.kinesisSendChannel.send(message2);
+
+ ArgumentCaptor putRecordRequestArgumentCaptor = ArgumentCaptor
+ .forClass(PutRecordRequest.class);
+
+ verify(this.amazonKinesis).putRecord(putRecordRequestArgumentCaptor.capture());
+
+ PutRecordRequest putRecordRequest = putRecordRequestArgumentCaptor.getValue();
+
+ assertThat(putRecordRequest.streamName()).isEqualTo("testStream");
+ assertThat(putRecordRequest.partitionKey()).isEqualTo("testKey");
+ assertThat(putRecordRequest.sequenceNumberForOrdering()).isEqualTo("10");
+ assertThat(putRecordRequest.explicitHashKey()).isNull();
+
+ Message> messageToCheck = new org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper()
+ .toMessage(putRecordRequest.data().asByteArray());
+
+ assertThat(messageToCheck.getHeaders()).contains(entry("testHeader", "testValue"));
+ assertThat(messageToCheck.getPayload()).isEqualTo("message".getBytes());
+
+ message2 = new GenericMessage<>(PutRecordsRequest.builder().streamName("myStream")
+ .records(request -> request.data(SdkBytes.fromByteArray("test".getBytes())).partitionKey("testKey"))
+ .build());
+
+ this.kinesisSendChannel.send(message2);
+
+ ArgumentCaptor putRecordsRequestArgumentCaptor = ArgumentCaptor
+ .forClass(PutRecordsRequest.class);
+ verify(this.amazonKinesis).putRecords(putRecordsRequestArgumentCaptor.capture());
+
+ PutRecordsRequest putRecordsRequest = putRecordsRequestArgumentCaptor.getValue();
+
+ assertThat(putRecordsRequest.streamName()).isEqualTo("myStream");
+ assertThat(putRecordsRequest.records()).containsExactlyInAnyOrder(PutRecordsRequestEntry.builder()
+ .data(SdkBytes.fromByteArray("test".getBytes())).partitionKey("testKey").build());
+ }
+
+ @Configuration
+ @EnableIntegration
+ static class ContextConfiguration {
+
+ @Bean
+ @SuppressWarnings("unchecked")
+ KinesisAsyncClient amazonKinesis() {
+ KinesisAsyncClient mock = mock();
+
+ given(mock.putRecord(any(PutRecordRequest.class))).willReturn(mock(CompletableFuture.class));
+
+ given(mock.putRecords(any(PutRecordsRequest.class))).willReturn(mock(CompletableFuture.class));
+
+ return mock;
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "kinesisSendChannel")
+ @SuppressWarnings("removal")
+ MessageHandler kinesisMessageHandler() {
+ KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis());
+ kinesisMessageHandler.setAsync(true);
+ kinesisMessageHandler.setPartitionKeyExpression(
+ new FunctionExpression>(m -> m.getHeaders().get("partitionKey")));
+ kinesisMessageHandler.setSequenceNumberExpression(
+ new FunctionExpression>(m -> m.getHeaders().get("sequenceNumber")));
+ kinesisMessageHandler.setMessageConverter(new MessageConverter() {
+
+ private SerializingConverter serializingConverter = new SerializingConverter();
+
+ @Override
+ public Object fromMessage(Message> message, Class> targetClass) {
+ Object source = message.getPayload();
+ if (source instanceof String) {
+ return ((String) source).getBytes();
+ }
+ else {
+ return this.serializingConverter.convert(source);
+ }
+ }
+
+ @Override
+ public Message> toMessage(Object payload, MessageHeaders headers) {
+ return null;
+ }
+
+ });
+ kinesisMessageHandler.setEmbeddedHeadersMapper(
+ new org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper("testHeader"));
+ return kinesisMessageHandler;
+ }
+
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisProducingMessageHandlerTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisProducingMessageHandlerTests.java
new file mode 100644
index 000000000..8de38c802
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisProducingMessageHandlerTests.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.serializer.support.SerializingConverter;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.QueueChannel;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.expression.FunctionExpression;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessageHandlingException;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.PollableChannel;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+
+/**
+ * @author Jacob Severson
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+@SpringJUnitConfig
+@DirtiesContext
+class KinesisProducingMessageHandlerTests {
+
+ @Autowired
+ MessageChannel kinesisSendChannel;
+
+ @Autowired
+ KinesisMessageHandler kinesisMessageHandler;
+
+ @Autowired
+ PollableChannel errorChannel;
+
+ @Autowired
+ PollableChannel successChannel;
+
+ @Test
+ void kinesisMessageHandler() {
+ final Message> message = MessageBuilder.withPayload("message").setErrorChannel(this.errorChannel).build();
+
+ assertThatExceptionOfType(MessageHandlingException.class)
+ .isThrownBy(() -> this.kinesisSendChannel.send(message))
+ .withCauseInstanceOf(IllegalStateException.class)
+ .withStackTraceContaining("'stream' must not be null for sending a Kinesis record");
+
+ this.kinesisMessageHandler.setStream("testStream");
+
+ assertThatExceptionOfType(MessageHandlingException.class)
+ .isThrownBy(() -> this.kinesisSendChannel.send(message))
+ .withCauseInstanceOf(IllegalStateException.class)
+ .withStackTraceContaining("'partitionKey' must not be null for sending a Kinesis record");
+
+ Message> message2 = MessageBuilder.fromMessage(message).setHeader("partitionKey", "someKey")
+ .setHeader("sequenceNumber", "10").build();
+
+ this.kinesisSendChannel.send(message2);
+
+ Message> success = this.successChannel.receive(10000);
+ assertThat(success.getHeaders()).containsEntry("partitionKey", "someKey").containsEntry("sequenceNumber", "10")
+ .containsKeys(KinesisHeaders.SHARD, KinesisHeaders.SEQUENCE_NUMBER);
+ assertThat(success.getPayload()).isEqualTo("message");
+
+ message2 = MessageBuilder.fromMessage(message).setHeader("partitionKey", "someKey")
+ .setHeader("sequenceNumber", "10").build();
+
+ this.kinesisSendChannel.send(message2);
+
+ Message> failed = this.errorChannel.receive(10000);
+ MessageHandlingException putRecordFailure = (MessageHandlingException) failed.getPayload();
+ assertThat(putRecordFailure.getCause().getMessage()).isEqualTo("putRecordRequestEx");
+ assertThat(putRecordFailure.getFailedMessage()).isSameAs(message2);
+
+ PutRecordsRequestEntry testRecordEntry = PutRecordsRequestEntry.builder().data(SdkBytes.fromUtf8String("test"))
+ .partitionKey("testKey").build();
+
+ message2 = MessageBuilder
+ .withPayload(PutRecordsRequest.builder().streamName("myStream").records(testRecordEntry).build())
+ .setErrorChannel(this.errorChannel).build();
+
+ this.kinesisSendChannel.send(message2);
+
+ success = this.successChannel.receive(10000);
+ assertThat(((PutRecordsRequest) success.getPayload()).records()).containsExactlyInAnyOrder(testRecordEntry);
+
+ this.kinesisSendChannel.send(message2);
+
+ failed = this.errorChannel.receive(10000);
+ MessageHandlingException putRecordsFailure = (MessageHandlingException) failed.getPayload();
+ assertThat(putRecordsFailure.getCause().getMessage()).isEqualTo("putRecordsRequestEx");
+ assertThat(putRecordsFailure.getFailedMessage()).isSameAs(message2);
+ }
+
+ @Configuration
+ @EnableIntegration
+ static class ContextConfiguration {
+
+ @Bean
+ KinesisAsyncClient amazonKinesis() {
+ KinesisAsyncClient mock = mock();
+
+ given(mock.putRecord(any(PutRecordRequest.class))).willAnswer(invocation -> {
+ PutRecordRequest request = invocation.getArgument(0);
+ PutRecordResponse.Builder result = PutRecordResponse.builder()
+ .sequenceNumber(request.sequenceNumberForOrdering()).shardId("shardId-1");
+ return CompletableFuture.completedFuture(result.build());
+ }).willAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("putRecordRequestEx")));
+
+ given(mock.putRecords(any(PutRecordsRequest.class)))
+ .willAnswer(invocation -> CompletableFuture.completedFuture(PutRecordsResponse.builder().build()))
+ .willAnswer(
+ invocation -> CompletableFuture.failedFuture(new RuntimeException("putRecordsRequestEx")));
+
+ return mock;
+ }
+
+ @Bean
+ PollableChannel errorChannel() {
+ return new QueueChannel();
+ }
+
+ @Bean
+ PollableChannel successChannel() {
+ return new QueueChannel();
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "kinesisSendChannel")
+ MessageHandler kinesisMessageHandler() {
+ KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis());
+ kinesisMessageHandler.setAsync(true);
+ kinesisMessageHandler.setPartitionKeyExpression(
+ new FunctionExpression>(m -> m.getHeaders().get("partitionKey")));
+ kinesisMessageHandler.setSequenceNumberExpression(
+ new FunctionExpression>(m -> m.getHeaders().get("sequenceNumber")));
+ kinesisMessageHandler.setOutputChannel(successChannel());
+ kinesisMessageHandler.setMessageConverter(new MessageConverter() {
+
+ private SerializingConverter serializingConverter = new SerializingConverter();
+
+ @Override
+ public Object fromMessage(Message> message, Class> targetClass) {
+ Object source = message.getPayload();
+ if (source instanceof String) {
+ return ((String) source).getBytes();
+ }
+ else {
+ return this.serializingConverter.convert(source);
+ }
+ }
+
+ @Override
+ public Message> toMessage(Object payload, MessageHeaders headers) {
+ return null;
+ }
+
+ });
+ return kinesisMessageHandler;
+ }
+
+ }
+
+}
diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis/pom.xml
new file mode 100644
index 000000000..901400663
--- /dev/null
+++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+ spring-cloud-aws
+ io.awspring.cloud
+ 4.0.0-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+
+ Spring Cloud AWS Starter for Spring Integration with Kinesis
+ spring-cloud-aws-starter-integration-kinesis
+
+
+
+ io.awspring.cloud
+ spring-cloud-aws-starter
+
+
+ io.awspring.cloud
+ spring-cloud-aws-kinesis
+
+
+ software.amazon.awssdk
+ kinesis
+
+
+ org.springframework.integration
+ spring-integration-core
+
+
+
+