Skip to content

Commit dfce233

Browse files
garyrussellartembilan
authored andcommitted
GH-2082: Fix RecordInterceptor
Resolves #2082 Now that the `earlyRecordInterceptor` is always used unless explicitly disabled, move delivery attempt header processing earlier so that the header is available in the interceptor. **cherry-pick to 2.8.x, 2.7.x**
1 parent 61103ef commit dfce233

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2173,16 +2173,28 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
21732173
}
21742174

21752175
@Nullable
2176-
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
2177-
ConsumerRecord<K, V> next = nextArg;
2176+
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
2177+
deliveryAttemptHeader(recordArg);
2178+
ConsumerRecord<K, V> record = recordArg;
21782179
if (this.earlyRecordInterceptor != null) {
2179-
next = this.earlyRecordInterceptor.intercept(next, this.consumer);
2180-
if (next == null) {
2180+
record = this.earlyRecordInterceptor.intercept(record, this.consumer);
2181+
if (record == null) {
21812182
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2182-
+ ListenerUtils.recordToString(nextArg));
2183+
+ ListenerUtils.recordToString(recordArg));
21832184
}
21842185
}
2185-
return next;
2186+
return record;
2187+
}
2188+
2189+
private void deliveryAttemptHeader(final ConsumerRecord<K, V> record) {
2190+
if (this.deliveryAttemptAware != null) {
2191+
byte[] buff = new byte[4]; // NOSONAR (magic #)
2192+
ByteBuffer bb = ByteBuffer.wrap(buff);
2193+
bb.putInt(this.deliveryAttemptAware
2194+
.deliveryAttempt(
2195+
new TopicPartitionOffset(record.topic(), record.partition(), record.offset())));
2196+
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
2197+
}
21862198
}
21872199

21882200
private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> record) {
@@ -2304,14 +2316,6 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
23042316
if (record.key() == null && this.checkNullKeyForExceptions) {
23052317
checkDeser(record, ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER);
23062318
}
2307-
if (this.deliveryAttemptAware != null) {
2308-
byte[] buff = new byte[4]; // NOSONAR (magic #)
2309-
ByteBuffer bb = ByteBuffer.wrap(buff);
2310-
bb.putInt(this.deliveryAttemptAware
2311-
.deliveryAttempt(
2312-
new TopicPartitionOffset(record.topic(), record.partition(), record.offset())));
2313-
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
2314-
}
23152319
doInvokeOnMessage(record);
23162320
if (this.nackSleep < 0 && !this.isManualImmediateAck) {
23172321
ackCurrent(record);

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
124124
assertThat(this.config.count).isEqualTo(8);
125125
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "qux", "qux", "fiz", "buz");
126126
assertThat(this.config.deliveries).contains(1, 1, 1, 1, 2, 3, 1, 1);
127+
assertThat(this.config.deliveryAttempt).isNotNull();
127128
}
128129

129130
@Configuration
@@ -144,6 +145,8 @@ public static class Config {
144145

145146
int count;
146147

148+
volatile org.apache.kafka.common.header.Header deliveryAttempt;
149+
147150
@KafkaListener(groupId = "grp",
148151
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
149152
partitions = "#{'0,1,2'.split(',')}",
@@ -226,6 +229,10 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
226229
factory.setErrorHandler(new SeekToCurrentErrorHandler());
227230
factory.getContainerProperties().setAckMode(AckMode.RECORD);
228231
factory.getContainerProperties().setDeliveryAttemptHeader(true);
232+
factory.setRecordInterceptor(record -> {
233+
Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
234+
return record;
235+
});
229236
return factory;
230237
}
231238

0 commit comments

Comments
 (0)