Skip to content

Commit b1a3830

Browse files
rohan mukeshartembilan
authored andcommitted
Add Error Handling to RedisStreamMessageProducer
* Use 'onErrorContinue' to continue receiving messages from Stream * Send an `ErrorMessage` to the provided `errorChannel` (if any) * And `@Nullable` to some `MessageProducerSupport` API which definitely may accept `null` * Extract common `buildMessageFromRecord()` method in the `ReactiveRedisStreamMessageProducer`, so all the headers from the stream `Record` are carried to the message independently of the record state - normal send or error sending
1 parent 5b74db8 commit b1a3830

File tree

3 files changed

+151
-57
lines changed

3 files changed

+151
-57
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
248248
* @return true if the error channel is available and message sent.
249249
* @since 4.3.10
250250
*/
251-
protected final boolean sendErrorMessageIfNecessary(Message<?> message, Exception exception) {
251+
protected final boolean sendErrorMessageIfNecessary(@Nullable Message<?> message, Exception exception) {
252252
MessageChannel channel = getErrorChannel();
253253
if (channel != null) {
254254
this.messagingTemplate.send(channel, buildErrorMessage(message, exception));
@@ -265,7 +265,7 @@ protected final boolean sendErrorMessageIfNecessary(Message<?> message, Exceptio
265265
* @return the error message.
266266
* @since 4.3.10
267267
*/
268-
protected final ErrorMessage buildErrorMessage(Message<?> message, Exception exception) {
268+
protected final ErrorMessage buildErrorMessage(@Nullable Message<?> message, Exception exception) {
269269
return this.errorMessageStrategy.buildErrorMessage(exception, getErrorMessageAttributes(message));
270270
}
271271

@@ -277,7 +277,7 @@ protected final ErrorMessage buildErrorMessage(Message<?> message, Exception exc
277277
* @return the attributes.
278278
* @since 4.3.10
279279
*/
280-
protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
280+
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
281281
return ErrorMessageUtils.getAttributeAccessor(message, null);
282282
}
283283

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3535
import org.springframework.lang.Nullable;
3636
import org.springframework.messaging.Message;
37+
import org.springframework.messaging.MessagingException;
38+
import org.springframework.messaging.converter.MessageConversionException;
3739
import org.springframework.util.Assert;
3840
import org.springframework.util.StringUtils;
3941

@@ -211,25 +213,38 @@ protected void doStart() {
211213
}
212214

213215
Flux<? extends Message<?>> messageFlux =
214-
events.map((event) -> {
215-
AbstractIntegrationMessageBuilder<?> builder =
216-
getMessageBuilderFactory()
217-
.withPayload(this.extractPayload ? event.getValue() : event)
218-
.setHeader(RedisHeaders.STREAM_KEY, event.getStream())
219-
.setHeader(RedisHeaders.STREAM_MESSAGE_ID, event.getId())
220-
.setHeader(RedisHeaders.CONSUMER_GROUP, this.consumerGroup)
221-
.setHeader(RedisHeaders.CONSUMER, this.consumerName);
222-
if (!this.autoAck && this.consumerGroup != null) {
223-
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
224-
(SimpleAcknowledgment) () ->
225-
this.reactiveStreamOperations
226-
.acknowledge(this.consumerGroup, event)
227-
.subscribe());
228-
}
229-
return builder.build();
230-
});
231-
216+
events.map((record) -> buildMessageFromRecord(record, this.extractPayload))
217+
.onErrorContinue((ex, record) -> {
218+
@SuppressWarnings("unchecked")
219+
Message<?> failedMessage = buildMessageFromRecord((Record<String, ?>) record, false);
220+
MessagingException conversionException =
221+
new MessageConversionException(failedMessage,
222+
"Cannot deserialize Redis Stream Record", ex);
223+
if (!sendErrorMessageIfNecessary(null, conversionException)) {
224+
logger.getLog().error(conversionException);
225+
}
226+
});
232227
subscribeToPublisher(messageFlux);
233228
}
234229

230+
private Message<?> buildMessageFromRecord(Record<String, ?> record, boolean extractPayload) {
231+
AbstractIntegrationMessageBuilder<?> builder =
232+
getMessageBuilderFactory()
233+
.withPayload(extractPayload ? record.getValue() : record)
234+
.setHeader(RedisHeaders.STREAM_KEY, record.getStream())
235+
.setHeader(RedisHeaders.STREAM_MESSAGE_ID, record.getId())
236+
.setHeader(RedisHeaders.CONSUMER_GROUP, this.consumerGroup)
237+
.setHeader(RedisHeaders.CONSUMER, this.consumerName);
238+
239+
if (!this.autoAck && this.consumerGroup != null) {
240+
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
241+
(SimpleAcknowledgment) () ->
242+
this.reactiveStreamOperations
243+
.acknowledge(this.consumerGroup, record)
244+
.subscribe());
245+
}
246+
247+
return builder.build();
248+
}
249+
235250
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducerTests.java

Lines changed: 115 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import static org.awaitility.Awaitility.await;
2121

2222
import java.time.Duration;
23+
import java.util.Date;
2324
import java.util.concurrent.atomic.AtomicReference;
2425

26+
import org.assertj.core.api.InstanceOfAssertFactories;
2527
import org.junit.After;
2628
import org.junit.Before;
2729
import org.junit.Test;
@@ -32,14 +34,14 @@
3234
import org.springframework.context.annotation.Configuration;
3335
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3436
import org.springframework.data.redis.connection.stream.ReadOffset;
35-
import org.springframework.data.redis.connection.stream.StreamInfo;
3637
import org.springframework.data.redis.core.ReactiveRedisTemplate;
3738
import org.springframework.data.redis.serializer.RedisSerializationContext;
3839
import org.springframework.data.redis.stream.StreamReceiver;
3940
import org.springframework.integration.IntegrationMessageHeaderAccessor;
4041
import org.springframework.integration.StaticMessageHeaderAccessor;
4142
import org.springframework.integration.acks.SimpleAcknowledgment;
4243
import org.springframework.integration.channel.FluxMessageChannel;
44+
import org.springframework.integration.channel.QueueChannel;
4345
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
4446
import org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler;
4547
import org.springframework.integration.redis.rules.RedisAvailable;
@@ -48,6 +50,10 @@
4850
import org.springframework.integration.redis.support.RedisHeaders;
4951
import org.springframework.integration.redis.util.Address;
5052
import org.springframework.integration.redis.util.Person;
53+
import org.springframework.messaging.Message;
54+
import org.springframework.messaging.MessagingException;
55+
import org.springframework.messaging.PollableChannel;
56+
import org.springframework.messaging.support.ErrorMessage;
5157
import org.springframework.messaging.support.GenericMessage;
5258
import org.springframework.test.annotation.DirtiesContext;
5359
import org.springframework.test.context.junit4.SpringRunner;
@@ -75,7 +81,7 @@ public class ReactiveRedisStreamMessageProducerTests extends RedisAvailableTests
7581
FluxMessageChannel fluxMessageChannel;
7682

7783
@Autowired
78-
ReactiveRedisStreamMessageProducer redisStreamMessageProducer;
84+
ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer;
7985

8086
@Autowired
8187
ReactiveRedisTemplate<String, ?> template;
@@ -85,42 +91,32 @@ public class ReactiveRedisStreamMessageProducerTests extends RedisAvailableTests
8591

8692
@Before
8793
public void delKey() {
88-
this.template.hasKey(STREAM_KEY)
89-
.filter(Boolean::booleanValue)
90-
.flatMapMany(b ->
91-
this.template.opsForStream()
92-
.groups(STREAM_KEY)
93-
.map(StreamInfo.XInfoGroup::groupName)
94-
.flatMap(groupName ->
95-
this.template.opsForStream()
96-
.destroyGroup(STREAM_KEY, groupName)))
97-
.blockLast();
9894
this.template.delete(STREAM_KEY).block();
9995
}
10096

10197
@After
10298
public void tearDown() {
103-
this.redisStreamMessageProducer.stop();
99+
this.reactiveRedisStreamProducer.stop();
104100
RedisAvailableRule.connectionFactory.resetConnection();
105101
}
106102

107103
@Test
108104
@RedisAvailable
109105
public void testConsumerGroupCreation() {
110-
this.redisStreamMessageProducer.setCreateConsumerGroup(true);
111-
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
112-
this.redisStreamMessageProducer.afterPropertiesSet();
106+
this.reactiveRedisStreamProducer.setCreateConsumerGroup(true);
107+
this.reactiveRedisStreamProducer.setConsumerName(CONSUMER);
108+
this.reactiveRedisStreamProducer.afterPropertiesSet();
113109

114110
Flux.from(this.fluxMessageChannel).subscribe();
115111

116-
this.redisStreamMessageProducer.start();
112+
this.reactiveRedisStreamProducer.start();
117113

118114
this.template.opsForStream()
119115
.groups(STREAM_KEY)
120116
.next()
121117
.as(StepVerifier::create)
122118
.assertNext((infoGroup) ->
123-
assertThat(infoGroup.groupName()).isEqualTo(this.redisStreamMessageProducer.getBeanName()))
119+
assertThat(infoGroup.groupName()).isEqualTo(this.reactiveRedisStreamProducer.getBeanName()))
124120
.thenCancel()
125121
.verify(Duration.ofSeconds(10));
126122
}
@@ -132,10 +128,10 @@ public void testReadingMessageAsStandaloneClient() {
132128
Person person = new Person(address, "Attoumane");
133129
this.messageHandler.handleMessage(new GenericMessage<>(person));
134130

135-
this.redisStreamMessageProducer.setCreateConsumerGroup(false);
136-
this.redisStreamMessageProducer.setConsumerName(null);
137-
this.redisStreamMessageProducer.setReadOffset(ReadOffset.from("0-0"));
138-
this.redisStreamMessageProducer.afterPropertiesSet();
131+
this.reactiveRedisStreamProducer.setCreateConsumerGroup(false);
132+
this.reactiveRedisStreamProducer.setConsumerName(null);
133+
this.reactiveRedisStreamProducer.setReadOffset(ReadOffset.from("0-0"));
134+
this.reactiveRedisStreamProducer.afterPropertiesSet();
139135

140136
StepVerifier stepVerifier =
141137
Flux.from(this.fluxMessageChannel)
@@ -148,7 +144,7 @@ public void testReadingMessageAsStandaloneClient() {
148144
.thenCancel()
149145
.verifyLater();
150146

151-
this.redisStreamMessageProducer.start();
147+
this.reactiveRedisStreamProducer.start();
152148

153149
stepVerifier.verify(Duration.ofSeconds(10));
154150
}
@@ -160,17 +156,17 @@ public void testReadingMessageAsConsumerInConsumerGroup() {
160156
Person person = new Person(address, "John Snow");
161157

162158
this.template.opsForStream()
163-
.createGroup(STREAM_KEY, this.redisStreamMessageProducer.getBeanName())
159+
.createGroup(STREAM_KEY, this.reactiveRedisStreamProducer.getBeanName())
164160
.as(StepVerifier::create)
165161
.assertNext(message -> assertThat(message).isEqualTo("OK"))
166162
.thenCancel()
167163
.verify(Duration.ofSeconds(10));
168164

169-
this.redisStreamMessageProducer.setCreateConsumerGroup(false);
170-
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
171-
this.redisStreamMessageProducer.setReadOffset(ReadOffset.latest());
172-
this.redisStreamMessageProducer.afterPropertiesSet();
173-
this.redisStreamMessageProducer.start();
165+
this.reactiveRedisStreamProducer.setCreateConsumerGroup(false);
166+
this.reactiveRedisStreamProducer.setConsumerName(CONSUMER);
167+
this.reactiveRedisStreamProducer.setReadOffset(ReadOffset.latest());
168+
this.reactiveRedisStreamProducer.afterPropertiesSet();
169+
this.reactiveRedisStreamProducer.start();
174170

175171
StepVerifier stepVerifier =
176172
Flux.from(this.fluxMessageChannel)
@@ -196,13 +192,13 @@ public void testReadingPendingMessageWithNoAutoACK() {
196192
String consumerGroup = "testGroup";
197193
String consumerName = "testConsumer";
198194

199-
this.redisStreamMessageProducer.setCreateConsumerGroup(true);
200-
this.redisStreamMessageProducer.setAutoAck(false);
201-
this.redisStreamMessageProducer.setConsumerGroup(consumerGroup);
202-
this.redisStreamMessageProducer.setConsumerName(consumerName);
203-
this.redisStreamMessageProducer.setReadOffset(ReadOffset.latest());
204-
this.redisStreamMessageProducer.afterPropertiesSet();
205-
this.redisStreamMessageProducer.start();
195+
this.reactiveRedisStreamProducer.setCreateConsumerGroup(true);
196+
this.reactiveRedisStreamProducer.setAutoAck(false);
197+
this.reactiveRedisStreamProducer.setConsumerGroup(consumerGroup);
198+
this.reactiveRedisStreamProducer.setConsumerName(consumerName);
199+
this.reactiveRedisStreamProducer.setReadOffset(ReadOffset.latest());
200+
this.reactiveRedisStreamProducer.afterPropertiesSet();
201+
this.reactiveRedisStreamProducer.start();
206202

207203
AtomicReference<SimpleAcknowledgment> acknowledgmentReference = new AtomicReference<>();
208204

@@ -239,6 +235,65 @@ public void testReadingPendingMessageWithNoAutoACK() {
239235
.verifyComplete();
240236
}
241237

238+
@Autowired
239+
ReactiveRedisStreamMessageProducer reactiveErrorRedisStreamProducer;
240+
241+
@Autowired
242+
PollableChannel redisStreamErrorChannel;
243+
244+
@Test
245+
@RedisAvailable
246+
public void testReadingNextMessagesWhenSerializationException() {
247+
Person person = new Person(new Address("Winterfell, Westeros"), "John Snow");
248+
Date testDate = new Date();
249+
this.reactiveErrorRedisStreamProducer.start();
250+
251+
StepVerifier stepVerifier =
252+
Flux.from(this.fluxMessageChannel)
253+
.map(Message::getPayload)
254+
.cast(Date.class)
255+
.as(StepVerifier::create)
256+
.expectNext(testDate)
257+
.thenCancel()
258+
.verifyLater();
259+
260+
this.messageHandler.handleMessage(new GenericMessage<>(person));
261+
262+
Message<?> errorMessage = this.redisStreamErrorChannel.receive(10_000);
263+
assertThat(errorMessage).isInstanceOf(ErrorMessage.class)
264+
.extracting("payload.message")
265+
.asInstanceOf(InstanceOfAssertFactories.STRING)
266+
.contains("Cannot deserialize Redis Stream Record")
267+
.contains("Cannot parse date out of");
268+
269+
Mono<PendingMessagesSummary> pendingMessage =
270+
template.opsForStream()
271+
.pending(STREAM_KEY, this.reactiveErrorRedisStreamProducer.getBeanName());
272+
273+
StepVerifier.create(pendingMessage)
274+
.assertNext(pendingMessagesSummary ->
275+
assertThat(pendingMessagesSummary.getTotalPendingMessages()).isEqualTo(1L))
276+
.verifyComplete();
277+
278+
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
279+
StaticMessageHeaderAccessor.getAcknowledgment(failedMessage).acknowledge();
280+
281+
pendingMessage =
282+
template.opsForStream()
283+
.pending(STREAM_KEY, this.reactiveErrorRedisStreamProducer.getBeanName());
284+
285+
StepVerifier.create(pendingMessage)
286+
.assertNext(pendingMessagesSummary ->
287+
assertThat(pendingMessagesSummary.getTotalPendingMessages()).isEqualTo(0))
288+
.verifyComplete();
289+
290+
this.messageHandler.handleMessage(new GenericMessage<>(testDate));
291+
292+
stepVerifier.verify(Duration.ofSeconds(10));
293+
294+
this.reactiveErrorRedisStreamProducer.stop();
295+
}
296+
242297
@Configuration
243298
static class ContextConfig {
244299

@@ -263,6 +318,30 @@ FluxMessageChannel fluxMessageChannel() {
263318
return new FluxMessageChannel();
264319
}
265320

321+
@Bean
322+
PollableChannel redisStreamErrorChannel() {
323+
return new QueueChannel();
324+
}
325+
326+
@Bean
327+
ReactiveRedisStreamMessageProducer reactiveErrorRedisStreamProducer() {
328+
ReactiveRedisStreamMessageProducer messageProducer =
329+
new ReactiveRedisStreamMessageProducer(RedisAvailableRule.connectionFactory, STREAM_KEY);
330+
messageProducer.setStreamReceiverOptions(
331+
StreamReceiver.StreamReceiverOptions.builder()
332+
.pollTimeout(Duration.ofMillis(100))
333+
.targetType(Date.class)
334+
.build());
335+
messageProducer.setCreateConsumerGroup(true);
336+
messageProducer.setAutoAck(false);
337+
messageProducer.setConsumerName("testConsumer");
338+
messageProducer.setReadOffset(ReadOffset.latest());
339+
messageProducer.setAutoStartup(false);
340+
messageProducer.setOutputChannel(fluxMessageChannel());
341+
messageProducer.setErrorChannel(redisStreamErrorChannel());
342+
return messageProducer;
343+
}
344+
266345
@Bean
267346
ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer() {
268347
ReactiveRedisStreamMessageProducer messageProducer =

0 commit comments

Comments
 (0)