Skip to content

Commit 6e71952

Browse files
ravicm2maihacke
andauthored
Retry pattern for a specific topic support (#274)
* retry pattern for specific topic and docs * removed commented cod * review comments and updated docs * removed else part code improve * Update guide-kafka.asciidoc * retry pattern for specific topic and docs * removed commented cod * review comments and updated docs * removed else part code improve * Update guide-kafka.asciidoc * modified value to key for loggers Co-authored-by: Simon Spielmann <[email protected]>
1 parent bdd0c44 commit 6e71952

File tree

11 files changed

+402
-152
lines changed

11 files changed

+402
-152
lines changed

documentation/guide-kafka.asciidoc

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ Example:
114114
@Inject
115115
private MessageSender messageSender;
116116
private ProducerRecord<K,V> producerRecord;
117-
117+
118118
public void sendMessageToKafka(){
119119
producerRecord=new ProducerRecord<>("topic-name","message");
120120
messageSender.sendMessage(this.producerRecord);
@@ -225,24 +225,63 @@ It works as follows:
225225
=== Retry configuration and naming convention of redelivery topics.
226226
The following properties should be added in the `application.properties` or `application.yml` file.
227227

228+
The retry pattern in devon4j-kafka will perform for specific topic of a message. So its mandatory to specify the properties for each topic. Below properties are example,
229+
228230
[source,properties]
229231
-----
230-
# Retry back off policy properties
231-
messaging.retry.default.back-off-policy.retryReEnqueueDelay=1000
232-
messaging.retry.default.back-off-policy.retryDelay=600000
233-
messaging.retry.default.back-off-policy.retryDelayMultiplier=1.0
234-
messaging.retry.default.back-off-policy.retryMaxDelay=600000
235-
messaging.retry.default.back-off-policy.retryCount=2
236-
237-
# default retry policy properties
238-
messaging.retry.default.retry-policy.retryPeriod=1800
239-
messaging.retry.default.retry-policy.retryableExceptions=<Class names of exceptions for which a retry should be performed>
240-
messaging.retry.default.retry-policy.retryableExceptionsTraverseCauses=true
232+
# Back off policy properties for employeeapp-employee-v1-delete
233+
messaging.retry.back-off-policy.retryReEnqueueDelay.employeeapp-employee-v1-delete=1000
234+
messaging.retry.back-off-policy.retryDelay.employeeapp-employee-v1-delete=600000
235+
messaging.retry.back-off-policy.retryDelayMultiplier.employeeapp-employee-v1-delete=1.0
236+
messaging.retry.back-off-policy.retryMaxDelay.employeeapp-employee-v1-delete=600000
237+
messaging.retry.back-off-policy.retryCount.employeeapp-employee-v1-delete=2
238+
239+
# Retry policy properties for employeeapp-employee-v1-delete
240+
messaging.retry.retry-policy.retryPeriod.employeeapp-employee-v1-delete=1800
241+
messaging.retry.retry-policy.retryableExceptions.employeeapp-employee-v1-delete=<Class names of exceptions for which a retry should be performed>
242+
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.employeeapp-employee-v1-delete=true
243+
244+
# Back off policy properties for employeeapp-employee-v1-add
245+
messaging.retry.back-off-policy.retryReEnqueueDelay.employeeapp-employee-v1-add=1000
246+
messaging.retry.back-off-policy.retryDelay.employeeapp-employee-v1-add=600000
247+
messaging.retry.back-off-policy.retryDelayMultiplier.employeeapp-employee-v1-add=2.0
248+
messaging.retry.back-off-policy.retryMaxDelay.employeeapp-employee-v1-add=600000
249+
messaging.retry.back-off-policy.retryCount.employeeapp-employee-v1-add=4
250+
251+
# Retry policy properties for employeeapp-employee-v1-add
252+
messaging.retry.retry-policy.retryPeriod.employeeapp-employee-v1-add=3000
253+
messaging.retry.retry-policy.retryableExceptions.employeeapp-employee-v1-add=<Class names of exceptions for which a retry should be performed>
254+
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.employeeapp-employee-v1-add=true
241255
-----
242256

257+
If you notice the above properties, the `retry-policy` and `back-off policy` properties are repeated twice as i have 2 topics for the retry to be performed with different level of values. The topic name should be added at the last of attribute.
258+
259+
So, the retry will be performed for each topic according to their configuration values.
260+
261+
If you want to provide same/default values for all the topics, then its required to add `default` in the place of topic on the above properties example.
262+
263+
For example,
264+
265+
[source,properties]
266+
-----
267+
# Default back off policy properties
268+
messaging.retry.back-off-policy.retryReEnqueueDelay.default=1000
269+
messaging.retry.back-off-policy.retryDelay.default=600000
270+
messaging.retry.back-off-policy.retryDelayMultiplier.default=1.0
271+
messaging.retry.back-off-policy.retryMaxDelay.default=600000
272+
messaging.retry.back-off-policy.retryCount.default=2
273+
274+
# Default retry policy properties
275+
messaging.retry.retry-policy.retryPeriod.default=1800
276+
messaging.retry.retry-policy.retryableExceptions.default=<Class names of exceptions for which a retry should be performed>
277+
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.default=true
278+
-----
279+
280+
By giving properties like above , the same values will be passed for all the topics and the way of processing retry for all the topics are same.
281+
243282
All these above property values are mapped to the classes `DefaultBackOffPolicyProperties.java` and `DefaultRetryPolicyProperties.java` and configured by the class `MessageDefaultRetryConfig.java`.
244283

245-
The MessageRetryContext in devon kafka is used to perform the retry pattern with the properties from DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.
284+
The MessageRetryContext in devon kafka is used to perform the retry pattern with the properties from DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.
246285

247286
The 2 main properties of MessageRetryContext is nextRetry and retryUntil which is a `Instant` date format and it is calculated internally using the properties given in DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.
248287

@@ -259,7 +298,7 @@ Devon4-kafka enqueues a new message for each retry attempt. It is very important
259298

260299
=== Handling retry finally failed
261300

262-
Per default when the retry fails with final attempt we just log the message and delete the payload of ProducerRecord which comes to proceed the retry pattern.
301+
Per default when the retry fails with final attempt we just log the message and delete the payload of ProducerRecord which comes to proceed the retry pattern.
263302

264303
You can change this behavior by providing the implementation class for the interface `MessageRetryHandler.java`
265304
which has two method `retryTimeout` and `retryFailedFinal`.

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/logging/impl/ProducerLoggingListener.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.devonfw.module.kafka.common.messaging.logging.impl;
22

3+
import java.util.Optional;
4+
35
import org.apache.kafka.clients.producer.RecordMetadata;
46
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
@@ -32,10 +34,8 @@ public ProducerLoggingListener(MessageLoggingSupport loggingSupport) {
3234
@Override
3335
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
3436

35-
String messageKey = "<no message key>";
36-
if (key != null) {
37-
key = key.toString();
38-
}
37+
String messageKey = (String) Optional.ofNullable(key).orElse("<no message key>");
38+
3939
if (recordMetadata != null) {
4040
this.loggingSupport.logMessageSent(LOG, messageKey, recordMetadata.topic(), recordMetadata.partition(),
4141
recordMetadata.offset());

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/retry/api/client/MessageBackOffPolicy.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ public interface MessageBackOffPolicy {
1616
* @param currentRetryCount the count of attempted retry count.
1717
*
1818
* @param retryUntilTimestamp the retry until time stamp which requires a Instant in String format.
19+
* @param topic the topic
1920
* @return the Instant
2021
*/
21-
Instant getNextRetryTimestamp(long currentRetryCount, String retryUntilTimestamp);
22+
Instant getNextRetryTimestamp(long currentRetryCount, String retryUntilTimestamp, String topic);
2223

2324
/**
2425
* This method is used to make the thread to sleep for {@link DefaultBackOffPolicyProperties#getRetryReEnqueueDelay()}
2526
* seconds.
27+
*
28+
* @param topic the topic
2629
*/
27-
void sleepBeforeReEnqueue();
30+
void sleepBeforeReEnqueue(String topic);
2831

2932
}

modules/kafka/src/main/java/com/devonfw/module/kafka/common/messaging/retry/api/client/MessageRetryPolicy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ public interface MessageRetryPolicy<K, V> {
3838
/**
3939
* This method is used to return the number of retry count.
4040
*
41+
* @param topic the topic
42+
*
4143
* @return the retry count as long.
4244
*/
43-
long getRetryCount();
45+
long getRetryCount(String topic);
4446

4547
}
Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.devonfw.module.kafka.common.messaging.retry.api.config;
22

3+
import java.util.HashMap;
4+
import java.util.Map;
5+
36
import com.devonfw.module.kafka.common.messaging.retry.api.client.MessageBackOffPolicy;
47

58
/**
@@ -8,92 +11,124 @@
811
*/
912
public class DefaultBackOffPolicyProperties {
1013

11-
private long retryDelay = 60000;
14+
private Map<String, Long> retryDelay = new HashMap<>();
15+
16+
private Map<String, Double> retryDelayMultiplier = new HashMap<>();
17+
18+
private Map<String, Long> retryMaxDelay = new HashMap<>();
1219

13-
private double retryDelayMultiplier = 1.0;
20+
private Map<String, Long> retryReEnqueueDelay = new HashMap<>();
1421

15-
private long retryMaxDelay = 60000;
22+
private long retryDelayDefault = 60000;
1623

17-
private long retryReEnqueueDelay = 1000;
24+
private double retryDelayMultiplierDefault = 1.0;
25+
26+
private long retryMaxDelayDefault = 60000;
27+
28+
private long retryReEnqueueDelayDefault = 1000;
1829

1930
/**
2031
* The retry enque delay to send again the message to kafka. By default 1000.
2132
*
2233
* @return long
2334
*/
24-
public long getRetryReEnqueueDelay() {
35+
public long getRetryReEnqueueDelayDefault() {
2536

26-
return this.retryReEnqueueDelay;
37+
return this.retryReEnqueueDelayDefault;
2738
}
2839

2940
/**
30-
* Set the retry enque delay for {@link #getRetryReEnqueueDelay()}
41+
* The retry delay.
3142
*
32-
* @param retryReEnqueueDelay the enque delay.
43+
* @return long.
3344
*/
34-
public void setRetryReEnqueueDelay(long retryReEnqueueDelay) {
45+
public long getRetryDelayDefault() {
3546

36-
this.retryReEnqueueDelay = retryReEnqueueDelay;
47+
return this.retryDelayDefault;
3748
}
3849

3950
/**
40-
* The retry delay.
51+
* The retry delay multiplier
4152
*
42-
* @return long.
53+
* @return double.
4354
*/
44-
public long getRetryDelay() {
55+
public double getRetryDelayMultiplierDefault() {
4556

46-
return this.retryDelay;
57+
return this.retryDelayMultiplierDefault;
4758
}
4859

4960
/**
50-
* Set the retry delay for {@link #getRetryDelay()}. By default 1000
61+
* The retry max delay.
5162
*
52-
* @param retryDelay the retry delay.
63+
* @return the long.
64+
*/
65+
public long getRetryMaxDelayDefault() {
66+
67+
return this.retryMaxDelayDefault;
68+
}
69+
70+
/**
71+
* @return retryDelay
5372
*/
54-
public void setRetryDelay(long retryDelay) {
73+
public Map<String, Long> getRetryDelay() {
74+
75+
return this.retryDelay;
76+
}
77+
78+
/**
79+
* @param retryDelay new value of {@link #getRetryDelay}.
80+
*/
81+
public void setRetryDelay(Map<String, Long> retryDelay) {
5582

5683
this.retryDelay = retryDelay;
5784
}
5885

5986
/**
60-
* The retry delay multiplier
61-
*
62-
* @return double.
87+
* @return retryDelayMultiplier
6388
*/
64-
public double getRetryDelayMultiplier() {
89+
public Map<String, Double> getRetryDelayMultiplier() {
6590

6691
return this.retryDelayMultiplier;
6792
}
6893

6994
/**
70-
* Set the retry delay multiplier for {@link #getRetryDelayMultiplier()}. By default 1.0
71-
*
72-
* @param retryDelayMultiplier the retry delay multiplier.
95+
* @param retryDelayMultiplier new value of {@link #getRetryDelayMultiplier}.
7396
*/
74-
public void setRetryDelayMultiplier(double retryDelayMultiplier) {
97+
public void setRetryDelayMultiplier(Map<String, Double> retryDelayMultiplier) {
7598

7699
this.retryDelayMultiplier = retryDelayMultiplier;
77100
}
78101

79102
/**
80-
* The retry max delay.
81-
*
82-
* @return the long.
103+
* @return retryMaxDelay
83104
*/
84-
public long getRetryMaxDelay() {
105+
public Map<String, Long> getRetryMaxDelay() {
85106

86107
return this.retryMaxDelay;
87108
}
88109

89110
/**
90-
* Set the retry max delay for {@link #getRetryMaxDelay()}. by default 60000.
91-
*
92-
* @param retryMaxDelay the retry max delay.
111+
* @param retryMaxDelay new value of {@link #getRetryMaxDelay}.
93112
*/
94-
public void setRetryMaxDelay(long retryMaxDelay) {
113+
public void setRetryMaxDelay(Map<String, Long> retryMaxDelay) {
95114

96115
this.retryMaxDelay = retryMaxDelay;
97116
}
98117

118+
/**
119+
* @return retryReEnqueueDelay
120+
*/
121+
public Map<String, Long> getRetryReEnqueueDelay() {
122+
123+
return this.retryReEnqueueDelay;
124+
}
125+
126+
/**
127+
* @param retryReEnqueueDelay new value of {@link #getRetryReEnqueueDelay}.
128+
*/
129+
public void setRetryReEnqueueDelay(Map<String, Long> retryReEnqueueDelay) {
130+
131+
this.retryReEnqueueDelay = retryReEnqueueDelay;
132+
}
133+
99134
}

0 commit comments

Comments
 (0)