Skip to content

Commit 4383f56

Browse files
sobychackoartembilan
authored andcommitted
GH-3032: Add RecoveryCallback to RetryingDeserializer
Fixes: #3032 # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java # spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java
1 parent d4425bd commit 4383f56

File tree

3 files changed

+65
-49
lines changed

3 files changed

+65
-49
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
397397
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
398398
----
399399

400+
Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally.
401+
400402
Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.
401403

402404

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,14 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.nio.ByteBuffer;
1920
import java.util.Map;
2021

2122
import org.apache.kafka.common.header.Headers;
2223
import org.apache.kafka.common.serialization.Deserializer;
2324

25+
import org.springframework.lang.Nullable;
26+
import org.springframework.retry.RecoveryCallback;
2427
import org.springframework.retry.RetryOperations;
2528
import org.springframework.util.Assert;
2629

@@ -31,39 +34,54 @@
3134
* @param <T> Type to be deserialized into.
3235
*
3336
* @author Gary Russell
34-
* @since 2.3
37+
* @author Wang Zhiyang
38+
* @author Soby Chacko
3539
*
40+
* @since 2.3
3641
*/
3742
public class RetryingDeserializer<T> implements Deserializer<T> {
3843

3944
private final Deserializer<T> delegate;
4045

4146
private final RetryOperations retryOperations;
4247

48+
@Nullable
49+
private RecoveryCallback<T> recoveryCallback;
50+
4351
public RetryingDeserializer(Deserializer<T> delegate, RetryOperations retryOperations) {
4452
Assert.notNull(delegate, "the 'delegate' deserializer cannot be null");
4553
Assert.notNull(retryOperations, "the 'retryOperations' deserializer cannot be null");
4654
this.delegate = delegate;
4755
this.retryOperations = retryOperations;
4856
}
4957

58+
/**
59+
* Set a recovery callback to execute when the retries are exhausted.
60+
* @param recoveryCallback {@link RecoveryCallback} to execute
61+
* @since 3.1.2
62+
*/
63+
public void setRecoveryCallback(@Nullable RecoveryCallback<T> recoveryCallback) {
64+
this.recoveryCallback = recoveryCallback;
65+
}
66+
5067
@Override
5168
public void configure(Map<String, ?> configs, boolean isKey) {
5269
this.delegate.configure(configs, isKey);
5370
}
5471

5572
@Override
5673
public T deserialize(String topic, byte[] data) {
57-
return this.retryOperations.execute(context -> {
58-
return this.delegate.deserialize(topic, data);
59-
});
74+
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data), this.recoveryCallback);
6075
}
6176

6277
@Override
6378
public T deserialize(String topic, Headers headers, byte[] data) {
64-
return this.retryOperations.execute(context -> {
65-
return this.delegate.deserialize(topic, headers, data);
66-
});
79+
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
80+
}
81+
82+
@Override
83+
public T deserialize(String topic, Headers headers, ByteBuffer data) {
84+
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
6785
}
6886

6987
@Override

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,65 +16,61 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19-
import static org.assertj.core.api.Assertions.assertThat;
20-
21-
import java.util.Map;
19+
import java.util.concurrent.atomic.AtomicInteger;
2220

23-
import org.apache.kafka.common.header.Headers;
2421
import org.apache.kafka.common.header.internals.RecordHeaders;
2522
import org.apache.kafka.common.serialization.Deserializer;
2623
import org.junit.jupiter.api.Test;
2724

25+
import org.springframework.retry.RecoveryCallback;
26+
import org.springframework.retry.RetryContext;
2827
import org.springframework.retry.support.RetryTemplate;
2928

29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.mockito.Mockito.any;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.verify;
33+
3034
/**
3135
* @author Gary Russell
32-
* @since 2.3
36+
* @author Wang Zhiyang
37+
* @author Soby Chacko
38+
* @author Artem Bilan
3339
*
40+
* @since 2.3
3441
*/
35-
public class RetryingDeserializerTests {
42+
class RetryingDeserializerTests {
3643

3744
@Test
38-
void testRetry() {
39-
Deser delegate = new Deser();
45+
void basicRetryingDeserializer() {
46+
AtomicInteger n = new AtomicInteger();
47+
Deserializer<String> delegate =
48+
(topic, data) -> {
49+
if (n.incrementAndGet() < 2) {
50+
throw new RuntimeException();
51+
}
52+
return new String(data);
53+
};
54+
4055
RetryingDeserializer<String> rdes = new RetryingDeserializer<>(delegate, new RetryTemplate());
4156
assertThat(rdes.deserialize("foo", "bar".getBytes())).isEqualTo("bar");
42-
assertThat(delegate.n).isEqualTo(3);
43-
delegate.n = 0;
57+
assertThat(n.get()).isEqualTo(3);
58+
n.set(0);
4459
assertThat(rdes.deserialize("foo", new RecordHeaders(), "bar".getBytes())).isEqualTo("bar");
45-
assertThat(delegate.n).isEqualTo(3);
46-
rdes.close();
60+
assertThat(n.get()).isEqualTo(3);
4761
}
4862

49-
public static class Deser implements Deserializer<String> {
50-
51-
int n;
52-
53-
@Override
54-
public void configure(Map<String, ?> configs, boolean isKey) {
55-
}
56-
57-
@Override
58-
public String deserialize(String topic, byte[] data) {
59-
if (n++ < 2) {
60-
throw new RuntimeException();
61-
}
62-
return new String(data);
63-
}
64-
65-
@Override
66-
public String deserialize(String topic, Headers headers, byte[] data) {
67-
if (n++ < 2) {
68-
throw new RuntimeException();
69-
}
70-
return new String(data);
71-
}
72-
73-
@Override
74-
public void close() {
75-
// empty
76-
}
77-
63+
@Test
64+
void retryingDeserializerWithRecoveryCallback() throws Exception {
65+
RetryingDeserializer<String> rdes =
66+
new RetryingDeserializer<>(
67+
(s, b) -> {
68+
throw new RuntimeException();
69+
}, new RetryTemplate());
70+
RecoveryCallback<String> recoveryCallback = mock();
71+
rdes.setRecoveryCallback(recoveryCallback);
72+
rdes.deserialize("my-topic", "my-data".getBytes());
73+
verify(recoveryCallback).recover(any(RetryContext.class));
7874
}
7975

8076
}

0 commit comments

Comments
 (0)