Skip to content

Commit fac9eb1

Browse files
committed
Customizing batch message conversion behavior for Kafka binder
Continuation of the previous commit: 14c1046
1 parent 14c1046 commit fac9eb1

File tree

5 files changed

+108
-137
lines changed

5 files changed

+108
-137
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
<dependency>
4646
<groupId>org.springframework.kafka</groupId>
4747
<artifactId>spring-kafka</artifactId>
48+
<version>3.3.0-SNAPSHOT</version>
4849
</dependency>
4950
<dependency>
5051
<groupId>org.springframework.boot</groupId>

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/DefaultMessageConverterHelper.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616

1717
package org.springframework.cloud.stream.binder.kafka.config;
1818

19-
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Set;
2022

2123
import org.springframework.cloud.function.context.config.MessageConverterHelper;
24+
import org.springframework.kafka.support.KafkaHeaders;
2225
import org.springframework.messaging.Message;
26+
import org.springframework.messaging.MessageHeaders;
2327

2428
/**
2529
* @author Oleg Zhurakousky
30+
* @author Soby Chacko
2631
*/
2732
public class DefaultMessageConverterHelper implements MessageConverterHelper {
2833

@@ -32,10 +37,22 @@ public boolean shouldFailIfCantConvert(Message<?> message) {
3237
}
3338

3439
public void postProcessBatchMessageOnFailure(Message<?> message, int index) {
35-
AtomicInteger deliveryAttempt = (AtomicInteger) message.getHeaders().get("deliveryAttempt");
36-
// if (message.getHeaders().containsKey("amqp_batchedHeaders") && deliveryAttempt != null && deliveryAttempt.get() == 1) {
37-
// ArrayList<?> list = (ArrayList<?>) message.getHeaders().get("amqp_batchedHeaders");
38-
// list.remove(index);
39-
// }
40+
MessageHeaders headers = message.getHeaders();
41+
Set<String> headerKeySet = headers.keySet();
42+
List<String> matchingHeaderKeys = new ArrayList<>();
43+
44+
for (String string : headerKeySet) {
45+
if (string.startsWith(KafkaHeaders.PREFIX)) {
46+
matchingHeaderKeys.add(string);
47+
}
48+
}
49+
for (String matchingHeaderKey : matchingHeaderKeys) {
50+
Object matchingHeaderValue = message.getHeaders().get(matchingHeaderKey);
51+
if (matchingHeaderValue instanceof ArrayList<?> list) {
52+
if (!list.isEmpty()) {
53+
list.remove(index);
54+
}
55+
}
56+
}
4057
}
4158
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2019-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka.config;
18+
19+
import org.springframework.cloud.function.context.config.MessageConverterHelper;
20+
import org.springframework.context.annotation.Bean;
21+
import org.springframework.context.annotation.Configuration;
22+
23+
/**
24+
* @author Oleg Zhurakousky
25+
* @author Soby Chacko
26+
*/
27+
@Configuration(proxyBeanMethods = false)
28+
public class MessageConverterHelperConfiguration {
29+
30+
@Bean
31+
public MessageConverterHelper messageConverterHelper() {
32+
return new DefaultMessageConverterHelper();
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
org.springframework.cloud.stream.binder.kafka.config.ExtendedBindingHandlerMappingsProviderConfiguration
2+
org.springframework.cloud.stream.binder.kafka.config.MessageConverterHelperConfiguration
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2024-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,163 +18,81 @@
1818

1919
import java.nio.charset.StandardCharsets;
2020
import java.util.ArrayList;
21-
import java.util.LinkedHashMap;
2221
import java.util.List;
23-
import java.util.Map;
24-
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.function.Function;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.function.Consumer;
2625

2726
import org.junit.jupiter.api.Test;
2827

28+
import org.springframework.beans.factory.annotation.Autowired;
2929
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
30-
import org.springframework.cloud.function.context.config.MessageConverterHelper;
31-
import org.springframework.cloud.function.json.JacksonMapper;
32-
import org.springframework.cloud.stream.binder.test.InputDestination;
33-
import org.springframework.cloud.stream.binder.test.OutputDestination;
34-
import org.springframework.cloud.stream.binder.test.TestChannelBinder;
35-
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
30+
import org.springframework.boot.test.context.SpringBootTest;
31+
import org.springframework.cloud.stream.function.StreamBridge;
3632
import org.springframework.context.annotation.Bean;
3733
import org.springframework.context.annotation.Configuration;
38-
import org.springframework.integration.support.MessageBuilder;
34+
import org.springframework.kafka.test.context.EmbeddedKafka;
3935
import org.springframework.messaging.Message;
40-
import org.springframework.messaging.MessageHandlingException;
41-
import org.springframework.messaging.converter.MessageConversionException;
36+
import org.springframework.test.annotation.DirtiesContext;
37+
import org.springframework.util.Assert;
4238

4339
import static org.assertj.core.api.Assertions.assertThat;
4440

4541
/**
46-
*
42+
* @author Soby Chacko
4743
*/
44+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
45+
"spring.cloud.function.definition=batchConsumer",
46+
"spring.cloud.stream.bindings.batchConsumer-in-0.consumer.batch-mode=true",
47+
"spring.cloud.stream.bindings.batchConsumer-in-0.destination=cfrthp-topic",
48+
"spring.cloud.stream.bindings.batchConsumer-in-0.group=cfrthp-group"
49+
})
50+
@EmbeddedKafka
51+
@DirtiesContext
4852
public class FunctionBatchingConversionTests {
4953

50-
@SuppressWarnings("unchecked")
51-
// @Test
52-
void testBatchHeadersMatchingPayload() {
53-
TestChannelBinderConfiguration.applicationContextRunner(BatchFunctionConfiguration.class)
54-
.withPropertyValues("spring.cloud.stream.function.definition=func",
55-
"spring.cloud.stream.bindings.func-in-0.consumer.batch-mode=true",
56-
"spring.cloud.stream.rabbit.bindings.func-in-0.consumer.enable-batching=true")
57-
.run(context -> {
58-
InputDestination inputDestination = context.getBean(InputDestination.class);
59-
OutputDestination outputDestination = context.getBean(OutputDestination.class);
60-
61-
List<byte[]> payloads = List.of("hello".getBytes(StandardCharsets.UTF_8),
62-
"{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8),
63-
"{\"name\":\"Julien\"}".getBytes(StandardCharsets.UTF_8),
64-
"{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8),
65-
"hello".getBytes(StandardCharsets.UTF_8));
66-
List<Map<String, String>> amqpBatchHeaders = new ArrayList<>();
67-
for (int i = 0; i < 5; i++) {
68-
Map<String, String> batchHeaders = new LinkedHashMap<>();
69-
batchHeaders.put("amqp_receivedDeliveryMode", "PERSISTENT");
70-
batchHeaders.put("index", String.valueOf(i));
71-
amqpBatchHeaders.add(batchHeaders);
72-
}
73-
74-
var message = MessageBuilder.withPayload(payloads)
75-
.setHeader("amqp_batchedHeaders", amqpBatchHeaders)
76-
.setHeader("deliveryAttempt", new AtomicInteger(1)).build();
77-
inputDestination.send(message);
78-
79-
Message<byte[]> resultMessage = outputDestination.receive();
80-
JacksonMapper mapper = context.getBean(JacksonMapper.class);
81-
List<?> resultPayloads = mapper.fromJson(resultMessage.getPayload(), List.class);
82-
assertThat(resultPayloads).hasSize(3);
83-
84-
List<Map<String, String>> amqpBatchedHeaders = (List<Map<String, String>>) resultMessage
85-
.getHeaders().get("amqp_batchedHeaders");
86-
assertThat(amqpBatchedHeaders).hasSize(resultPayloads.size());
87-
assertThat(amqpBatchedHeaders.get(0).get("index")).isEqualTo("1");
88-
assertThat(amqpBatchedHeaders.get(1).get("index")).isEqualTo("2");
89-
assertThat(amqpBatchedHeaders.get(2).get("index")).isEqualTo("3");
90-
91-
context.stop();
92-
});
93-
}
54+
@Autowired
55+
private StreamBridge streamBridge;
9456

95-
// @Test
96-
void testBatchHeadersForcingFatalFailureOnConversiionException() {
97-
TestChannelBinderConfiguration
98-
.applicationContextRunner(BatchFunctionConfigurationWithAdditionalConversionHelper.class)
99-
.withPropertyValues("spring.cloud.stream.function.definition=func",
100-
"spring.cloud.stream.bindings.func-in-0.consumer.batch-mode=true",
101-
"spring.cloud.stream.bindings.func-in-0.consumer.max-attempts=1",
102-
"spring.cloud.stream.rabbit.bindings.func-in-0.consumer.enable-batching=true")
103-
.run(context -> {
104-
InputDestination inputDestination = context.getBean(InputDestination.class);
105-
106-
List<byte[]> payloads = List.of("hello".getBytes(StandardCharsets.UTF_8),
107-
"{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8),
108-
"{\"name\":\"Julien\"}".getBytes(StandardCharsets.UTF_8),
109-
"{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8),
110-
"hello".getBytes(StandardCharsets.UTF_8));
111-
List<Map<String, String>> amqpBatchHeaders = new ArrayList<>();
112-
for (int i = 0; i < 5; i++) {
113-
Map<String, String> batchHeaders = new LinkedHashMap<>();
114-
batchHeaders.put("amqp_receivedDeliveryMode", "PERSISTENT");
115-
batchHeaders.put("index", String.valueOf(i));
116-
amqpBatchHeaders.add(batchHeaders);
117-
}
118-
119-
var message = MessageBuilder.withPayload(payloads)
120-
.setHeader("amqp_batchedHeaders", amqpBatchHeaders)
121-
.setHeader("deliveryAttempt", new AtomicInteger(1)).build();
122-
inputDestination.send(message);
123-
TestChannelBinder binder = context.getBean(TestChannelBinder.class);
124-
assertThat(binder.getLastError().getPayload()).isInstanceOf(MessageHandlingException.class);
125-
MessageHandlingException exception = (MessageHandlingException) binder.getLastError().getPayload();
126-
assertThat(exception.getCause()).isInstanceOf(MessageConversionException.class);
127-
128-
context.stop();
129-
});
130-
}
57+
static CountDownLatch latch = new CountDownLatch(3);
13158

132-
@Configuration
133-
@EnableAutoConfiguration
134-
public static class BatchFunctionConfiguration {
135-
@Bean
136-
public Function<Message<List<Person>>, Message<List<Person>>> func() {
137-
return x -> {
138-
return x;
139-
};
140-
}
59+
static List<Person> persons = new ArrayList<>();
60+
61+
@Test
62+
void conversionFailuresRemoveTheHeadersProperly() throws Exception {
63+
streamBridge.send("cfrthp-topic", "hello".getBytes(StandardCharsets.UTF_8));
64+
streamBridge.send("cfrthp-topic", "hello".getBytes(StandardCharsets.UTF_8));
65+
streamBridge.send("cfrthp-topic", "{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8));
66+
streamBridge.send("cfrthp-topic", "{\"name\":\"Julian\"}".getBytes(StandardCharsets.UTF_8));
67+
streamBridge.send("cfrthp-topic", "{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8));
68+
69+
Assert.isTrue(latch.await(10, TimeUnit.SECONDS), "Failed to receive message");
70+
71+
assertThat(persons.size()).isEqualTo(3);
72+
assertThat(persons.get(0).toString().contains("Ricky")).isTrue();
73+
assertThat(persons.get(1).toString().contains("Julian")).isTrue();
74+
assertThat(persons.get(2).toString().contains("Bubbles")).isTrue();
14175
}
14276

143-
@Configuration
14477
@EnableAutoConfiguration
145-
public static class BatchFunctionConfigurationWithAdditionalConversionHelper {
78+
@Configuration
79+
public static class Config {
14680

14781
@Bean
148-
public MessageConverterHelper helper() {
149-
return new MessageConverterHelper() {
150-
public boolean shouldFailIfCantConvert(Message<?> message) {
151-
return true;
82+
Consumer<Message<List<Person>>> batchConsumer() {
83+
return message -> {
84+
if (!message.getPayload().isEmpty()) {
85+
message.getPayload().forEach(c -> {
86+
persons.add(c);
87+
latch.countDown();
88+
});
15289
}
15390
};
15491
}
15592

156-
@Bean
157-
public Function<Message<List<Person>>, Message<List<Person>>> func() {
158-
return x -> {
159-
return x;
160-
};
161-
}
16293
}
16394

164-
static class Person {
165-
166-
private String name;
167-
168-
public String getName() {
169-
return name;
170-
}
171-
172-
public void setName(String name) {
173-
this.name = name;
174-
}
175-
176-
public String toString() {
177-
return "name: " + name;
178-
}
95+
record Person(String name) {
17996
}
97+
18098
}

0 commit comments

Comments
 (0)