@@ -392,7 +392,9 @@ public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception {
392392 RequestReplyMessageFuture <Integer , String > future = template .sendAndReceive (message , Duration .ofSeconds (30 ));
393393 future .getSendFuture ().get (10 , TimeUnit .SECONDS ); // send ok
394394 Message <?> resultingMessage = future .get (30 , TimeUnit .SECONDS );
395+
395396 assertThat (resultingMessage .getPayload ()).isEqualTo ("OK" );
397+ assertThat (resultingMessage .getHeaders ()).containsEntry ("originalPayload" , "expected_message" );
396398 }
397399 finally {
398400 template .stop ();
@@ -418,7 +420,9 @@ public void testCustomReplyHeadersAreNotDuplicated() throws Exception {
418420 RequestReplyMessageFuture <Integer , String > future = template .sendAndReceive (message , Duration .ofSeconds (30 ));
419421 future .getSendFuture ().get (10 , TimeUnit .SECONDS ); // send ok
420422 Message <?> resultingMessage = future .get (30 , TimeUnit .SECONDS );
423+
421424 assertThat (resultingMessage .getPayload ()).isEqualTo ("OK" );
425+ assertThat (resultingMessage .getHeaders ()).containsEntry ("originalPayload" , "expected_message" );
422426 }
423427 finally {
424428 template .stop ();
@@ -932,14 +936,6 @@ void testMessageIterableReturn() throws Exception {
932936 }
933937 }
934938
935- private static int length (Iterable <?> iterable ) {
936- int counter = 0 ;
937- for (Object o : iterable ) {
938- counter ++;
939- }
940- return counter ;
941- }
942-
943939 @ Configuration
944940 @ EnableKafka
945941 public static class Config {
@@ -1116,43 +1112,19 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
11161112 }
11171113
11181114 @ KafkaListener (id = CUSTOM_REPLY_HEADER_REQUEST , topics = CUSTOM_REPLY_HEADER_REQUEST )
1119- @ SendTo (CUSTOM_REPLY_HEADER_REPLY ) // send to custom topic back
1120- public String handleCustomReplyHeaderNoReplyPartition (ConsumerRecord <?, String > inputMessage ) {
1121- Headers headers = inputMessage .headers ();
1122-
1123- if (length (headers .headers ("X-Custom-Reply-Header" )) != 1 ) {
1124- return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once" ;
1125- }
1126-
1127- if (length (headers .headers (KafkaHeaders .REPLY_PARTITION )) != 0 ) {
1128- return "It is expected that the user does NOT specify the reply partition in this test case" ;
1129- }
1130-
1131- if (!"expected_message" .equals (inputMessage .value ())) {
1132- return "Expected message is 'expected_message', but got %s" .formatted (inputMessage .value ());
1133- }
1134-
1135- return "OK" ;
1115+ @ SendTo (CUSTOM_REPLY_HEADER_REPLY )
1116+ public Message <String > handleCustomReplyHeaderNoReplyPartition (ConsumerRecord <?, String > inputMessage ) {
1117+ return MessageBuilder .withPayload ("OK" )
1118+ .setHeader ("originalPayload" , inputMessage .value ())
1119+ .build ();
11361120 }
11371121
11381122 @ KafkaListener (id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST , topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST )
1139- @ SendTo (CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY ) // send to custom topic back
1140- public String handleCustomReplyHeaderDefaultPartitionHeader (ConsumerRecord <?, String > inputMessage ) {
1141- Headers headers = inputMessage .headers ();
1142-
1143- if (length (headers .headers ("X-Custom-Reply-Header" )) != 1 ) {
1144- return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once" ;
1145- }
1146-
1147- if (length (headers .headers ("X-Custom-Reply-Partition" )) != 1 ) {
1148- return "Executed a single reply partition header '%s' in the incoming message" .formatted (KafkaHeaders .REPLY_PARTITION );
1149- }
1150-
1151- if (!"expected_message" .equals (inputMessage .value ())) {
1152- return "Expected message is 'expected_message', but got %s" .formatted (inputMessage .value ());
1153- }
1154-
1155- return "OK" ;
1123+ @ SendTo (CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY )
1124+ public Message <String > handleCustomReplyHeaderDefaultPartitionHeader (ConsumerRecord <?, String > inputMessage ) {
1125+ return MessageBuilder .withPayload ("OK" )
1126+ .setHeader ("originalPayload" , inputMessage .value ())
1127+ .build ();
11561128 }
11571129 }
11581130
0 commit comments