Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ include::core.adoc[]

include::dynamodb.adoc[]

include::kinesis.adoc[]

include::s3.adoc[]

include::ses.adoc[]
Expand Down
50 changes: 50 additions & 0 deletions docs/src/main/asciidoc/kinesis.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[#spring-cloud-aws-kinesis]
== Kinesis Integration

The https://aws.amazon.com/kinesis/[Kinesis] is a platform for streaming data on AWS, making it easy to load and analyze streaming data and also providing the ability for you to build custom streaming data applications for specialized needs.

// TODO: auto-configuration

=== Spring Integration Support

Also, starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon Kinesis.

The `KinesisMessageHandler` is an `AbstractMessageHandler` to perform put record(s) to the Kinesis stream.
The stream, partition key (or explicit hash key) and sequence number can be determined against a request message via evaluation provided expressions or can be specified statically.
They also can be specified as `KinesisHeaders.STREAM`, `KinesisHeaders.PARTITION_KEY` and `KinesisHeaders.SEQUENCE_NUMBER` respectively.

The `KinesisMessageHandler` can be configured with the `outputChannel` for sending a `Message` on successful put operation.
The payload is the original request and additional `KinesisHeaders.SHARD` and `KinesisHeaders.SEQUENCE_NUMBER` headers are populated from the `PutRecordResposne`.
If the request payload is a `PutRecordsRequest`, the full `PutRecordsResponse` is populated in the `KinesisHeaders.SERVICE_RESULT` header instead.

When an async failure is happened on the put operation, the `ErrorMessage` is sent to the `errorChannel` header or global one.
The payload is an `MessageHandlingException`.

The `payload` of request message can be:

- `PutRecordsRequest` to perform `KinesisAsyncClient.putRecords`
- `PutRecordRequest` to perform `KinesisAsyncClient.putRecord`
- `ByteBuffer` to represent data of the `PutRecordRequest`
- `byte[]` which is wrapped to the `ByteBuffer`
- any other type that is converted to the `byte[]` by the provided `Converter`; the `SerializingConverter` is used by default.

The Java Configuration for the message handler is:

[source,java]
----
@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler kinesisMessageHandler(KinesisAsyncClient amazonKinesis,
MessageChannel channel) {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
kinesisMessageHandler.setPartitionKey("1");
kinesisMessageHandler.setOutputChannel(channel);
return kinesisMessageHandler;
}
----

The Kinesis service does not provide a "headers"(attributes) abstraction, so the `KinesisMessageHandler` can be configured with the `OutboundMessageMapper` to embed message headers into the record data alongside the payload.
See `EmbeddedHeadersJsonMessageMapper` implementation for more information.

The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
<module>spring-cloud-aws-sns</module>
<module>spring-cloud-aws-sqs</module>
<module>spring-cloud-aws-dynamodb</module>
<module>spring-cloud-aws-kinesis</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis</module>
<module>spring-cloud-aws-s3</module>
<module>spring-cloud-aws-testcontainers</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter</module>
Expand All @@ -60,7 +62,7 @@
<module>spring-cloud-aws-test</module>
<module>spring-cloud-aws-modulith</module>
<module>docs</module>
</modules>
</modules>

<dependencyManagement>
<dependencies>
Expand Down
23 changes: 23 additions & 0 deletions spring-cloud-aws-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
<properties>
<spotless.version>2.31.0</spotless.version>
<awssdk-v2.version>2.32.28</awssdk-v2.version>
<kcl.version>3.1.2</kcl.version>
<kpl.version>1.0.4</kpl.version>
<amazon.dax.version>2.0.5</amazon.dax.version>
<amazon.encryption.s3.version>3.3.5</amazon.encryption.s3.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
Expand Down Expand Up @@ -111,6 +113,16 @@
<artifactId>spring-cloud-aws-dynamodb</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-kinesis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-integration-kinesis</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.awspring.cloud</groupId>
Expand Down Expand Up @@ -237,6 +249,17 @@
<artifactId>jakarta.mail</artifactId>
<version>${eclipse.jakarta.mail.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${kcl.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${kpl.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
55 changes: 55 additions & 0 deletions spring-cloud-aws-kinesis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>4.0.0-SNAPSHOT</version>
</parent>

<artifactId>spring-cloud-aws-kinesis</artifactId>
<name>Spring Cloud AWS Kinesis Integration</name>

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>



</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.kinesis.integration;

import org.springframework.core.convert.converter.Converter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;

/**
* A simple {@link MessageConverter} that delegates to a {@link Converter}.
*
* @author Artem Bilan
*
* @since 4.0
*/
record ConvertingFromMessageConverter(Converter<Object, ?> delegate) implements MessageConverter {

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return this.delegate.convert(message.getPayload());
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.kinesis.integration;

/**
* Constants for Kinesis message headers.
*
* @author Artem Bilan
*
* @since 4.0
*/
public final class KinesisHeaders {

/**
* Kinesis headers prefix to be used by all headers added by the framework.
*/
public static final String PREFIX = "Kinesis_";

/**
* The {@value STREAM} header for sending data to Kinesis.
*/
public static final String STREAM = PREFIX + "stream";

/**
* The {@value RECEIVED_STREAM} header for receiving data from Kinesis.
*/
public static final String RECEIVED_STREAM = PREFIX + "receivedStream";

/**
* The {@value PARTITION_KEY} header for sending data to Kinesis.
*/
public static final String PARTITION_KEY = PREFIX + "partitionKey";

/**
* The {@value SEQUENCE_NUMBER} header for sending data to Kinesis.
*/
public static final String SEQUENCE_NUMBER = PREFIX + "sequenceNumber";

/**
* The {@value SHARD} header to represent Kinesis shardId.
*/
public static final String SHARD = PREFIX + "shard";

/**
* The {@value SERVICE_RESULT} header represents a
* {@link software.amazon.awssdk.services.kinesis.model.KinesisResponse}.
*/
public static final String SERVICE_RESULT = PREFIX + "serviceResult";

/**
* The {@value RECEIVED_PARTITION_KEY} header for receiving data from Kinesis.
*/
public static final String RECEIVED_PARTITION_KEY = PREFIX + "receivedPartitionKey";

/**
* The {@value RECEIVED_SEQUENCE_NUMBER} header for receiving data from Kinesis.
*/
public static final String RECEIVED_SEQUENCE_NUMBER = PREFIX + "receivedSequenceNumber";

/**
* The {@value CHECKPOINTER} header for checkpoint the shard sequenceNumber.
*/
public static final String CHECKPOINTER = PREFIX + "checkpointer";

/**
* The {@value RAW_RECORD} header represents received Kinesis record(s).
*/
public static final String RAW_RECORD = PREFIX + "rawRecord";

private KinesisHeaders() {
}

}
Loading