Skip to content

Commit 1361a36

Browse files
garyrussell authored and artembilan committed
GH-2003: Fix RetryableTopic Delivery Attempts Hdr
Resolves #2003 **cherry-pick to 2.7.x**
1 parent 6ed0b2e commit 1361a36

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.retrytopic;
1818

1919
import java.math.BigInteger;
20+
import java.nio.ByteBuffer;
2021
import java.time.Instant;
2122
import java.util.function.Consumer;
2223

@@ -137,17 +138,28 @@ protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, fi
137138

138139
private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
139140
Header header = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
140-
return header != null
141-
? header.value()[0]
142-
: 1;
141+
if (header != null) {
142+
byte[] value = header.value();
143+
if (value.length == 1) { // backwards compatibility
144+
return value[0];
145+
}
146+
else if (value.length == 4) {
147+
return ByteBuffer.wrap(value).getInt();
148+
}
149+
else {
150+
LOGGER.debug(() -> "Unexected size for " + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS + " header: "
151+
+ value.length);
152+
}
153+
}
154+
return 1;
143155
}
144156

145157
private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
146158
Headers headers = new RecordHeaders();
147159
byte[] originalTimestampHeader = getOriginalTimestampHeaderBytes(consumerRecord);
148160
headers.add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampHeader);
149161
headers.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
150-
BigInteger.valueOf(attempts + 1).toByteArray());
162+
ByteBuffer.wrap(new byte[4]).putInt(attempts + 1).array());
151163
headers.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP,
152164
BigInteger.valueOf(getNextExecutionTimestamp(consumerRecord, e, originalTimestampHeader))
153165
.toByteArray());

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.Mockito.times;
2626

2727
import java.math.BigInteger;
28+
import java.nio.ByteBuffer;
2829
import java.time.Clock;
2930
import java.time.Instant;
3031
import java.time.LocalDateTime;
@@ -125,22 +126,22 @@ void shouldSendMessage() {
125126
// assert headers
126127
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
127128
assertThat(attemptsHeader).isNotNull();
128-
assertThat(attemptsHeader.value()[0]).isEqualTo(Integer.valueOf(2).byteValue());
129+
assertThat(ByteBuffer.wrap(attemptsHeader.value()).getInt()).isEqualTo(2);
129130
Header timestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP);
130131
assertThat(timestampHeader).isNotNull();
131132
assertThat(new BigInteger(timestampHeader.value()).longValue()).isEqualTo(failureTimestamp + 1000L);
132133
}
133134

134135
@Test
135-
void shouldIncreaseAttempts() {
136+
void shouldIncreaseAttemptsInLegacyHeader() {
136137

137138
// setup
138139
RuntimeException e = new RuntimeException();
139140
ConsumerRecord consumerRecord = new ConsumerRecord(testTopic, 0, 0, key, value);
140-
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, BigInteger.valueOf(1).toByteArray());
141+
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, BigInteger.valueOf(127).toByteArray());
141142
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, this.originalTimestampBytes);
142143

143-
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, originalTimestamp))
144+
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 127, e, originalTimestamp))
144145
.willReturn(destinationTopic);
145146
given(destinationTopic.isNoOpsTopic()).willReturn(false);
146147
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
@@ -159,7 +160,41 @@ void shouldIncreaseAttempts() {
159160
ProducerRecord producerRecord = producerRecordCaptor.getValue();
160161
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
161162
assertThat(attemptsHeader).isNotNull();
162-
assertThat(attemptsHeader.value()[0]).isEqualTo(Integer.valueOf(2).byteValue());
163+
assertThat(attemptsHeader.value().length).isEqualTo(4); // handled a legacy one byte header ok
164+
assertThat(ByteBuffer.wrap(attemptsHeader.value()).getInt()).isEqualTo(128);
165+
}
166+
167+
@Test
168+
void shouldIncreaseAttemptsInNewHeader() {
169+
170+
// setup
171+
RuntimeException e = new RuntimeException();
172+
ConsumerRecord consumerRecord = new ConsumerRecord(testTopic, 0, 0, key, value);
173+
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
174+
ByteBuffer.wrap(new byte[4]).putInt(127).array());
175+
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, this.originalTimestampBytes);
176+
177+
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 127, e, originalTimestamp))
178+
.willReturn(destinationTopic);
179+
given(destinationTopic.isNoOpsTopic()).willReturn(false);
180+
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
181+
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
182+
willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations();
183+
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
184+
185+
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
186+
187+
// when
188+
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = factory.create();
189+
deadLetterPublishingRecoverer.accept(consumerRecord, e);
190+
191+
// then
192+
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
193+
ProducerRecord producerRecord = producerRecordCaptor.getValue();
194+
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
195+
assertThat(attemptsHeader).isNotNull();
196+
assertThat(attemptsHeader.value().length).isEqualTo(4);
197+
assertThat(ByteBuffer.wrap(attemptsHeader.value()).getInt()).isEqualTo(128);
163198
}
164199

165200
@Test

0 commit comments

Comments
 (0)