Skip to content

Commit 3cb13af

Browse files
authored
More tests for Redis Stream support
Related to #3226
1 parent 60f3273 commit 3cb13af

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
* output channel.
4545
* By default this adapter reads message as a standalone client {@code XREAD} (Redis command) but can be switched to a
4646
* Consumer Group feature {@code XREADGROUP} by setting {@link #consumerName} field.
47-
* By default the Consumer Group name is an id of this bean {@link #getBeanName()}.
47+
* By default the Consumer Group name is the id of this bean {@link #getBeanName()}.
4848
*
4949
* @author Attoumane Ahamadi
5050
* @author Artem Bilan
@@ -195,7 +195,7 @@ protected void doStart() {
195195
Consumer consumer = Consumer.from(this.consumerGroup, this.consumerName); // NOSONAR
196196

197197
if (offset.getOffset().equals(ReadOffset.latest())) {
198-
// for consumer group offset id should be equal '>'
198+
// for consumer group offset id should be equal to '>'
199199
offset = StreamOffset.create(this.streamKey, ReadOffset.lastConsumed());
200200
}
201201

spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducerTests.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.integration.redis.rules.RedisAvailable;
3939
import org.springframework.integration.redis.rules.RedisAvailableRule;
4040
import org.springframework.integration.redis.rules.RedisAvailableTests;
41+
import org.springframework.integration.redis.support.RedisHeaders;
4142
import org.springframework.integration.redis.util.Address;
4243
import org.springframework.integration.redis.util.Person;
4344
import org.springframework.messaging.support.GenericMessage;
@@ -118,15 +119,39 @@ public void testReadingMessageAsStandaloneClient() {
118119

119120
Flux.from(this.fluxMessageChannel)
120121
.as(StepVerifier::create)
121-
.assertNext(message -> assertThat(message.getPayload()).isEqualTo(person))
122+
.assertNext(message -> {
123+
assertThat(message.getPayload()).isEqualTo(person);
124+
assertThat(message.getHeaders()).containsKeys(RedisHeaders.STREAM_KEY,
125+
RedisHeaders.STREAM_MESSAGE_ID);
126+
})
122127
.thenCancel()
123128
.verify(Duration.ofSeconds(10));
124129
}
125130

126131
@Test
127132
@RedisAvailable
128133
public void testReadingMessageAsConsumerInConsumerGroup() {
129-
//TODO find why the test above does not execute before implementing this one
134+
Address address = new Address("Winterfell, Westeros");
135+
Person person = new Person(address, "John Snow");
136+
137+
this.template.opsForStream().createGroup(STREAM_KEY, this.redisStreamMessageProducer.getBeanName())
138+
.subscribe();
139+
140+
this.redisStreamMessageProducer.setCreateConsumerGroup(false);
141+
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
142+
this.redisStreamMessageProducer.afterPropertiesSet();
143+
this.redisStreamMessageProducer.start();
144+
145+
this.messageHandler.handleMessage(new GenericMessage<>(person));
146+
147+
Flux.from(this.fluxMessageChannel)
148+
.as(StepVerifier::create)
149+
.assertNext(message -> {
150+
assertThat(message.getPayload()).isEqualTo(person);
151+
assertThat(message.getHeaders()).containsKeys(RedisHeaders.CONSUMER_GROUP, RedisHeaders.CONSUMER);
152+
})
153+
.thenCancel()
154+
.verify(Duration.ofSeconds(10));
130155
}
131156

132157
@Configuration

0 commit comments

Comments
 (0)