Skip to content

Commit 51e240b

Browse files
artembilangaryrussell
authored andcommitted
GH-3439: Revise error handling in RedisStreamMP
Fixes #3439 The latest Spring Data Redis has introduced an `onErrorResume` function option for the `StreamReceiver` and this one is now recommended way to handle errors in the `Flux` from this receiver * Expose all the `StreamReceiver.StreamReceiverOptionsBuilder` option onto the `ReactiveRedisStreamMessageProducer`, including `onErrorResume` * Have a default function as it was before - send into an error channel supporting (n)ack in the failed message based on the failed record * Make new setters mutually exclusive with an explicit `StreamReceiver.StreamReceiverOptions` * Doc polishing
1 parent c003a47 commit 51e240b

File tree

4 files changed

+142
-23
lines changed

4 files changed

+142
-23
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.java

Lines changed: 128 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,14 +17,19 @@
1717
package org.springframework.integration.redis.inbound;
1818

1919
import java.time.Duration;
20+
import java.util.function.Function;
2021

22+
import org.reactivestreams.Publisher;
23+
24+
import org.springframework.core.convert.ConversionFailedException;
2125
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
2226
import org.springframework.data.redis.connection.stream.Consumer;
2327
import org.springframework.data.redis.connection.stream.ReadOffset;
2428
import org.springframework.data.redis.connection.stream.Record;
2529
import org.springframework.data.redis.connection.stream.StreamOffset;
2630
import org.springframework.data.redis.core.ReactiveRedisTemplate;
2731
import org.springframework.data.redis.core.ReactiveStreamOperations;
32+
import org.springframework.data.redis.hash.HashMapper;
2833
import org.springframework.data.redis.serializer.RedisSerializationContext;
2934
import org.springframework.data.redis.stream.StreamReceiver;
3035
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -34,6 +39,7 @@
3439
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3540
import org.springframework.lang.Nullable;
3641
import org.springframework.messaging.Message;
42+
import org.springframework.messaging.MessageChannel;
3743
import org.springframework.messaging.MessagingException;
3844
import org.springframework.messaging.converter.MessageConversionException;
3945
import org.springframework.util.Assert;
@@ -61,12 +67,14 @@ public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport {
6167

6268
private final String streamKey;
6369

64-
private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
65-
66-
private StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions =
70+
private final StreamReceiver.StreamReceiverOptionsBuilder<String, ?> streamReceiverOptionsBuilder =
6771
StreamReceiver.StreamReceiverOptions.builder()
6872
.pollTimeout(Duration.ZERO)
69-
.build();
73+
.onErrorResume(this::handleReceiverError);
74+
75+
private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
76+
77+
private StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions;
7078

7179
private StreamReceiver<String, ?> streamReceiver;
7280

@@ -84,6 +92,8 @@ public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport {
8492

8593
private boolean createConsumerGroup;
8694

95+
private boolean receiverBuilderOptionSet;
96+
8797
public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory,
8898
String streamKey) {
8999

@@ -152,14 +162,105 @@ public void setCreateConsumerGroup(boolean createConsumerGroup) {
152162
* It provides a way to set the polling timeout and the serialization context.
153163
* By default the polling timeout is set to infinite and
154164
* {@link org.springframework.data.redis.serializer.StringRedisSerializer} is used.
165+
* Mutually exclusive with 'pollTimeout', 'batchSize', 'onErrorResume', 'serializer', 'targetType', 'objectMapper'.
155166
* @param streamReceiverOptions the desired receiver options
156167
* */
157168
public void setStreamReceiverOptions(
158169
@Nullable StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions) {
159170

171+
Assert.isTrue(!this.receiverBuilderOptionSet,
172+
"The 'streamReceiverOptions' is mutually exclusive with 'pollTimeout', 'batchSize', " +
173+
"'onErrorResume', 'serializer', 'targetType', 'objectMapper'");
160174
this.streamReceiverOptions = streamReceiverOptions;
161175
}
162176

177+
private void assertStreamReceiverOptions(String property) {
178+
Assert.isNull(this.streamReceiverOptions,
179+
() -> "'" + property + "' cannot be set when 'StreamReceiver.StreamReceiverOptions' is provided.");
180+
}
181+
182+
/**
183+
* Configure a poll timeout for the BLOCK option during reading.
184+
* Mutually exclusive with {@link #setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)}.
185+
* @param pollTimeout the timeout for polling.
186+
* @since 5.5
187+
* @see org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptionsBuilder#pollTimeout(Duration)
188+
*/
189+
public void setPollTimeout(Duration pollTimeout) {
190+
assertStreamReceiverOptions("pollTimeout");
191+
this.streamReceiverOptionsBuilder.pollTimeout(pollTimeout);
192+
this.receiverBuilderOptionSet = true;
193+
}
194+
195+
/**
196+
* Configure a batch size for the COUNT option during reading.
197+
* Mutually exclusive with {@link #setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)}.
198+
* @param recordsPerPoll must be greater zero.
199+
* @since 5.5
200+
* @see org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptionsBuilder#batchSize(int)
201+
*/
202+
public void setBatchSize(int recordsPerPoll) {
203+
assertStreamReceiverOptions("batchSize");
204+
this.streamReceiverOptionsBuilder.batchSize(recordsPerPoll);
205+
this.receiverBuilderOptionSet = true;
206+
}
207+
208+
/**
209+
* Configure a resume Function to resume the main sequence when polling the stream fails.
210+
* Mutually exclusive with {@link #setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)}.
211+
* By default this function extract the failed {@link Record} and sends an
212+
* {@link org.springframework.messaging.support.ErrorMessage} to the provided {@link #setErrorChannel(MessageChannel)}.
213+
* The failed message for this record may have a {@link IntegrationMessageHeaderAccessor#ACKNOWLEDGMENT_CALLBACK}
214+
* header when manual acknowledgment is configured for this message producer.
215+
* @param resumeFunction must not be null.
216+
* @since 5.5
217+
* @see org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptionsBuilder#onErrorResume(Function)
218+
*/
219+
public void setOnErrorResume(Function<? super Throwable, ? extends Publisher<Void>> resumeFunction) {
220+
assertStreamReceiverOptions("onErrorResume");
221+
this.streamReceiverOptionsBuilder.onErrorResume(resumeFunction);
222+
this.receiverBuilderOptionSet = true;
223+
}
224+
225+
/**
226+
* Configure a key, hash key and hash value serializer.
227+
* Mutually exclusive with {@link #setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)}.
228+
* @param pair must not be null.
229+
* @since 5.5
230+
* @see StreamReceiver.StreamReceiverOptionsBuilder#serializer(RedisSerializationContext)
231+
*/
232+
public void setSerializer(RedisSerializationContext.SerializationPair<?> pair) {
233+
assertStreamReceiverOptions("serializer");
234+
this.streamReceiverOptionsBuilder.serializer(pair);
235+
this.receiverBuilderOptionSet = true;
236+
}
237+
238+
/**
239+
* Configure a hash target type. Changes the emitted Record type to ObjectRecord.
240+
* Mutually exclusive with {@link #setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)}.
241+
* @param targetType must not be null.
242+
* @since 5.5
243+
* @see StreamReceiver.StreamReceiverOptionsBuilder#targetType(Class)
244+
*/
245+
public void setTargetType(Class<?> targetType) {
246+
assertStreamReceiverOptions("targetType");
247+
this.streamReceiverOptionsBuilder.targetType(targetType);
248+
this.receiverBuilderOptionSet = true;
249+
}
250+
251+
/**
252+
* Configure a hash mapper.
253+
* Mutually exclusive with {@link #setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)}.
254+
* @param hashMapper must not be null.
255+
* @since 5.5
256+
* @see StreamReceiver.StreamReceiverOptionsBuilder#objectMapper(HashMapper)
257+
*/
258+
public void setObjectMapper(HashMapper<?, ?, ?> hashMapper) {
259+
assertStreamReceiverOptions("objectMapper");
260+
this.streamReceiverOptionsBuilder.objectMapper(hashMapper);
261+
this.receiverBuilderOptionSet = true;
262+
}
263+
163264
@Override
164265
public String getComponentType() {
165266
return "redis:stream-inbound-channel-adapter";
@@ -168,6 +269,9 @@ public String getComponentType() {
168269
@Override
169270
protected void onInit() {
170271
super.onInit();
272+
if (this.streamReceiverOptions == null) {
273+
this.streamReceiverOptions = this.streamReceiverOptionsBuilder.build();
274+
}
171275
this.streamReceiver = StreamReceiver.create(this.reactiveConnectionFactory, this.streamReceiverOptions);
172276
if (StringUtils.hasText(this.consumerName) && !StringUtils.hasText(this.consumerGroup)) {
173277
this.consumerGroup = getBeanName();
@@ -211,17 +315,7 @@ protected void doStart() {
211315
}
212316

213317
Flux<? extends Message<?>> messageFlux =
214-
events.map((record) -> buildMessageFromRecord(record, this.extractPayload))
215-
.onErrorContinue((ex, record) -> {
216-
@SuppressWarnings("unchecked")
217-
Message<?> failedMessage = buildMessageFromRecord((Record<String, ?>) record, false);
218-
MessagingException conversionException =
219-
new MessageConversionException(failedMessage,
220-
"Cannot deserialize Redis Stream Record", ex);
221-
if (!sendErrorMessageIfNecessary(null, conversionException)) {
222-
logger.getLog().error(conversionException);
223-
}
224-
});
318+
events.map((record) -> buildMessageFromRecord(record, this.extractPayload));
225319
subscribeToPublisher(messageFlux);
226320
}
227321

@@ -245,4 +339,22 @@ private Message<?> buildMessageFromRecord(Record<String, ?> record, boolean extr
245339
return builder.build();
246340
}
247341

342+
private <T> Publisher<T> handleReceiverError(Throwable error) {
343+
Message<?> failedMessage = null;
344+
if (error instanceof ConversionFailedException) {
345+
@SuppressWarnings("unchecked")
346+
Record<String, ?> record = (Record<String, ?>) ((ConversionFailedException) error).getValue();
347+
if (record != null) {
348+
failedMessage = buildMessageFromRecord(record, false);
349+
}
350+
}
351+
MessagingException conversionException =
352+
new MessageConversionException(failedMessage, // NOSONAR
353+
"Cannot deserialize Redis Stream Record", error);
354+
if (!sendErrorMessageIfNecessary(null, conversionException)) {
355+
logger.getLog().error(conversionException);
356+
}
357+
return Mono.empty();
358+
}
359+
248360
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducerTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -327,11 +327,8 @@ PollableChannel redisStreamErrorChannel() {
327327
ReactiveRedisStreamMessageProducer reactiveErrorRedisStreamProducer() {
328328
ReactiveRedisStreamMessageProducer messageProducer =
329329
new ReactiveRedisStreamMessageProducer(RedisAvailableRule.connectionFactory, STREAM_KEY);
330-
messageProducer.setStreamReceiverOptions(
331-
StreamReceiver.StreamReceiverOptions.builder()
332-
.pollTimeout(Duration.ofMillis(100))
333-
.targetType(Date.class)
334-
.build());
330+
messageProducer.setTargetType(Date.class);
331+
messageProducer.setPollTimeout(Duration.ofMillis(100));
335332
messageProducer.setCreateConsumerGroup(true);
336333
messageProducer.setAutoAck(false);
337334
messageProducer.setConsumerName("testConsumer");

src/reference/asciidoc/redis.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[[redis]]
1+
``[[redis]]
22
== Redis Support
33

44
Spring Integration 2.1 introduced support for https://redis.io/[Redis]: "`an open source advanced key-value store`".
@@ -859,6 +859,10 @@ Similar logic is required even when an exception happens during deserialization
859859
So, target error handler must decided to ack or nack such a failed message.
860860
Alongside with `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`, the `ReactiveRedisStreamMessageProducer` also populates these headers into the message to produce: `RedisHeaders.STREAM_KEY`, `RedisHeaders.STREAM_MESSAGE_ID`, `RedisHeaders.CONSUMER_GROUP` and `RedisHeaders.CONSUMER`.
861861

862+
Starting with version 5.5, you can configure `StreamReceiver.StreamReceiverOptionsBuilder` options explicitly on the `ReactiveRedisStreamMessageProducer`, including the newly introduced `onErrorResume` function, which is required if the Redis Stream consumer should continue polling when deserialization errors occur.
863+
The default function sends a message to the error channel (if provided) with possible acknowledgement for the failed message as it is described above.
864+
All these `StreamReceiver.StreamReceiverOptionsBuilder` are mutually exclusive with an externally provided `StreamReceiver.StreamReceiverOptions`.
865+
862866
[[redis-lock-registry]]
863867
=== Redis Lock Registry
864868

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,9 @@ If you are interested in more details, see the Issue Tracker tickets that were r
2323

2424
The `AmqpInboundChannelAdapter` and `AmqpInboundGateway` (and the respective Java DSL builders) now support an `org.springframework.amqp.rabbit.retry.MessageRecoverer` as an AMQP-specific alternative to the general purpose `RecoveryCallback`.
2525
See <<./amqp.adoc#amqp,AMQP Support>> for more information.
26+
27+
[[x5.5-redis]]
28+
==== Redis Changes
29+
30+
The `ReactiveRedisStreamMessageProducer` has now setters for all the `StreamReceiver.StreamReceiverOptionsBuilder` options, including an `onErrorResume` function.
31+
See <<./redis.adoc#redis,Redis Support>> for more information.

0 commit comments

Comments
 (0)