diff --git a/demos/roms-streams/README.md b/demos/roms-streams/README.md
new file mode 100644
index 000000000..cf0140b9a
--- /dev/null
+++ b/demos/roms-streams/README.md
@@ -0,0 +1,333 @@
+# Redis Streams Consumer Framework
+
+This framework provides automatic bean creation for Redis Stream consumers using Spring annotations.
+
+## Overview
+
+The framework consists of two main annotations:
+- `@EnableRedisStreams`: Enables the automatic scanning and bean creation for Redis Stream consumers
+- `@RedisStreamConsumer`: Marks a class as a Redis Stream consumer with specific configuration
+
+## Quick Start
+
+### 1. Enable Redis Streams
+
+Add the `@EnableRedisStreams` annotation to your configuration class:
+
+```java
+@Configuration
+@EnableRedisStreams(basePackages = "com.redis.om.streams.consumer")
+public class RedisStreamsConfiguration {
+ // Your configuration
+}
+```
+
+### 2. Create a Consumer
+
+Create a class that extends `RedisStreamsConsumer` and annotate it with `@RedisStreamConsumer`:
+
+```java
+@RedisStreamConsumer(
+ topicName = "myTopic",
+ groupName = "myGroup",
+ consumerName = "myConsumer",
+ autoAck = false,
+ cluster = false
+)
+public class MyRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Scheduled(fixedDelayString = "${redis.streams.fixed-delay:1000}")
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ return true;
+ }
+ return false;
+ }
+}
+```
+
+## Annotation Configuration
+
+### @EnableRedisStreams
+
+| Attribute | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `basePackages` | String[] | `{"com.redis.om.streams"}` | Base packages to scan for `@RedisStreamConsumer` annotated classes |
+| `value` | String[] | `{}` | Alias for `basePackages` |
+
+### @RedisStreamConsumer
+
+| Attribute | Type | Default | Description |
+|-----------|------|---------|----------------------------------------------------------------|
+| `topicName` | String | **Required** | Name of the Redis Stream topic |
+| `groupName` | String | **Required** | Name of the consumer group |
+| `consumerName` | String | `""` | Name of the consumer within the group |
+| `autoAck` | boolean | `false` | Whether the consumer can acknowledge messages |
+| `cluster` | boolean | `false` | Whether to use cluster mode |
+
+## Consumer Types
+
+The framework automatically creates different types of consumer groups based on the annotation configuration:
+
+### 1. No Acknowledgment Consumer (default)
+```java
+@RedisStreamConsumer(
+ topicName = "topic",
+ groupName = "group",
+ autoAck = false,
+ cluster = false
+)
+```
+Creates: `NoAckConsumerGroup`
+
+### 2. Acknowledgment Consumer
+```java
+@RedisStreamConsumer(
+ topicName = "topic",
+ groupName = "group",
+ autoAck = true,
+ cluster = false
+)
+```
+Creates: `ConsumerGroup`
+
+### 3. Cluster Consumer
+```java
+@RedisStreamConsumer(
+ topicName = "topic",
+ groupName = "group",
+ autoAck = true,
+ cluster = true
+)
+```
+Creates: `SingleClusterPelConsumerGroup`
+
+## Automatic Bean Creation
+
+When you use `@EnableRedisStreams`, the framework automatically creates the following beans for each consumer:
+
+1. **SerialTopicConfig**: Configuration for the topic
+2. **TopicManager**: Manages the Redis Stream topic
+3. **ConsumerGroup**: The appropriate consumer group based on configuration
+4. **Consumer Class**: The annotated consumer class itself
+
+### Bean Naming Convention
+
+- `SerialTopicConfig`: `{topicName}SerialTopicConfig`
+- `TopicManager`: `{topicName}TopicManager` (unique per topic, shared between consumers of the same topic)
+- `ConsumerGroup`: `{groupName}ConsumerGroup` or `{groupName}NoAckConsumerGroup` or `{groupName}SingleClusterPelConsumerGroup` (unique per group, shared between consumers of the same group)
+- `Consumer Class`: `{className}` (uncapitalized)
+
+## Requirements
+
+### Method Requirements
+
+Every consumer class must have a `process()` method annotated with `@Scheduled`:
+
+```java
+@Scheduled(fixedDelayString = "${redis.streams.fixed-delay:1000}")
+public boolean process() {
+ // Your processing logic here
+ TopicEntry topicEntry = consume();
+ // Process the message
+ return acknowledge(topicEntry); // or return true/false
+}
+```
+
+### Dependencies
+
+Make sure you have the following dependencies in your `pom.xml`:
+
+```xml
+<dependency>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter</artifactId>
+</dependency>
+<dependency>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-web</artifactId>
+</dependency>
+<dependency>
+    <groupId>com.redis.om</groupId>
+    <artifactId>redis-om-spring</artifactId>
+    <version>1.0.0-RC3</version>
+</dependency>
+<dependency>
+    <groupId>org.projectlombok</groupId>
+    <artifactId>lombok</artifactId>
+</dependency>
+<dependency>
+    <groupId>com.fasterxml.jackson.core</groupId>
+    <artifactId>jackson-databind</artifactId>
+</dependency>
+```
+
+### Configuration
+
+Ensure that `@EnableRedisStreams` is enabled in your configuration:
+
+```java
+@Configuration
+@EnableRedisStreams(basePackages = "com.redis.om.streams.consumer")
+public class RedisStreamsConfiguration {
+ // Your configuration
+}
+```
+
+## Examples
+
+### Example 1: Basic Consumer with Consumer Name
+```java
+@RedisStreamConsumer(topicName = "topicFoo", groupName = "groupFoo", consumerName = "Foo")
+public class FooRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Scheduled(fixedDelayString = "${redis.streams.fixed-delay:1000}")
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ }
+ return true;
+ }
+}
+```
+
+### Example 2: Acknowledgment Consumer
+```java
+@RedisStreamConsumer(
+ topicName = "topicFoo",
+ groupName = "groupFoo",
+ autoAck = true
+)
+public class AckRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Scheduled(fixedDelayString = "${redis.streams.fixed-delay:1000}")
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ return acknowledge(topicEntry);
+ }
+ return false;
+ }
+}
+```
+
+### Example 3: No-Ack Consumer (Explicit)
+```java
+@RedisStreamConsumer(
+ topicName = "topicFoo",
+ groupName = "groupFoo",
+ autoAck = false
+)
+public class NoAckFooRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Scheduled(fixedDelayString = "${redis.streams.fixed-delay:1000}")
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ }
+ return true;
+ }
+}
+```
+
+## Configuration Properties
+
+Configure your application in `application.properties`:
+
+```properties
+# Server Configuration
+server.port=8080
+spring.application.name=redis-om-spring-streams
+
+# Spring Data Redis Configuration
+spring.data.redis.host=localhost
+spring.data.redis.port=6379
+spring.data.redis.username=
+spring.data.redis.password=
+
+# Redis Streams Configuration
+redis.streams.fixed-delay=5000
+```
+
+## Logging
+
+The framework provides detailed logging for bean creation and consumer operations. You can configure logging levels in your `application.properties`:
+
+```properties
+logging.level.com.redis.om.streams.config=INFO
+logging.level.com.redis.om.streams.consumer=INFO
+```
+
+## Error Handling
+
+The framework handles various error scenarios:
+
+- **ClassNotFoundException**: Logs error and continues with other consumers
+- **InvalidTopicException**: Throws IllegalStateException during TopicManager creation
+- **TopicNotFoundException**: Handled by individual consumers
+- **InvalidMessageException**: Handled by producers
+- **ProducerTimeoutException**: Handled by producers
+
+## Best Practices
+
+1. **Package Organization**: Keep your consumers in dedicated packages for better organization
+2. **Bean Naming**: Use descriptive topic and group names to avoid conflicts
+3. **Error Handling**: Implement proper error handling in your `process()` methods
+4. **Logging**: Use appropriate log levels for debugging and monitoring
+5. **Configuration**: Use environment-specific configurations for different deployment environments
+6. **Scheduling**: Use configurable delays with `fixedDelayString` for easy tuning
+
+## Troubleshooting
+
+### Common Issues
+
+1. **No beans created**: Check that `@EnableRedisStreams` is properly configured and base packages are correct
+2. **Scheduling not working**: Ensure `@EnableScheduling` is enabled in your configuration
+3. **Redis connection issues**: Verify Redis connection configuration in `application.properties`
+4. **Bean conflicts**: Check for duplicate bean names, especially with topic configurations
+5. **Message production issues**: Verify that the `Producer` bean is properly configured
+
+### Debug Mode
+
+Enable debug logging to see detailed bean creation information:
+
+```properties
+logging.level.com.redis.om.streams=DEBUG
+```
+
+## Sample project structure for this demo
+
+```
+src/main/java/com/redis/om/streams/
+├── config/
+│ └── RedisStreamsConfiguration.java
+├── consumer/
+│ ├── AckRedisStreamsConsumer.java
+│ ├── FooRedisStreamsConsumer.java
+│ └── NoAckFooRedisStreamsConsumer.java
+├── controller/
+│ └── StreamsController.java
+├── model/
+│ └── TextData.java
+└── DemoApplication.java
+```
+
+## Running the Application
+
+1. Start Redis server
+2. Run the Spring Boot application
+3. Use the REST endpoints to produce messages
+4. Watch the consumer logs to see message processing
\ No newline at end of file
diff --git a/demos/roms-streams/build.gradle b/demos/roms-streams/build.gradle
new file mode 100644
index 000000000..eb2eec42c
--- /dev/null
+++ b/demos/roms-streams/build.gradle
@@ -0,0 +1,70 @@
+plugins {
+ id 'org.springframework.boot' version '3.4.5'
+ // id 'io.spring.dependency-management' version '1.1.7'
+ id 'java'
+}
+
+java {
+ toolchain {
+ languageVersion = JavaLanguageVersion.of(21)
+ }
+}
+
+compileJava.options.encoding = 'UTF-8'
+compileTestJava.options.encoding = 'UTF-8'
+
+// Optional: Skip deploy if using a publishing step
+tasks.withType(PublishToMavenRepository).configureEach {
+ enabled = false
+}
+
+repositories {
+ mavenLocal()
+ mavenCentral()
+ maven {
+ name = 'Spring Milestones'
+ url = 'https://repo.spring.io/milestone'
+ }
+ maven {
+ name = 'Spring Snapshots'
+ url = 'https://repo.spring.io/snapshot'
+ }
+}
+
+dependencies {
+ // Spring
+ implementation 'org.springframework.boot:spring-boot-starter-web'
+
+ // Redis OM for Spring (RC version)
+ implementation project(':redis-om-spring')
+
+ // Jedis client
+ implementation 'redis.clients:jedis:5.2.0'
+
+ // Jackson
+ implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.3'
+
+ // Faker
+ implementation('com.github.javafaker:javafaker:1.0.2') {
+ exclude group: 'org.yaml', module: 'snakeyaml'
+ }
+
+ // Lombok
+ compileOnly 'org.projectlombok:lombok:1.18.36'
+ annotationProcessor 'org.projectlombok:lombok:1.18.36'
+
+ // Test
+ testImplementation 'org.springframework.boot:spring-boot-starter-test:3.4.5'
+ testImplementation 'org.mockito:mockito-core:5.14.2'
+ testImplementation 'org.testcontainers:testcontainers:1.20.4'
+ testImplementation 'org.testcontainers:junit-jupiter:1.20.4'
+ testImplementation 'com.redis:testcontainers-redis:2.2.4'
+}
+
+tasks.withType(JavaCompile).configureEach {
+ options.encoding = 'UTF-8'
+}
+
+test {
+ useJUnitPlatform()
+}
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/DemoApplication.java b/demos/roms-streams/src/main/java/com/redis/om/streams/DemoApplication.java
new file mode 100644
index 000000000..7456c1d50
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/DemoApplication.java
@@ -0,0 +1,13 @@
+package com.redis.om.streams;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class DemoApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(DemoApplication.class, args);
+ }
+
+}
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/config/RedisStreamsConfiguration.java b/demos/roms-streams/src/main/java/com/redis/om/streams/config/RedisStreamsConfiguration.java
new file mode 100644
index 000000000..066880aee
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/config/RedisStreamsConfiguration.java
@@ -0,0 +1,72 @@
+package com.redis.om.streams.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+
+import com.redis.om.streams.Producer;
+import com.redis.om.streams.annotation.EnableRedisStreams;
+import com.redis.om.streams.command.serial.TopicProducer;
+
+import jakarta.annotation.PostConstruct;
+import redis.clients.jedis.JedisPooled;
+
+@Configuration
+@EnableRedisStreams(
+ basePackages = "com.redis.om.streams.consumer"
+)
+public class RedisStreamsConfiguration {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Value(
+ "${spring.data.redis.host}"
+ )
+ private String host;
+ @Value(
+ "${spring.data.redis.port}"
+ )
+ private int port;
+ @Value(
+ "${spring.data.redis.username}"
+ )
+ private String username;
+ @Value(
+ "${spring.data.redis.password}"
+ )
+ private String password;
+
+ @PostConstruct
+ private void init() {
+ logger.info("{} init", getClass().getSimpleName());
+ }
+
+ @Bean
+ public JedisConnectionFactory redisConnectionFactory() {
+ RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
+ redisStandaloneConfiguration.setHostName(host);
+ redisStandaloneConfiguration.setPort(port);
+ redisStandaloneConfiguration.setUsername(username);
+ redisStandaloneConfiguration.setPassword(password);
+ JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration);
+ jedisConnectionFactory.setConvertPipelineAndTxResults(false);
+ return jedisConnectionFactory;
+ }
+
+ @Bean
+ public JedisPooled jedisPooled() {
+ logger.info("Creating JedisPooled");
+ return new JedisPooled(host, port);
+ }
+
+ @Bean
+ public Producer topicProducer(JedisPooled jedisPooled) {
+ logger.info("Creating TopicProducer");
+ return new TopicProducer(jedisPooled, "topicFoo");
+ }
+
+}
\ No newline at end of file
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/AckRedisStreamsConsumer.java b/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/AckRedisStreamsConsumer.java
new file mode 100644
index 000000000..60aac6e5e
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/AckRedisStreamsConsumer.java
@@ -0,0 +1,35 @@
+package com.redis.om.streams.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import com.redis.om.streams.TopicEntry;
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+import jakarta.annotation.PostConstruct;
+
+@RedisStreamConsumer(
+ topicName = "topicFoo", groupName = "groupFoo", autoAck = true
+)
+public class AckRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @PostConstruct
+ private void init() {
+ logger.info("{} init", getClass().getSimpleName());
+ }
+
+ @Scheduled(
+ fixedDelayString = "${redis.streams.fixed-delay:1000}"
+ )
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ return acknowledge(topicEntry);
+ }
+ return false;
+ }
+}
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/FooRedisStreamsConsumer.java b/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/FooRedisStreamsConsumer.java
new file mode 100644
index 000000000..d8376392e
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/FooRedisStreamsConsumer.java
@@ -0,0 +1,34 @@
+package com.redis.om.streams.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import com.redis.om.streams.TopicEntry;
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+import jakarta.annotation.PostConstruct;
+
+@RedisStreamConsumer(
+ topicName = "topicFoo", groupName = "groupFoo", consumerName = "Foo"
+)
+public class FooRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @PostConstruct
+ private void init() {
+ logger.info("{} init", getClass().getSimpleName());
+ }
+
+ @Scheduled(
+ fixedDelayString = "${redis.streams.fixed-delay:1000}"
+ )
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ }
+ return true;
+ }
+}
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/NoAckFooRedisStreamsConsumer.java b/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/NoAckFooRedisStreamsConsumer.java
new file mode 100644
index 000000000..731ef191e
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/consumer/NoAckFooRedisStreamsConsumer.java
@@ -0,0 +1,34 @@
+package com.redis.om.streams.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import com.redis.om.streams.TopicEntry;
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+import jakarta.annotation.PostConstruct;
+
+@RedisStreamConsumer(
+ topicName = "topicFoo", groupName = "groupFoo", autoAck = false
+)
+public class NoAckFooRedisStreamsConsumer extends RedisStreamsConsumer {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @PostConstruct
+ private void init() {
+ logger.info("{} init", getClass().getSimpleName());
+ }
+
+ @Scheduled(
+ fixedDelayString = "${redis.streams.fixed-delay:1000}"
+ )
+ public boolean process() {
+ TopicEntry topicEntry = consume();
+ if (topicEntry != null) {
+ logger.info("{} processing topic: {}", getClass().getSimpleName(), topicEntry);
+ }
+ return true;
+ }
+}
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/controller/StreamsController.java b/demos/roms-streams/src/main/java/com/redis/om/streams/controller/StreamsController.java
new file mode 100644
index 000000000..7e7465a67
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/controller/StreamsController.java
@@ -0,0 +1,116 @@
+package com.redis.om.streams.controller;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.javafaker.Faker;
+import com.redis.om.streams.Producer;
+import com.redis.om.streams.exception.InvalidMessageException;
+import com.redis.om.streams.exception.ProducerTimeoutException;
+import com.redis.om.streams.exception.TopicNotFoundException;
+import com.redis.om.streams.model.TextData;
+
+@RestController
+@RequestMapping(
+ "/api/streams"
+)
+public class StreamsController {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ public static volatile boolean stopLoading = false;
+
+ private final Producer producer;
+ private final ObjectMapper objectMapper;
+
+ public StreamsController(Producer producer, ObjectMapper objectMapper) {
+ this.producer = producer;
+ this.objectMapper = objectMapper;
+ }
+
+ private void create(TextData textData) {
+ try {
+ producer.produce(objectMapper.convertValue(textData, new TypeReference<>() {
+ }));
+ } catch (TopicNotFoundException | InvalidMessageException | ProducerTimeoutException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ @GetMapping(
+ path = "/start-load"
+ )
+ public ResponseEntity startLoading() {
+ Faker faker = new Faker();
+ StreamsController.stopLoading = Boolean.FALSE;
+ AtomicInteger created = new AtomicInteger();
+ while (!StreamsController.stopLoading) {
+ TextData textData = TextData.of();
+ try {
+ textData.setId(created.getAndIncrement());
+ textData.setName(faker.dune().character());
+ textData.setDescription(faker.dune().quote());
+ create(textData);
+ showSpinner(created.get());
+ } catch (Exception e) {
+ logger.error("Error while creating new TextData: {}", textData, e);
+ return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+ }
+ }
+ System.out.print("\r ");
+ System.out.print("\r");
+ return ResponseEntity.ok(created.get());
+ }
+
+ @GetMapping(
+ path = "/start-load/{count}"
+ )
+ public ResponseEntity startLoading(@PathVariable int count) {
+ Faker faker = new Faker();
+ StreamsController.stopLoading = Boolean.FALSE;
+ AtomicInteger created = new AtomicInteger();
+ while (created.get() < count) {
+ TextData textData = TextData.of();
+ try {
+ textData.setId(created.getAndIncrement());
+ textData.setName(faker.dune().character());
+ textData.setDescription(faker.dune().quote());
+ create(textData);
+ showSpinner(created.get());
+ } catch (Exception e) {
+ logger.error("Error while creating new TextData: {}", textData, e);
+ return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+ }
+ }
+ System.out.print("\r ");
+ System.out.print("\r");
+ return ResponseEntity.ok(created.get());
+ }
+
+ @GetMapping(
+ path = "/stop-load"
+ )
+ public ResponseEntity stopLoading() {
+ StreamsController.stopLoading = true;
+ return ResponseEntity.noContent().build();
+ }
+
+ // private final List wheel = List.of("|", "/", "-", "\\", "|", "/", "-", "\\");
+ // private final String wheel = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏";
+ // private final String wheel = "⠹⠸⠼⠧⠇⠏";
+ private void showSpinner(int count) {
+ String s = String.format("%,d", count);
+ String wheel = "⠁ ⠃ ⠇ ⠧⠷⠿⡿⣟⣯⣷";
+ System.out.printf("\rProgress: " + s + " -> " + wheel.charAt(count % wheel.length()) + " ");
+ }
+}
diff --git a/demos/roms-streams/src/main/java/com/redis/om/streams/model/TextData.java b/demos/roms-streams/src/main/java/com/redis/om/streams/model/TextData.java
new file mode 100644
index 000000000..153c3b20c
--- /dev/null
+++ b/demos/roms-streams/src/main/java/com/redis/om/streams/model/TextData.java
@@ -0,0 +1,14 @@
+package com.redis.om.streams.model;
+
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+@Data
+@RequiredArgsConstructor(
+ staticName = "of"
+)
+public class TextData {
+ private int id;
+ private String name;
+ private String description;
+}
diff --git a/demos/roms-streams/src/main/resources/application.properties b/demos/roms-streams/src/main/resources/application.properties
new file mode 100644
index 000000000..dd0628bae
--- /dev/null
+++ b/demos/roms-streams/src/main/resources/application.properties
@@ -0,0 +1,10 @@
+server.port=8080
+spring.application.name=redis-om-spring-streams
+
+# Spring Data Redis Configuration
+spring.data.redis.host=localhost
+spring.data.redis.port=6379
+spring.data.redis.username=
+spring.data.redis.password=
+
+redis.streams.fixed-delay=5000
diff --git a/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/RedisStreamsConsumerTest.java b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/RedisStreamsConsumerTest.java
new file mode 100644
index 000000000..1bc1994fd
--- /dev/null
+++ b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/RedisStreamsConsumerTest.java
@@ -0,0 +1,193 @@
+package com.redis.om.streams.consumer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.redis.om.streams.Producer;
+import com.redis.om.streams.TopicEntry;
+import com.redis.om.streams.TopicEntryId;
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+import com.redis.om.streams.command.serial.TopicProducer;
+import com.redis.om.streams.model.TextData;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.JedisPooled;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@SpringBootTest
+@Testcontainers
+class RedisStreamsConsumerTest {
+
+ @Container
+ static GenericContainer<?> redis = new GenericContainer<>(DockerImageName.parse("redis:8.0.2-alpine"))
+ .withExposedPorts(6379);
+
+ @DynamicPropertySource
+ static void redisProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.data.redis.host", redis::getHost);
+ registry.add("spring.data.redis.port", redis::getFirstMappedPort);
+ registry.add("spring.data.redis.username", () -> "");
+ registry.add("spring.data.redis.password", () -> "");
+ registry.add("redis.streams.fixed-delay", () -> "1000");
+ }
+
+ @Autowired
+ private Test_tFoo_gBar_Ack_NoCluster test_tFoo_gBar_ack_noCluster;
+ @Autowired
+ private Test_tFoo_gBar_NoAck_NoCluster test_tFoo_gBar_noAck_noCluster;
+ @Autowired
+ private Test_tFoo_gFoo_Ack_NoCluster test_tFoo_gFoo_ack_noCluster;
+ @Autowired
+ private Test_tFoo_gFoo_NoAck_NoCluster test_tFoo_gFoo_noAck_noCluster;
+
+ @Autowired
+ private JedisPooled jedisPooled;
+
+ private Producer producerFoo;
+ private Producer producerBar;
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ void setUp() {
+ producerFoo = new TopicProducer(jedisPooled, "topicFoo");
+ producerBar = new TopicProducer(jedisPooled, "topicFoo");
+ objectMapper = new ObjectMapper();
+ }
+
+ @Test
+ void testConsumerInitialization() {
+ assertNotNull(test_tFoo_gBar_ack_noCluster, "Consumer should be initialized");
+ assertNotNull(test_tFoo_gBar_noAck_noCluster, "Consumer should be initialized");
+ assertNotNull(test_tFoo_gFoo_ack_noCluster, "Consumer should be initialized");
+ assertNotNull(test_tFoo_gFoo_noAck_noCluster, "Consumer should be initialized");
+ }
+
+ @Test
+ void testProduce_tFoo_gFoo_noAck_noCluster() throws Exception {
+ TextData textData = TextData.of();
+ textData.setId(1);
+ textData.setName("Test Name 1");
+ textData.setDescription("Test Description 1");
+ Map payload = objectMapper.convertValue(textData, new TypeReference<>() {});
+ TopicEntryId topicEntryId = producerFoo.produce(payload);
+ System.out.println(topicEntryId);
+ assertNotNull(topicEntryId, "TopicEntryId should not be null");
+
+ TopicEntry topicEntry = test_tFoo_gFoo_noAck_noCluster.consume();
+ assertNotNull(topicEntry, "TopicEntry should not be null");
+ System.out.println(topicEntry);
+ assert topicEntry.getId().equals(topicEntryId);
+
+ boolean ack = test_tFoo_gFoo_noAck_noCluster.acknowledge(topicEntry);
+ assertFalse(ack, "TopicEntry should not be acknowledged");
+ }
+
+ @Test
+ void testProduce_tFoo_gFoo_ack_noCluster() throws Exception {
+ TextData textData = TextData.of();
+ textData.setId(1);
+ textData.setName("Test Name 1");
+ textData.setDescription("Test Description 1");
+ Map payload = objectMapper.convertValue(textData, new TypeReference<>() {});
+ TopicEntryId topicEntryId = producerFoo.produce(payload);
+ System.out.println(topicEntryId);
+ assertNotNull(topicEntryId, "TopicEntryId should not be null");
+
+ TopicEntry topicEntry = test_tFoo_gFoo_ack_noCluster.consume();
+ assertNotNull(topicEntry, "TopicEntry should not be null");
+ System.out.println(topicEntry);
+ assert topicEntry.getId().equals(topicEntryId);
+
+ boolean ack = test_tFoo_gFoo_ack_noCluster.acknowledge(topicEntry);
+ assertTrue(ack, "TopicEntry should be acknowledged");
+ }
+
+ /**
+ * FIXME:
+ * I produce a message on a topic named topicFoo.
+ * Then I have two consumers of type {@link com.redis.om.streams.command.serial.ConsumerGroup},
+ * one belonging to groupFoo, and the other one belonging to groupBar.
+ * Both consume the exact same message from the exact same topic:
+ *
+ * Consumer for Group Foo:TopicEntry(streamName=__rsj:topic:stream:topicFoo:0, groupName=groupFoo, id=1750954394888-0-0, message={name=Test Name 1, description=Test Description 1, id=1})
+ * Consumer for Group Bar:TopicEntry(streamName=__rsj:topic:stream:topicFoo:0, groupName=groupBar, id=1750954394888-0-0, message={name=Test Name 1, description=Test Description 1, id=1})
+ *
+ *
+ * Now, if I acknowledge the TopicEntry coming from the consumer of groupFoo
+ * with the consumer of groupBar, I would expect an error, an exception, something...
+ * Instead, the TopicEntry gets acknowledged despite the mismatch on the consumer group paternity.
+ */
+// @Test
+ void testAckWrongGroups() throws Exception {
+ TextData textData = TextData.of();
+ textData.setId(1);
+ textData.setName("Test Name 1");
+ textData.setDescription("Test Description 1");
+ Map payload = objectMapper.convertValue(textData, new TypeReference<>() {});
+ TopicEntryId topicEntryId = producerFoo.produce(payload);
+ System.out.println(topicEntryId);
+ assertNotNull(topicEntryId, "TopicEntryId should not be null");
+
+ TopicEntry topicEntryGroupFoo;
+ TopicEntry topicEntryGroupBar;
+ boolean ack;
+ topicEntryGroupFoo = test_tFoo_gFoo_ack_noCluster.consume();
+ System.out.println("Consumer for Group Foo:" + topicEntryGroupFoo);
+ assertNotNull(topicEntryGroupFoo, "TopicEntry should not be null");
+ assert topicEntryGroupFoo.getId().equals(topicEntryId);
+
+ topicEntryGroupBar = test_tFoo_gBar_ack_noCluster.consume();
+ System.out.println("Consumer for Group Bar:" + topicEntryGroupBar);
+ assertNotNull(topicEntryGroupBar, "TopicEntry should not be null");
+ assert topicEntryGroupBar.getId().equals(topicEntryId);
+
+ ack = test_tFoo_gFoo_ack_noCluster.acknowledge(topicEntryGroupBar);
+ assertFalse(ack, "TopicEntry should not be acknowledged because of consumer wrong group");
+ ack = test_tFoo_gBar_ack_noCluster.acknowledge(topicEntryGroupFoo);
+ assertFalse(ack, "TopicEntry should not be acknowledged because of consumer wrong group");
+
+ ack = test_tFoo_gFoo_ack_noCluster.acknowledge(topicEntryGroupBar);
+ assertTrue(ack, "TopicEntry should be acknowledged because of consumer right group");
+ ack = test_tFoo_gBar_ack_noCluster.acknowledge(topicEntryGroupFoo);
+ assertTrue(ack, "TopicEntry should be acknowledged because of consumer right group");
+ }
+
+ @Test
+ void testProduce_tFoo_gFoo_gFoo_ack_noAck_noCluster() throws Exception {
+ TextData textData = TextData.of();
+ textData.setId(1);
+ textData.setName("Test Name 1");
+ textData.setDescription("Test Description 1");
+ Map payload = objectMapper.convertValue(textData, new TypeReference<>() {});
+ TopicEntryId topicEntryId = producerFoo.produce(payload);
+ System.out.println(topicEntryId);
+ assertNotNull(topicEntryId, "TopicEntryId should not be null");
+
+ TopicEntry topicEntry;
+ boolean ack;
+
+ topicEntry = test_tFoo_gFoo_ack_noCluster.consume();
+ System.out.println("Consumer for Group Foo:" + topicEntry);
+ assertNotNull(topicEntry, "TopicEntry should not be null");
+ ack = test_tFoo_gFoo_ack_noCluster.acknowledge(topicEntry);
+ assertTrue(ack, "TopicEntry should be acknowledged");
+
+ topicEntry = test_tFoo_gFoo_noAck_noCluster.consume();
+ System.out.println("Consumer for Group Foo:" + topicEntry);
+ assertNull(topicEntry, "TopicEntry should be null");
+ ack = test_tFoo_gFoo_noAck_noCluster.acknowledge(topicEntry);
+ assertFalse(ack, "TopicEntry should not be acknowledged");
+
+ }
+
+}
\ No newline at end of file
diff --git a/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gBar_Ack_NoCluster.java b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gBar_Ack_NoCluster.java
new file mode 100644
index 000000000..1ce3d33a6
--- /dev/null
+++ b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gBar_Ack_NoCluster.java
@@ -0,0 +1,6 @@
+package com.redis.om.streams.consumer;
+
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+@RedisStreamConsumer(topicName = "topicFoo", groupName = "groupBar", autoAck = false, consumerName = "", cluster = false)
+public class Test_tFoo_gBar_Ack_NoCluster extends RedisStreamsConsumer {}
\ No newline at end of file
diff --git a/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gBar_NoAck_NoCluster.java b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gBar_NoAck_NoCluster.java
new file mode 100644
index 000000000..12cc0e988
--- /dev/null
+++ b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gBar_NoAck_NoCluster.java
@@ -0,0 +1,6 @@
+package com.redis.om.streams.consumer;
+
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+@RedisStreamConsumer(topicName = "topicFoo", groupName = "groupBar", autoAck = false, consumerName = "", cluster = false)
+public class Test_tFoo_gBar_NoAck_NoCluster extends RedisStreamsConsumer {}
\ No newline at end of file
diff --git a/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gFoo_Ack_NoCluster.java b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gFoo_Ack_NoCluster.java
new file mode 100644
index 000000000..daa971764
--- /dev/null
+++ b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gFoo_Ack_NoCluster.java
@@ -0,0 +1,6 @@
+package com.redis.om.streams.consumer;
+
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+@RedisStreamConsumer(topicName = "topicFoo", groupName = "groupFoo", autoAck = true, consumerName = "", cluster = false)
+public class Test_tFoo_gFoo_Ack_NoCluster extends RedisStreamsConsumer {}
\ No newline at end of file
diff --git a/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gFoo_NoAck_NoCluster.java b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gFoo_NoAck_NoCluster.java
new file mode 100644
index 000000000..0714948e5
--- /dev/null
+++ b/demos/roms-streams/src/test/java/com/redis/om/streams/consumer/Test_tFoo_gFoo_NoAck_NoCluster.java
@@ -0,0 +1,6 @@
+package com.redis.om.streams.consumer;
+
+import com.redis.om.streams.annotation.RedisStreamConsumer;
+
+@RedisStreamConsumer(topicName = "topicFoo", groupName = "groupFoo", autoAck = false, consumerName = "", cluster = false)
+public class Test_tFoo_gFoo_NoAck_NoCluster extends RedisStreamsConsumer {}
\ No newline at end of file
diff --git a/demos/roms-streams/src/test/resources/application-test.properties b/demos/roms-streams/src/test/resources/application-test.properties
new file mode 100644
index 000000000..8d39c43cd
--- /dev/null
+++ b/demos/roms-streams/src/test/resources/application-test.properties
@@ -0,0 +1,14 @@
+# Test Configuration
+spring.application.name=redis-om-spring-streams-test
+
+# Redis Streams Configuration for Tests
+redis.streams.fixed-delay=1000
+
+# Logging Configuration for Tests
+logging.level.com.redis.om.streams=DEBUG
+logging.level.com.redis.om.streams.consumer=DEBUG
+logging.level.com.redis.om.streams.config=DEBUG
+
+# Disable scheduling for tests to avoid interference
+spring.task.scheduling.enabled=false
+spring.main.allow-bean-definition-overriding=true
\ No newline at end of file
diff --git a/demos/roms-streams/src/test/resources/logback-test.xml b/demos/roms-streams/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..e389f02da
--- /dev/null
+++ b/demos/roms-streams/src/test/resources/logback-test.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX} %-5level %logger{36} - [%thread] - %method - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <root level="INFO">
+    <appender-ref ref="CONSOLE"/>
+  </root>
+</configuration>
\ No newline at end of file
diff --git a/gradle.properties b/gradle.properties
index 588d3b318..e3ab146d0 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -20,3 +20,7 @@ djlStarterVersion = 0.26
djlVersion = 0.30.0
springAiVersion = 1.0.0
azureIdentityVersion = 1.15.4
+
+lettuceCoreVersion = 6.7.1.RELEASE
+lettucemodVersion = 4.2.1
+globVersion = 0.9.0
\ No newline at end of file
diff --git a/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisConfiguration.java b/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisConfiguration.java
index d70f34825..165d69491 100644
--- a/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisConfiguration.java
+++ b/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisConfiguration.java
@@ -404,8 +404,8 @@ public HuggingFaceTokenizer sentenceTokenizer(AIRedisOMProperties properties) {
//noinspection ResultOfMethodCallIgnored
InetAddress.getByName("www.huggingface.co").isReachable(5000);
return HuggingFaceTokenizer.newInstance(properties.getDjl().getSentenceTokenizerModel(), options);
- } catch (IOException ioe) {
- logger.warn("Error retrieving default DJL sentence tokenizer");
+ } catch (IOException | RuntimeException ioe) {
+ logger.warn("Error retrieving default DJL sentence tokenizer: " + ioe.getMessage());
return null;
}
}
diff --git a/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisOMProperties.java b/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisOMProperties.java
index e096347e1..456b2dfd0 100644
--- a/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisOMProperties.java
+++ b/redis-om-spring-ai/src/main/java/com/redis/om/spring/AIRedisOMProperties.java
@@ -232,10 +232,10 @@ public static class Djl {
/**
* Model identifier for sentence embeddings.
- * Default is "sentence-transformers/msmarco-distilbert-dot-v5".
+ * Default is "https://huggingface.co/sentence-transformers/msmarco-distilbert-dot-v5".
*/
@NotNull
- private String sentenceTokenizerModel = "sentence-transformers/msmarco-distilbert-dot-v5";
+ private String sentenceTokenizerModel = "https://huggingface.co/sentence-transformers/msmarco-distilbert-dot-v5";
// face detection
/**
diff --git a/redis-om-spring/build.gradle b/redis-om-spring/build.gradle
index db58f3bfd..337348e04 100644
--- a/redis-om-spring/build.gradle
+++ b/redis-om-spring/build.gradle
@@ -19,4 +19,16 @@ dependencies {
compileOnly "javax.enterprise:cdi-api:${cdi}"
implementation "com.google.auto.service:auto-service:${autoServiceVersion}"
annotationProcessor "com.google.auto.service:auto-service:${autoServiceVersion}"
+
+
+ compileOnly 'org.projectlombok:lombok:1.18.38'
+
+ annotationProcessor 'org.projectlombok:lombok:1.18.38'
+
+ implementation 'org.springframework.session:spring-session-core'
+ implementation "io.lettuce:lettuce-core:$lettuceCoreVersion"
+ implementation "com.redis:lettucemod:$lettucemodVersion"
+ implementation "com.hrakaroo:glob:$globVersion"
+ implementation 'io.micrometer:micrometer-core:1.15.0'
+ implementation 'jakarta.annotation:jakarta.annotation-api:2.1.1'
}
diff --git a/redis-om-spring/src/main/java/com/redis/om/cache/AbstractRedisCacheAccessor.java b/redis-om-spring/src/main/java/com/redis/om/cache/AbstractRedisCacheAccessor.java
new file mode 100644
index 000000000..2d4ad8ff6
--- /dev/null
+++ b/redis-om-spring/src/main/java/com/redis/om/cache/AbstractRedisCacheAccessor.java
@@ -0,0 +1,177 @@
+package com.redis.om.cache;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import com.redis.lettucemod.api.StatefulRedisModulesConnection;
+
+import io.lettuce.core.ScanArgs;
+import io.lettuce.core.ScanIterator;
+
+/**
+ * Abstract base class for Redis cache accessors that provides common functionality
+ * for interacting with Redis as a cache.
+ */
+abstract class AbstractRedisCacheAccessor implements CacheAccessor {
+
+ /**
+ * Default number of elements to scan in each iteration when cleaning cache entries.
+ */
+ public static final long DEFAULT_SCAN_COUNT = 100;
+
+ /**
+ * The Redis connection used for cache operations.
+ */
+ protected final StatefulRedisModulesConnection connection;
+
+ private long scanCount = DEFAULT_SCAN_COUNT;
+
+ /**
+ * Creates a new {@link AbstractRedisCacheAccessor} with the given connection.
+ *
+ * @param connection the Redis connection, must not be {@literal null}.
+ */
+ AbstractRedisCacheAccessor(StatefulRedisModulesConnection connection) {
+ Assert.notNull(connection, "Connection must not be null");
+ this.connection = connection;
+
+ }
+
+ /**
+ * Converts a String key to a byte array using UTF-8 encoding.
+ *
+ * @param key the key to convert, never {@literal null}.
+ * @return the key as a byte array.
+ */
+ private byte[] convertKey(String key) {
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public Object get(String key, Duration ttl) {
+ return get(convertKey(key), ttl);
+ }
+
+ /**
+ * Retrieves a cached object for the given key, optionally extending its TTL.
+ *
+ * @param key the cache key as byte array, never {@literal null}.
+ * @param ttl the time-to-live to set if the key exists, can be {@literal null}.
+ * @return the cached object or {@literal null} if not found.
+ */
+ protected abstract Object get(byte[] key, Duration ttl);
+
+ @Override
+ public void put(String key, Object value, Duration ttl) {
+ put(convertKey(key), value, ttl);
+ }
+
+ /**
+ * Stores an object in the cache with the given key and TTL.
+ *
+ * @param key the cache key as byte array, never {@literal null}.
+ * @param value the object to cache, can be {@literal null}.
+ * @param ttl the time-to-live for the cached entry, can be {@literal null}.
+ */
+ protected abstract void put(byte[] key, Object value, Duration ttl);
+
+ @Override
+ public Object putIfAbsent(String key, Object value, Duration ttl) {
+ return putIfAbsent(convertKey(key), value, ttl);
+ }
+
+ /**
+ * Stores an object in the cache only if the key does not already exist.
+ *
+ * @param key the cache key as byte array, never {@literal null}.
+ * @param value the object to cache, can be {@literal null}.
+ * @param ttl the time-to-live for the cached entry, can be {@literal null}.
+ * @return the previous value associated with the key, or {@literal null} if there was no value.
+ */
+ protected abstract Object putIfAbsent(byte[] key, Object value, Duration ttl);
+
+ /**
+ * Sets the number of elements to scan in each iteration when cleaning cache entries.
+ *
+ * @param count the number of elements to scan in each iteration
+ */
+ public void setScanCount(long count) {
+ this.scanCount = count;
+ }
+
+ @Override
+ public void remove(String key) {
+ Assert.notNull(key, "Key must not be null");
+ delete(convertKey(key));
+ }
+
+ @Override
+ public long clean(String pattern) {
+ Assert.notNull(pattern, "Pattern must not be null");
+ ScanArgs args = new ScanArgs();
+ args.match(pattern);
+ args.limit(scanCount);
+ ScanIterator scanIterator = ScanIterator.scan(connection.sync(), args);
+ List keys = new ArrayList<>();
+ long count = 0;
+ while (scanIterator.hasNext()) {
+ keys.add(scanIterator.next());
+ if (keys.size() >= scanCount) {
+ count += delete(keys);
+ keys.clear();
+ }
+ }
+ count += delete(keys);
+ return count;
+ }
+
+ /**
+ * Checks if a key exists in Redis.
+ *
+ * @param key the key to check, never {@literal null}.
+ * @return {@literal true} if the key exists, {@literal false} otherwise.
+ */
+ protected boolean exists(byte[] key) {
+ return connection.sync().exists(key) > 0;
+ }
+
+ /**
+ * Deletes multiple keys from Redis.
+ *
+ * @param keys the list of keys to delete, can be empty.
+ * @return the number of keys that were deleted.
+ */
+ private long delete(List keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return 0;
+ }
+ return delete(keys.toArray(new byte[0][]));
+ }
+
+ /**
+ * Deletes multiple keys from Redis.
+ *
+ * @param keys the array of keys to delete.
+ * @return the number of keys that were deleted.
+ */
+ private long delete(byte[]... keys) {
+ return connection.sync().del(keys);
+ }
+
+ /**
+ * Determines if the given TTL duration should be applied as an expiration.
+ *
+ * @param ttl the time-to-live duration, can be {@literal null}
+ * @return {@literal true} if the TTL is not null, not zero, and not negative
+ */
+ protected boolean shouldExpireWithin(@Nullable Duration ttl) {
+ return ttl != null && !ttl.isZero() && !ttl.isNegative();
+ }
+
+}
diff --git a/redis-om-spring/src/main/java/com/redis/om/cache/CacheAccessor.java b/redis-om-spring/src/main/java/com/redis/om/cache/CacheAccessor.java
new file mode 100644
index 000000000..b139ef56c
--- /dev/null
+++ b/redis-om-spring/src/main/java/com/redis/om/cache/CacheAccessor.java
@@ -0,0 +1,67 @@
+package com.redis.om.cache;
+
+import java.time.Duration;
+
+import org.springframework.lang.Nullable;
+
+/**
+ * {@link CacheAccessor} provides low-level access to Redis commands
+ * ({@code HSET, HGETALL, EXPIRE,...}) used for caching.
+ *
+ * The {@link CacheAccessor} may be shared by multiple cache implementations and
+ * is responsible for reading/writing binary data from/to Redis.
+ * Implementations may additionally honor cache lock flags where they support locking.
+ */
+public interface CacheAccessor {
+
+ /**
+ * Get the binary value representation from Redis stored for the given key and
+ * set the given {@link Duration TTL expiration} for the cache entry.
+ *
+ * @param key must not be {@literal null}.
+ * @param ttl {@link Duration} specifying the {@literal expiration timeout} for
+ * the cache entry.
+ * @return {@literal null} if key does not exist or has {@literal expired}.
+ */
+ @Nullable
+ Object get(String key, @Nullable Duration ttl);
+
+ /**
+ * Write the given key/value pair to Redis and set the expiration time if
+ * defined.
+ *
+ * @param key The key for the cache entry. Must not be {@literal null}.
+ * @param value The value stored for the key. Must not be {@literal null}.
+ * @param ttl Optional expiration time. Can be {@literal null}.
+ */
+ void put(String key, Object value, @Nullable Duration ttl);
+
+ /**
+ * Write the given value to Redis if the key does not already exist.
+ *
+ * @param key The key for the cache entry. Must not be {@literal null}.
+ * @param value The value stored for the key. Must not be {@literal null}.
+ * @param ttl Optional expiration time. Can be {@literal null}.
+ * @return {@literal null} if the value has been written, the value stored for
+ * the key if it already exists.
+ */
+ @Nullable
+ Object putIfAbsent(String key, Object value, @Nullable Duration ttl);
+
+ /**
+ * Remove the given key from Redis.
+ *
+ * @param key The key for the cache entry. Must not be {@literal null}.
+ */
+ void remove(String key);
+
+ /**
+ * Remove all keys following the given pattern.
+ *
+ * @param pattern The pattern for the keys to remove. Must not be
+ * {@literal null}.
+ * @return number of keys deleted
+ */
+ long clean(String pattern);
+
+}
diff --git a/redis-om-spring/src/main/java/com/redis/om/cache/KeyFunction.java b/redis-om-spring/src/main/java/com/redis/om/cache/KeyFunction.java
new file mode 100644
index 000000000..c4237c0f4
--- /dev/null
+++ b/redis-om-spring/src/main/java/com/redis/om/cache/KeyFunction.java
@@ -0,0 +1,66 @@
+package com.redis.om.cache;
+
+import org.springframework.util.Assert;
+
+/**
+ * {@link KeyFunction} is a function for creating custom prefixes prepended to
+ * the actual {@literal key} stored in Redis.
+ *
+ */
+@FunctionalInterface
+public interface KeyFunction {
+
+ /**
+ * Default separator.
+ *
+ */
+ String SEPARATOR = ":";
+
+ /**
+ * A pass-through implementation that returns the key unchanged without any prefix.
+ * This can be used when no key transformation is needed.
+ */
+ KeyFunction PASSTHROUGH = (cache, key) -> key;
+
+ /**
+ * Default {@link KeyFunction} scheme that prefixes cache keys with
+ * the {@link String name} of the cache followed by a single colon.
+ *
+ * For example, a cache named {@literal myCache} will prefix all cache keys with
+ * {@literal myCache:}.
+ *
+ */
+ KeyFunction SIMPLE = (cache, key) -> cache + SEPARATOR + key;
+
+ /**
+ * Compute the {@link String prefix} for the actual {@literal cache key} stored
+ * in Redis.
+ *
+ * @param cache {@link String name} of the cache in which the key is stored;
+ * will never be {@literal null}.
+ * @param key the cache key to be processed; will never be {@literal null}.
+ * @return the computed {@literal cache key} stored in Redis; never
+ * {@literal null}.
+ */
+ String compute(String cache, String key);
+
+ /**
+ * Creates a {@link KeyFunction} scheme that prefixes cache keys with the given
+ * {@link String prefix}.
+ *
+ * The {@link String prefix} is prepended to the {@link String cacheName}
+ * followed by a single colon.
+ *
+ * For example, a prefix {@literal redis-} with a cache named
+ * {@literal myCache} results in keys prefixed with {@literal redis-myCache:}.
+ *
+ * @param prefix must not be {@literal null}.
+ * @return a {@link KeyFunction} that applies the given prefix.
+ * @since 2.3
+ */
+ static KeyFunction prefixed(String prefix) {
+ Assert.notNull(prefix, "Prefix must not be null");
+ return (name, key) -> prefix + name + SEPARATOR + key;
+ }
+
+}
diff --git a/redis-om-spring/src/main/java/com/redis/om/cache/LocalCacheAccessor.java b/redis-om-spring/src/main/java/com/redis/om/cache/LocalCacheAccessor.java
new file mode 100644
index 000000000..58b3fcbbe
--- /dev/null
+++ b/redis-om-spring/src/main/java/com/redis/om/cache/LocalCacheAccessor.java
@@ -0,0 +1,128 @@
+package com.redis.om.cache;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.hrakaroo.glob.GlobPattern;
+import com.hrakaroo.glob.MatchingEngine;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+
+/**
+ * A {@link CacheAccessor} implementation that provides a local in-memory cache in front of another
+ * {@link CacheAccessor} delegate. This implementation helps reduce network calls by caching values locally.
+ * It also provides metrics for cache hits, misses, and evictions.
+ */
+public class LocalCacheAccessor implements CacheAccessor {
+
+ private static final String DESCRIPTION_GETS = "The number of times local cache lookup methods have returned a cached (hit) or uncached (newly loaded or null) value (miss).";
+ private static final String DESCRIPTION_EVICTIONS = "The number of times the local cache was evicted";
+
+ private final Map map;
+ private final CacheAccessor delegate;
+ private final Counter hits;
+ private final Counter misses;
+ private final Counter evictions;
+
+ /**
+ * Creates a new LocalCacheAccessor with the specified cache map, delegate, name, and registry.
+ *
+ * @param cache the map to use for local caching
+ * @param delegate the underlying CacheAccessor to delegate calls to
+ * @param name the name of the cache, used for metrics
+ * @param registry the meter registry for recording metrics
+ */
+ public LocalCacheAccessor(Map cache, CacheAccessor delegate, String name, MeterRegistry registry) {
+ this.map = cache;
+ this.delegate = delegate;
+ this.hits = Counter.builder("cache.local.gets").tags("name", name).tag("result", "hit").description(
+ DESCRIPTION_GETS).register(registry);
+ this.misses = Counter.builder("cache.local.gets").tag("name", name).tag("result", "miss").description(
+ DESCRIPTION_GETS).register(registry);
+ this.evictions = Counter.builder("cache.local.evictions").tag("name", name).description(DESCRIPTION_EVICTIONS)
+ .register(registry);
+ }
+
+ /**
+ * Returns the map used for local caching.
+ *
+ * @return the map containing locally cached values
+ */
+ public Map getMap() {
+ return map;
+ }
+
+ /**
+ * Returns the delegate CacheAccessor that this LocalCacheAccessor wraps.
+ *
+ * @return the underlying CacheAccessor delegate
+ */
+ public CacheAccessor getDelegate() {
+ return delegate;
+ }
+
+ @Override
+ public Object get(String key, Duration ttl) {
+ Object value = map.get(key);
+ if (value == null) {
+ misses.increment();
+ value = delegate.get(key, ttl);
+ if (value != null) {
+ map.put(key, value);
+ }
+ } else {
+ hits.increment();
+ }
+ return value;
+ }
+
+ @Override
+ public void put(String key, Object value, Duration ttl) {
+ map.put(key, value);
+ delegate.put(key, value, ttl);
+ // Register interest in key
+ delegate.get(key, ttl);
+ }
+
+ @Override
+ public Object putIfAbsent(String key, Object value, Duration ttl) {
+ if (!map.containsKey(key)) {
+ map.put(key, value);
+ }
+ Object result = delegate.putIfAbsent(key, value, ttl);
+ // Register interest in key
+ delegate.get(key, ttl);
+ return result;
+ }
+
+ @Override
+ public void remove(String key) {
+ delegate.remove(key);
+ map.remove(key);
+ evictions.increment();
+ }
+
+ /**
+ * Removes an entry from the local cache without affecting the delegate cache.
+ * This is useful for selectively invalidating local cache entries.
+ *
+ * @param key the key to remove from the local cache
+ */
+ public void evictLocal(String key) {
+ map.remove(key);
+ evictions.increment();
+ }
+
+ @Override
+ public long clean(String pattern) {
+ MatchingEngine engine = GlobPattern.compile(pattern);
+ List keys = map.keySet().stream().filter(engine::matches).collect(Collectors.toList());
+ keys.forEach(map::remove);
+ evictions.increment(keys.size());
+ return delegate.clean(pattern);
+ }
+
+}
diff --git a/redis-om-spring/src/main/java/com/redis/om/cache/RedisCache.java b/redis-om-spring/src/main/java/com/redis/om/cache/RedisCache.java
new file mode 100644
index 000000000..5249538b2
--- /dev/null
+++ b/redis-om-spring/src/main/java/com/redis/om/cache/RedisCache.java
@@ -0,0 +1,497 @@
+package com.redis.om.cache;
+
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+import org.springframework.cache.Cache;
+import org.springframework.cache.support.AbstractValueAdaptingCache;
+import org.springframework.cache.support.NullValue;
+import org.springframework.cache.support.SimpleValueWrapper;
+import org.springframework.core.convert.ConversionFailedException;
+import org.springframework.core.convert.ConversionService;
+import org.springframework.core.convert.TypeDescriptor;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.StringUtils;
+
+import com.redis.lettucemod.RedisModulesUtils;
+import com.redis.lettucemod.api.StatefulRedisModulesConnection;
+import com.redis.lettucemod.search.CreateOptions;
+import com.redis.lettucemod.search.CreateOptions.DataType;
+import com.redis.lettucemod.search.Field;
+import com.redis.lettucemod.search.IndexInfo;
+
+import io.lettuce.core.AbstractRedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.TrackingArgs;
+import io.lettuce.core.codec.ByteArrayCodec;
+import io.lettuce.core.codec.StringCodec;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Timer;
+
+/**
+ * {@link AbstractValueAdaptingCache Cache} implementation using Redis as the underlying store for cache data.
+ *
+ * Use {@link RedisCacheManager} to create {@link RedisCache} instances.
+ */
+public class RedisCache extends AbstractValueAdaptingCache implements AutoCloseable {
+
+ private static final String DESCRIPTION_GETS = "The number of times cache lookup methods have returned a cached (hit) or uncached (miss) value.";
+
+ private static final String DESCRIPTION_PUTS = "The number of entries added to the cache.";
+
+ private static final String DESCRIPTION_EVICTIONS = "The number of times the cache was evicted";
+
+ static final String CACHE_RETRIEVAL_UNSUPPORTED_OPERATION_EXCEPTION_MESSAGE = "The Redis driver configured with RedisCache through RedisCacheWriter does not support CompletableFuture-based retrieval";
+
+ private final String name;
+
+ private final AbstractRedisClient redisClient;
+
+ private final StatefulRedisModulesConnection connection;
+
+ private final CacheAccessor accessor;
+
+ private final RedisCacheConfiguration configuration;
+
+ private final Counter hits;
+
+ private final Counter misses;
+
+ private final Counter puts;
+
+ private final Counter evictions;
+
+ private final Timer getLatency;
+
+ private final Timer putLatency;
+
+ private final Timer evictionLatency;
+
+ /**
+ * Create a new {@link RedisCache} with the given {@link String name} and {@link RedisCacheConfiguration}, using the
+ * {@link CacheAccessor} to execute Redis commands supporting the cache operations.
+ *
+ * @param name {@link String name} for this {@link Cache}; must not be {@literal null}.
+ * @param client Default {@link AbstractRedisClient} to use if none specified in given RedisCacheConfiguration.
+ * @param configuration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation; must not be
+ * {@literal null}.
+ * @throws IllegalArgumentException if either the given {@link CacheAccessor} or {@link RedisCacheConfiguration} are
+ * {@literal null} or the given {@link String} name for this {@link RedisCache} is
+ * {@literal null}.
+ */
+ public RedisCache(String name, AbstractRedisClient client, RedisCacheConfiguration configuration) {
+ super(false);
+ Assert.notNull(name, "Name must not be null");
+ this.name = name;
+ this.redisClient = configuration.getClient() == null ? client : configuration.getClient();
+ this.connection = RedisModulesUtils.connection(redisClient, ByteArrayCodec.INSTANCE);
+ this.configuration = configuration;
+ this.accessor = accessor();
+ this.hits = Counter.builder("cache.gets").tags("name", name).tag("result", "hit").description(DESCRIPTION_GETS)
+ .register(configuration.getMeterRegistry());
+ this.misses = Counter.builder("cache.gets").tag("name", name).tag("result", "miss").description(DESCRIPTION_GETS)
+ .register(configuration.getMeterRegistry());
+ this.puts = Counter.builder("cache.puts").tag("name", name).description(DESCRIPTION_PUTS).register(configuration
+ .getMeterRegistry());
+ this.evictions = Counter.builder("cache.evictions").tag("name", name).description(DESCRIPTION_EVICTIONS).register(
+ configuration.getMeterRegistry());
+ this.getLatency = Timer.builder("cache.gets.latency").tag("name", name).description("Cache gets").register(
+ configuration.getMeterRegistry());
+ this.putLatency = Timer.builder("cache.puts.latency").tag("name", name).description("Cache puts").register(
+ configuration.getMeterRegistry());
+ this.evictionLatency = Timer.builder("cache.evictions.latency").tag("name", name).description("Cache evictions")
+ .register(configuration.getMeterRegistry());
+ if (configuration.isIndexEnabled()) {
+ createIndex();
+ }
+ }
+
+ @SuppressWarnings(
+ "unchecked"
+ )
+ private void createIndex() {
+ CreateOptions.Builder createOptions = CreateOptions.builder();
+ createOptions.on(indexType());
+ createOptions.prefix(getName() + KeyFunction.SEPARATOR);
+ createOptions.noFields(); // Disable storing attribute bits for each term. It saves memory, but it does not allow
+ // filtering by specific attributes.
+ try (StatefulRedisModulesConnection connection = connection()) {
+ String indexName = indexName();
+ try {
+ connection.sync().ftDropindex(indexName);
+ } catch (RedisCommandExecutionException e) {
+ // ignore as index might not exist
+ }
+ connection.sync().ftCreate(indexName, createOptions.build(), indexField());
+ }
+ }
+
+ private DataType indexType() {
+ switch (configuration.getRedisType()) {
+ case HASH:
+ return DataType.HASH;
+ case JSON:
+ return DataType.JSON;
+ default:
+ throw new IllegalArgumentException(String.format("Redis type %s not indexable", configuration.getRedisType()));
+ }
+ }
+
+ private StatefulRedisModulesConnection connection() {
+ return RedisModulesUtils.connection(redisClient);
+ }
+
+ /**
+ * Returns the number of documents in the index if enabled.
+ *
+ * @return the number of documents in the index or -1 if cache is not indexed
+ */
+ public long getCount() {
+ if (configuration.isIndexEnabled()) {
+ try (StatefulRedisModulesConnection connection = connection()) {
+ IndexInfo cacheInfo = RedisModulesUtils.indexInfo(connection.sync().ftInfo(indexName()));
+ Double numDocs = cacheInfo.getNumDocs();
+ if (numDocs != null) {
+ return numDocs.longValue();
+ }
+ }
+ }
+ return -1;
+ }
+
+ private String indexName() {
+ if (StringUtils.hasLength(configuration.getIndexName())) {
+ return configuration.getIndexName();
+ }
+ return name + "Idx";
+ }
+
+ private Field indexField() {
+ if (configuration.getRedisType() == RedisType.JSON) {
+ return Field.tag("$._class").as("_class").build();
+ }
+ return Field.tag("_class").build();
+ }
+
+ @Override
+ public void close() {
+ connection.close();
+ }
+
+ @SuppressWarnings(
+ "unchecked"
+ )
+ private CacheAccessor accessor() {
+ CacheAccessor redisCacheAccessor = redisCacheAccessor();
+ if (configuration.getLocalCache().isPresent()) {
+ connection.sync().clientTracking(TrackingArgs.Builder.enabled());
+ LocalCacheAccessor localCacheAccessor = new LocalCacheAccessor(configuration.getLocalCache().get(),
+ redisCacheAccessor, name, configuration.getMeterRegistry());
+ connection.addListener(msg -> {
+ if (msg.getType().equals("invalidate")) {
+ List