Skip to content

Commit 73544fe

Browse files
committed
fix: add backoff
1 parent f5b87df commit 73544fe

File tree

1 file changed

+50
-47
lines changed

1 file changed

+50
-47
lines changed
Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
/**
2-
* Copyright 2019 Sven Loesekann
3-
Licensed under the Apache License, Version 2.0 (the "License");
4-
you may not use this file except in compliance with the License.
5-
You may obtain a copy of the License at
6-
http://www.apache.org/licenses/LICENSE-2.0
7-
Unless required by applicable law or agreed to in writing, software
8-
distributed under the License is distributed on an "AS IS" BASIS,
9-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10-
See the License for the specific language governing permissions and
11-
limitations under the License.
2+
* Copyright 2019 Sven Loesekann
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
1212
*/
1313
package ch.xxx.moviemanager.adapter.events;
1414

1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717
import org.springframework.context.annotation.Profile;
18+
import org.springframework.kafka.annotation.BackOff;
1819
import org.springframework.kafka.annotation.DltHandler;
1920
import org.springframework.kafka.annotation.KafkaListener;
2021
import org.springframework.kafka.annotation.RetryableTopic;
@@ -33,45 +34,47 @@
3334
@Service
3435
@Profile("kafka | prod-kafka")
3536
public class KafkaConsumer {
36-
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
37-
private final JsonMapper jsonMapper;
38-
private final UserDetailServiceEvents appUserService;
39-
private final KafkaListenerDltHandler kafkaListenerDltHandler;
37+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
38+
private final JsonMapper jsonMapper;
39+
private final UserDetailServiceEvents appUserService;
40+
private final KafkaListenerDltHandler kafkaListenerDltHandler;
4041

41-
public KafkaConsumer(JsonMapper jsonMapper, UserDetailServiceEvents appUserService, KafkaListenerDltHandler kafkaListenerDltHandler) {
42-
this.jsonMapper = jsonMapper;
43-
this.appUserService = appUserService;
44-
this.kafkaListenerDltHandler = kafkaListenerDltHandler;
45-
}
42+
public KafkaConsumer(JsonMapper jsonMapper, UserDetailServiceEvents appUserService, KafkaListenerDltHandler kafkaListenerDltHandler) {
43+
this.jsonMapper = jsonMapper;
44+
this.appUserService = appUserService;
45+
this.kafkaListenerDltHandler = kafkaListenerDltHandler;
46+
}
4647

47-
@RetryableTopic(kafkaTemplate = "kafkaRetryTemplate", attempts = "3", autoCreateTopics = "true", topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
48-
@KafkaListener(topics = KafkaConfig.NEW_USER_TOPIC)
49-
public void consumerForNewUserTopic(String message) {
50-
LOGGER.info("consumerForNewUserTopic [{}]", message);
51-
try {
52-
UserDto dto = this.jsonMapper.readValue(message, UserDto.class);
53-
this.appUserService.signinMsg(dto);
54-
} catch (Exception e) {
55-
LOGGER.warn("send failed consumerForNewUserTopic [{}]", message);
56-
this.kafkaListenerDltHandler.sendToDefaultDlt(new KafkaEventDto(KafkaConfig.DEFAULT_DLT_TOPIC, message));
57-
}
58-
}
48+
@RetryableTopic(kafkaTemplate = "kafkaRetryTemplate", backOff = @BackOff(delay = 1000, multiplier = 2.0),
49+
attempts = "3", autoCreateTopics = "true", topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
50+
@KafkaListener(topics = KafkaConfig.NEW_USER_TOPIC)
51+
public void consumerForNewUserTopic(String message) {
52+
LOGGER.info("consumerForNewUserTopic [{}]", message);
53+
try {
54+
UserDto dto = this.jsonMapper.readValue(message, UserDto.class);
55+
this.appUserService.signinMsg(dto);
56+
} catch (Exception e) {
57+
LOGGER.warn("send failed consumerForNewUserTopic [{}]", message);
58+
this.kafkaListenerDltHandler.sendToDefaultDlt(new KafkaEventDto(KafkaConfig.DEFAULT_DLT_TOPIC, message));
59+
}
60+
}
5961

60-
@DltHandler
61-
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
62-
LOGGER.info(in + " from " + topic);
63-
}
62+
@DltHandler
63+
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
64+
LOGGER.info(in + " from " + topic);
65+
}
6466

65-
@RetryableTopic(attempts = "3", autoCreateTopics = "true", topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
66-
@KafkaListener(topics = KafkaConfig.USER_LOGOUT_TOPIC)
67-
public void consumerForUserLogoutsTopic(String message) {
68-
LOGGER.info("consumerForUserLogoutsTopic [{}]", message);
69-
try {
70-
RevokedTokenDto dto = this.jsonMapper.readValue(message, RevokedTokenDto.class);
71-
this.appUserService.logoutMsg(dto);
72-
} catch (Exception e) {
73-
LOGGER.warn("send failed consumerForUserLogoutsTopic [{}]", message);
74-
this.kafkaListenerDltHandler.sendToDefaultDlt(new KafkaEventDto(KafkaConfig.DEFAULT_DLT_TOPIC, message));
75-
}
76-
}
67+
@RetryableTopic(backOff = @BackOff(delay = 1000, multiplier = 2.0), attempts = "3", autoCreateTopics = "true",
68+
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
69+
@KafkaListener(topics = KafkaConfig.USER_LOGOUT_TOPIC)
70+
public void consumerForUserLogoutsTopic(String message) {
71+
LOGGER.info("consumerForUserLogoutsTopic [{}]", message);
72+
try {
73+
RevokedTokenDto dto = this.jsonMapper.readValue(message, RevokedTokenDto.class);
74+
this.appUserService.logoutMsg(dto);
75+
} catch (Exception e) {
76+
LOGGER.warn("send failed consumerForUserLogoutsTopic [{}]", message);
77+
this.kafkaListenerDltHandler.sendToDefaultDlt(new KafkaEventDto(KafkaConfig.DEFAULT_DLT_TOPIC, message));
78+
}
79+
}
7780
}

0 commit comments

Comments
 (0)