
Commit 9159ef9

GH-1994: Option to Strip Exception Headers
Resolves #1994

Add an option to strip previous exception headers when republishing a dead letter record.

* Fix typos.
* Fix constant.
* Fix typo in doc.
1 parent afc2883 commit 9159ef9

File tree: 10 files changed (+178 additions, −43 deletions)

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 18 additions & 0 deletions
@@ -5607,6 +5607,24 @@ Starting with version 2.7, the recoverer checks that the partition selected by t
 If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
 You can disable this check by setting the `verifyPartition` property to `false`.
 
+[[dlpr-headers]]
+===== Managing Dead Letter Record Headers
+
+Referring to <<dead-letters>> above, the `DeadLetterPublishingRecoverer` has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using <<retry-topic>>).
+
+* `appendOriginalHeaders` (default `true`)
+* `stripPreviousExceptionHeaders` (default `false` - will be `true` in version 2.8 and later)
+
+Apache Kafka supports multiple headers with the same name; to obtain the "latest" value, you can use `headers.lastHeader(headerName)`; to get an iterator over multiple headers, use `headers.headers(headerName).iterator()`.
+
+When repeatedly republishing a failed record, these headers can grow (and eventually cause publication to fail due to a `RecordTooLargeException`); this is especially true for the exception headers and particularly for the stack trace headers.
+
+The reason for the two properties is that, while you might want to retain only the last exception information, you might want to retain the history of which topic(s) the record passed through for each failure.
+
+`appendOriginalHeaders` is applied to all headers named `*ORIGINAL*` while `stripPreviousExceptionHeaders` is applied to all headers named `*EXCEPTION*`.
+
+Also see <<retry-headers>>.
+
 [[exp-backoff]]
 ===== `ExponentialBackOffWithMaxRetries` Implementation
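The following is an illustrative sketch (not part of this commit) of the header-access idiom described above; it assumes a `ConsumerRecord` named `record` read from a dead letter topic, and uses the `KafkaHeaders.DLT_*` constants that appear in the tests further down:

[source, java]
----
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.KafkaHeaders;

Headers headers = record.headers();
// the most recently added value wins when multiple headers share a name
Header latest = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
// iterate all retained values for the same name, oldest first
for (Header header : headers.headers(KafkaHeaders.DLT_EXCEPTION_FQCN)) {
    System.out.println(new String(header.value(), StandardCharsets.UTF_8));
}
----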

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 27 additions & 0 deletions
@@ -357,6 +357,33 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
 
 NOTE: By default the topics are autocreated with one partition and a replication factor of one.
 
+[[retry-headers]]
+===== Failure Header Management
+
+When considering how to manage failure headers (original headers and exception headers), the framework delegates to the `DeadLetterPublishingRecoverer` to decide whether to append or replace the headers.
+
+By default, it explicitly sets `appendOriginalHeaders` to `false` and leaves `stripPreviousExceptionHeaders` at the default used by the `DeadLetterPublishingRecoverer`.
+
+This means that, currently, records published to multiple retry topics may grow to a large size, especially when the stack trace is large.
+
+See <<dlpr-headers>> for more information.
+
+To reconfigure the framework to use different settings for these properties, replace the standard `DeadLetterPublishingRecovererFactory` bean, adding a `recovererCustomizer`:
+
+====
+[source, java]
+----
+@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
+DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
+    DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
+    factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
+        dlpr.setAppendOriginalHeaders(true);
+        dlpr.setStripPreviousExceptionHeaders(true);
+    });
+    return factory;
+}
+----
+====
 
 ==== Topic Naming
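As a usage note (a sketch, not part of this commit): with the customized factory registered, any retry topic listener picks it up automatically; the topic name below is hypothetical:

[source, java]
----
@RetryableTopic(attempts = "4")
@KafkaListener(topics = "my-topic")
void listen(String in) {
    // each failure republishes the record to the next retry topic; with the
    // settings above, exception headers are replaced on each hop, not accumulated
    throw new IllegalStateException("trigger retry/DLT publishing");
}
----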

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
@@ -62,6 +62,10 @@ In addition, the recoverer verifies that the partition selected by the destinati
 
 See <<dead-letters>> for more information.
 
+There is now an option (`stripPreviousExceptionHeaders`) to prevent build-up of multiple large headers when republishing dead letter records.
+
+See <<dlpr-headers>> for more information.
+
 [[x27-CKTM]]
 ==== `ChainedKafkaTransactionManager` is Deprecated

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 44 additions & 10 deletions
@@ -93,14 +93,16 @@ public class DeadLetterPublishingRecoverer implements ConsumerAwareRecordRecover
 
     private Duration waitForSendResultTimeout = Duration.ofSeconds(THIRTY);
 
-    private boolean replaceOriginalHeaders = true;
+    private boolean appendOriginalHeaders = true;
 
     private boolean failIfSendResultIsError = true;
 
     private boolean throwIfNoDestinationReturned = false;
 
     private long timeoutBuffer = Duration.ofSeconds(FIVE).toMillis();
 
+    private boolean stripPreviousExceptionHeaders;
+
     /**
      * Create an instance with the provided template and a default destination resolving
      * function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -246,13 +248,27 @@ public void setPartitionInfoTimeout(Duration partitionInfoTimeout) {
     }
 
     /**
-     * Set to false if you don't want to replace the dead letter original headers if
-     * they are already present.
+     * Set to false if you don't want to append the current "original" headers (topic,
+     * partition etc.) if they are already present. When false, only the first "original"
+     * headers are retained.
      * @param replaceOriginalHeaders set to false not to replace.
      * @since 2.7
+     * @deprecated in favor of {@link #setAppendOriginalHeaders(boolean)}.
      */
+    @Deprecated
    public void setReplaceOriginalHeaders(boolean replaceOriginalHeaders) {
-        this.replaceOriginalHeaders = replaceOriginalHeaders;
+        this.appendOriginalHeaders = replaceOriginalHeaders;
+    }
+
+    /**
+     * Set to false if you don't want to append the current "original" headers (topic,
+     * partition etc.) if they are already present. When false, only the first "original"
+     * headers are retained.
+     * @param appendOriginalHeaders set to false not to append.
+     * @since 2.7.9
+     */
+    public void setAppendOriginalHeaders(boolean appendOriginalHeaders) {
+        this.appendOriginalHeaders = appendOriginalHeaders;
    }
 
     /**
@@ -299,6 +315,18 @@ public void setTimeoutBuffer(long buffer) {
         this.timeoutBuffer = buffer;
     }
 
+    /**
+     * Set to true to remove previous exception headers and only retain headers for the
+     * current exception. Default is false, which means all exception header values are
+     * retained; this can cause a growth in record size when a record is republished many
+     * times.
+     * @param stripPreviousExceptionHeaders true to strip.
+     * @since 2.7.9
+     */
+    public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders) {
+        this.stripPreviousExceptionHeaders = stripPreviousExceptionHeaders;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
@@ -535,29 +563,35 @@ private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?>
     }
 
     private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
-        if (this.replaceOriginalHeaders || kafkaHeaders.lastHeader(header) == null) {
+        if (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null) {
             kafkaHeaders.add(header, value);
         }
     }
 
-    void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
-            boolean isKey) {
-        kafkaHeaders.add(new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn
+    void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey) {
+        appendOrReplace(kafkaHeaders, new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn
                 : this.headerNames.exceptionInfo.exceptionFqcn,
                 exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
         String message = exception.getMessage();
         if (message != null) {
-            kafkaHeaders.add(new RecordHeader(isKey
+            appendOrReplace(kafkaHeaders, new RecordHeader(isKey
                     ? this.headerNames.exceptionInfo.keyExceptionMessage
                     : this.headerNames.exceptionInfo.exceptionMessage,
                     exception.getMessage().getBytes(StandardCharsets.UTF_8)));
         }
-        kafkaHeaders.add(new RecordHeader(isKey
+        appendOrReplace(kafkaHeaders, new RecordHeader(isKey
                 ? this.headerNames.exceptionInfo.keyExceptionStacktrace
                 : this.headerNames.exceptionInfo.exceptionStacktrace,
                 getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
     }
 
+    private void appendOrReplace(Headers headers, RecordHeader header) {
+        if (this.stripPreviousExceptionHeaders) {
+            headers.remove(header.key());
+        }
+        headers.add(header);
+    }
+
     private String getStackTraceAsString(Throwable cause) {
         StringWriter stringWriter = new StringWriter();
         PrintWriter printWriter = new PrintWriter(stringWriter, true);
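Tying the new setters together, a minimal usage sketch (not part of this commit; it assumes an existing `KafkaOperations`/`KafkaTemplate` named `template`):

[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setAppendOriginalHeaders(true); // the default: keep the full *ORIGINAL* header history
recoverer.setStripPreviousExceptionHeaders(true); // retain only the latest *EXCEPTION* headers
----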

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
 
         recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
         recoverer.setFailIfSendResultIsError(true);
-        recoverer.setReplaceOriginalHeaders(false);
+        recoverer.setAppendOriginalHeaders(false);
         recoverer.setThrowIfNoDestinationReturned(false);
         this.recovererCustomizer.accept(recoverer);
         return recoverer;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBootstrapper.java

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ private void registerBeans() {
                 DefaultDestinationTopicProcessor.class);
         registerIfNotContains(RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME,
                 ListenerContainerFactoryConfigurer.class);
-        registerIfNotContains(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_PROVIDER_NAME,
+        registerIfNotContains(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME,
                 DeadLetterPublishingRecovererFactory.class);
         registerIfNotContains(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER, RetryTopicConfigurer.class);
         registerIfNotContains(RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME,

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicInternalBeanNames.java

Lines changed: 10 additions & 1 deletion
@@ -52,7 +52,16 @@ public abstract class RetryTopicInternalBeanNames {
     /**
      * {@link DeadLetterPublishingRecovererFactory} bean name.
      */
-    public static final String DEAD_LETTER_PUBLISHING_RECOVERER_PROVIDER_NAME = "internalDeadLetterPublishingRecovererProvider";
+    public static final String DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME =
+            "internalDeadLetterPublishingRecovererProvider";
+
+    /**
+     * {@link DeadLetterPublishingRecovererFactory} bean name.
+     * @deprecated in favor of {@link #DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME}
+     */
+    @Deprecated
+    public static final String DEAD_LETTER_PUBLISHING_RECOVERER_PROVIDER_NAME =
+            DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME;
 
     /**
      * {@link DestinationTopicContainer} bean name.

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 54 additions & 20 deletions
@@ -38,6 +38,7 @@
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -329,15 +330,16 @@ void allOriginalHeaders() {
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Test
-    void dontReplaceOriginalHeaders() {
+    void dontAppendOriginalHeaders() {
         KafkaOperations<?, ?> template = mock(KafkaOperations.class);
         ListenableFuture future = mock(ListenableFuture.class);
         given(template.send(any(ProducerRecord.class))).willReturn(future);
         ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 1234L,
                 TimestampType.CREATE_TIME, 4321L, 123, 123, "bar", null);
         DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
-        recoverer.setReplaceOriginalHeaders(false);
-        recoverer.accept(record, new RuntimeException());
+        recoverer.setStripPreviousExceptionHeaders(true);
+        recoverer.setAppendOriginalHeaders(false);
+        recoverer.accept(record, new RuntimeException(new IllegalStateException()));
         ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
         then(template).should(times(1)).send(producerRecordCaptor.capture());
         Headers headers = producerRecordCaptor.getValue().headers();
@@ -346,32 +348,46 @@ void dontReplaceOriginalHeaders() {
         Header originalOffsetHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET);
         Header originalTimestampHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP);
         Header originalTimestampType = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE);
+        Header firstExceptionType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
+        Header firstExceptionMessage = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
+        Header firstExceptionStackTrace = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);
 
         ConsumerRecord<String, String> anotherRecord = new ConsumerRecord<>("bar", 1, 12L, 4321L,
                 TimestampType.LOG_APPEND_TIME, 1234L, 321, 321, "bar", null);
         headers.forEach(header -> anotherRecord.headers().add(header));
-        recoverer.accept(anotherRecord, new RuntimeException());
+        recoverer.accept(anotherRecord, new RuntimeException(new IllegalStateException()));
         ArgumentCaptor<ProducerRecord> anotherProducerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
-        then(template).should(times(2)).send(producerRecordCaptor.capture());
-        Headers anotherHeaders = producerRecordCaptor.getAllValues().get(1).headers();
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isEqualTo(originalTopicHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isEqualTo(originalPartitionHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isEqualTo(originalTimestampHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isEqualTo(originalTimestampType);
+        then(template).should(times(2)).send(anotherProducerRecordCaptor.capture());
+        Headers anotherHeaders = anotherProducerRecordCaptor.getAllValues().get(1).headers();
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isSameAs(originalTopicHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isSameAs(originalPartitionHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isSameAs(originalOffsetHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isSameAs(originalTimestampHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE))
+                .isSameAs(originalTimestampType);
+        Iterator<Header> originalTopics = anotherHeaders.headers(KafkaHeaders.DLT_ORIGINAL_TOPIC).iterator();
+        assertThat(originalTopics.next()).isSameAs(originalTopicHeader);
+        assertThat(originalTopics.hasNext()).isFalse();
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotSameAs(firstExceptionType);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotSameAs(firstExceptionMessage);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE))
+                .isNotSameAs(firstExceptionStackTrace);
+        Iterator<Header> exceptionHeaders = anotherHeaders.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();
+        assertThat(exceptionHeaders.next()).isNotSameAs(firstExceptionType);
+        assertThat(exceptionHeaders.hasNext()).isFalse();
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Test
-    void replaceOriginalHeaders() {
+    void appendOriginalHeaders() {
         KafkaOperations<?, ?> template = mock(KafkaOperations.class);
         ListenableFuture future = mock(ListenableFuture.class);
         given(template.send(any(ProducerRecord.class))).willReturn(future);
         ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 1234L,
                 TimestampType.CREATE_TIME, 4321L, 123, 123, "bar", null);
         DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
-        recoverer.setReplaceOriginalHeaders(true);
-        recoverer.accept(record, new RuntimeException());
+        recoverer.setAppendOriginalHeaders(true);
+        recoverer.accept(record, new RuntimeException(new IllegalStateException()));
         ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
         then(template).should(times(1)).send(producerRecordCaptor.capture());
         Headers headers = producerRecordCaptor.getValue().headers();
@@ -380,19 +396,37 @@ void replaceOriginalHeaders() {
         Header originalOffsetHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET);
         Header originalTimestampHeader = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP);
         Header originalTimestampType = headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE);
+        Header firstExceptionType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
+        Header firstExceptionMessage = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
+        Header firstExceptionStackTrace = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);
 
         ConsumerRecord<String, String> anotherRecord = new ConsumerRecord<>("bar", 1, 12L, 4321L,
                 TimestampType.LOG_APPEND_TIME, 1234L, 321, 321, "bar", null);
         headers.forEach(header -> anotherRecord.headers().add(header));
-        recoverer.accept(anotherRecord, new RuntimeException());
+        recoverer.accept(anotherRecord, new RuntimeException(new IllegalStateException()));
         ArgumentCaptor<ProducerRecord> anotherProducerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
         then(template).should(times(2)).send(anotherProducerRecordCaptor.capture());
         Headers anotherHeaders = anotherProducerRecordCaptor.getAllValues().get(1).headers();
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotEqualTo(originalTopicHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNotEqualTo(originalPartitionHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotEqualTo(originalOffsetHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotEqualTo(originalTimestampHeader);
-        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotEqualTo(originalTimestampType);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotSameAs(originalTopicHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION))
+                .isNotSameAs(originalPartitionHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotSameAs(originalOffsetHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP))
+                .isNotSameAs(originalTimestampHeader);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE))
+                .isNotSameAs(originalTimestampType);
+        Iterator<Header> originalTopics = anotherHeaders.headers(KafkaHeaders.DLT_ORIGINAL_TOPIC).iterator();
+        assertThat(originalTopics.next()).isSameAs(originalTopicHeader);
+        assertThat(originalTopics.next()).isNotNull();
+        assertThat(originalTopics.hasNext()).isFalse();
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotSameAs(firstExceptionType);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotSameAs(firstExceptionMessage);
+        assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE))
+                .isNotSameAs(firstExceptionStackTrace);
+        Iterator<Header> exceptionHeaders = anotherHeaders.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();
+        assertThat(exceptionHeaders.next()).isSameAs(firstExceptionType);
+        assertThat(exceptionHeaders.next()).isNotNull();
+        assertThat(exceptionHeaders.hasNext()).isFalse();
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
