Skip to content

Commit e63edde

Browse files
artembilangaryrussell
authored andcommitted
GH-3966: Kafka XML config: Expose more attributes
Fixes #3966 Exposed setters in the `KafkaInboundGateway` and `KafkaMessageDrivenChannelAdapter` as an XML attributes for `kafka` namespace **Cherry-pick to `5.5.x`**
1 parent 280eb29 commit e63edde

File tree

7 files changed

+128
-20
lines changed

7 files changed

+128
-20
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayParser.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
* Inbound gateway parser.
2828
*
2929
* @author Gary Russell
30+
* @author Artem Bilan
3031
*
3132
* @since 5.4
3233
*
@@ -48,6 +49,9 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) {
4849
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
4950
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "retry-template");
5051
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "recovery-callback");
52+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element,
53+
"on-partitions-assigned-seek-callback");
54+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "bind-source-record");
5155
}
5256

5357
@Override

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParser.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,6 +59,12 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
5959
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
6060
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "retry-template");
6161
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "recovery-callback");
62+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "record-filter-strategy");
63+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element,
64+
"on-partitions-assigned-seek-callback");
65+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ack-discarded");
66+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "filter-in-retry");
67+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "bind-source-record");
6268

6369
return builder.getBeanDefinition();
6470
}

spring-integration-kafka/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka.xsd

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,46 @@
373373
</xsd:documentation>
374374
</xsd:annotation>
375375
</xsd:attribute>
376+
<xsd:attribute name="record-filter-strategy" type="xsd:string">
377+
<xsd:annotation>
378+
<xsd:documentation>
379+
Wrap a 'KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener'
380+
into 'FilteringMessageListenerAdapter' with the provided 'RecordFilterStrategy'.
381+
</xsd:documentation>
382+
<xsd:appinfo>
383+
<tool:annotation kind="ref">
384+
<tool:expected-type type="org.springframework.kafka.listener.adapter.RecordFilterStrategy" />
385+
</tool:annotation>
386+
</xsd:appinfo>
387+
</xsd:annotation>
388+
</xsd:attribute>
389+
<xsd:attribute name="ack-discarded">
390+
<xsd:annotation>
391+
<xsd:documentation>
392+
A boolean flag to indicate if 'FilteringMessageListenerAdapter
393+
should acknowledge discarded records or not.
394+
Does not make sense if 'record-filter-strategy' isn't specified.
395+
</xsd:documentation>
396+
</xsd:annotation>
397+
<xsd:simpleType>
398+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
399+
</xsd:simpleType>
400+
</xsd:attribute>
401+
<xsd:attribute name="filter-in-retry">
402+
<xsd:annotation>
403+
<xsd:documentation>
404+
A boolean flag to specify the order in which the filter and retry
405+
operations are performed.
406+
Does not make sense if only one of 'retry-template' or
407+
'record-filter-strategy' is present, or none.
408+
When true, the filter is called for each retry; when false, the filter is only
409+
called once for each delivery from the container.
410+
</xsd:documentation>
411+
</xsd:annotation>
412+
<xsd:simpleType>
413+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
414+
</xsd:simpleType>
415+
</xsd:attribute>
376416
</xsd:extension>
377417
</xsd:complexContent>
378418
</xsd:complexType>
@@ -725,6 +765,30 @@
725765
</xsd:appinfo>
726766
</xsd:annotation>
727767
</xsd:attribute>
768+
<xsd:attribute name="on-partitions-assigned-seek-callback" type="xsd:string">
769+
<xsd:annotation>
770+
<xsd:documentation><![CDATA[
771+
A 'BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback>'
772+
bean reference for seeks management.
773+
]]></xsd:documentation>
774+
<xsd:appinfo>
775+
<tool:annotation kind="ref">
776+
<tool:expected-type type="java.util.function.BiConsumer" />
777+
</tool:annotation>
778+
</xsd:appinfo>
779+
</xsd:annotation>
780+
</xsd:attribute>
781+
<xsd:attribute name="bind-source-record">
782+
<xsd:annotation>
783+
<xsd:documentation>
784+
Set to true to bind the source consumer record in the header named
785+
'IntegrationMessageHeaderAccessor#SOURCE_DATA'.
786+
</xsd:documentation>
787+
</xsd:annotation>
788+
<xsd:simpleType>
789+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
790+
</xsd:simpleType>
791+
</xsd:attribute>
728792
<xsd:attributeGroup ref="errorMessageStrategyGroup" />
729793
</xsd:complexType>
730794

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests-context.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
payload-type="java.lang.String"
2020
error-message-strategy="ems"
2121
retry-template="retryTemplate"
22-
recovery-callback="recoveryCallback"/>
22+
recovery-callback="recoveryCallback"
23+
bind-source-record="true"
24+
on-partitions-assigned-seek-callback="onPartitionsAssignedSeekCallback"/>
2325

2426
<bean id="template" class="org.mockito.Mockito" factory-method="mock">
2527
<constructor-arg value="org.springframework.kafka.core.KafkaTemplate"/>
@@ -44,6 +46,10 @@
4446
</constructor-arg>
4547
</bean>
4648

49+
<bean id="onPartitionsAssignedSeekCallback" class="org.mockito.Mockito" factory-method="mock">
50+
<constructor-arg value="java.util.function.BiConsumer"/>
51+
</bean>
52+
4753
<bean id="ems" class="org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy"/>
4854

4955
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"/>

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
/**
3232
* @author Gary Russell
33+
* @author Artem Bilan
3334
*
3435
* @since 5.4
3536
*
@@ -58,13 +59,16 @@ public void testProps() {
5859
assertThat(TestUtils.getPropertyValue(this.gateway1, "listener.fallbackType"))
5960
.isEqualTo(String.class);
6061
assertThat(TestUtils.getPropertyValue(this.gateway1, "errorMessageStrategy"))
61-
.isSameAs(this.context.getBean("ems"));
62+
.isSameAs(this.context.getBean("ems"));
6263
assertThat(TestUtils.getPropertyValue(this.gateway1, "retryTemplate"))
63-
.isSameAs(this.context.getBean("retryTemplate"));
64+
.isSameAs(this.context.getBean("retryTemplate"));
6465
assertThat(TestUtils.getPropertyValue(this.gateway1, "recoveryCallback"))
65-
.isSameAs(this.context.getBean("recoveryCallback"));
66+
.isSameAs(this.context.getBean("recoveryCallback"));
67+
assertThat(TestUtils.getPropertyValue(this.gateway1, "onPartitionsAssignedSeekCallback"))
68+
.isSameAs(this.context.getBean("onPartitionsAssignedSeekCallback"));
6669
assertThat(TestUtils.getPropertyValue(this.gateway1, "messagingTemplate.sendTimeout")).isEqualTo(5000L);
6770
assertThat(TestUtils.getPropertyValue(this.gateway1, "replyTimeout")).isEqualTo(43L);
71+
assertThat(TestUtils.getPropertyValue(this.gateway1, "bindSourceRecord", Boolean.class)).isTrue();
6872
}
6973

7074
}
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<beans xmlns="http://www.springframework.org/schema/beans"
3-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4-
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
5-
xmlns:context="http://www.springframework.org/schema/context"
6-
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
5+
xmlns:context="http://www.springframework.org/schema/context"
6+
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
77
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
88
http://www.springframework.org/schema/integration/kafka https://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
99

@@ -20,7 +20,12 @@
2020
payload-type="java.lang.String"
2121
error-message-strategy="ems"
2222
retry-template="retryTemplate"
23-
recovery-callback="recoveryCallback" />
23+
recovery-callback="recoveryCallback"
24+
bind-source-record="true"
25+
on-partitions-assigned-seek-callback="onPartitionsAssignedSeekCallback"
26+
filter-in-retry="true"
27+
ack-discarded="true"
28+
record-filter-strategy="recordFilterStrategy"/>
2429

2530
<int-kafka:message-driven-channel-adapter
2631
id="kafkaBatchListener"
@@ -31,7 +36,7 @@
3136
channel="nullChannel"
3237
mode="batch"
3338
message-converter="messageConverter"
34-
error-channel="errorChannel" />
39+
error-channel="errorChannel"/>
3540

3641
<bean id="messageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
3742

@@ -40,39 +45,47 @@
4045
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
4146
<constructor-arg>
4247
<map>
43-
<entry key="" value="" />
48+
<entry key="" value=""/>
4449
</map>
4550
</constructor-arg>
4651
</bean>
4752
</constructor-arg>
4853
<constructor-arg>
4954
<bean class="org.springframework.kafka.listener.ContainerProperties">
50-
<constructor-arg name="topics" value="foo" />
55+
<constructor-arg name="topics" value="foo"/>
5156
</bean>
5257
</constructor-arg>
5358
</bean>
5459

60+
<bean id="onPartitionsAssignedSeekCallback" class="org.mockito.Mockito" factory-method="mock">
61+
<constructor-arg value="java.util.function.BiConsumer"/>
62+
</bean>
63+
64+
<bean id="recordFilterStrategy" class="org.mockito.Mockito" factory-method="mock">
65+
<constructor-arg value="org.springframework.kafka.listener.adapter.RecordFilterStrategy"/>
66+
</bean>
67+
5568
<bean id="container2" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
5669
<constructor-arg>
5770
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
5871
<constructor-arg>
5972
<map>
60-
<entry key="" value="" />
73+
<entry key="" value=""/>
6174
</map>
6275
</constructor-arg>
6376
</bean>
6477
</constructor-arg>
6578
<constructor-arg>
6679
<bean class="org.springframework.kafka.listener.ContainerProperties">
67-
<constructor-arg name="topics" value="foo" />
80+
<constructor-arg name="topics" value="foo"/>
6881
</bean>
6982
</constructor-arg>
7083
</bean>
7184

72-
<bean id="ems" class="org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy" />
85+
<bean id="ems" class="org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy"/>
7386

74-
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate" />
87+
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"/>
7588

76-
<bean id="recoveryCallback" class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer" />
89+
<bean id="recoveryCallback" class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer"/>
7790

7891
</beans>

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525

2626
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.ApplicationContext;
2728
import org.springframework.integration.channel.NullChannel;
2829
import org.springframework.integration.channel.PublishSubscribeChannel;
2930
import org.springframework.integration.channel.QueueChannel;
@@ -43,7 +44,7 @@
4344
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4445

4546
/**
46-
* @author Artem Bilan.
47+
* @author Artem Bilan
4748
* @author Gary Russell
4849
*
4950
* @since 5.4
@@ -52,6 +53,9 @@
5253
@DirtiesContext
5354
class KafkaMessageDrivenChannelAdapterParserTests {
5455

56+
@Autowired
57+
private ApplicationContext context;
58+
5559
@Autowired
5660
private NullChannel nullChannel;
5761

@@ -92,6 +96,13 @@ void testKafkaMessageDrivenChannelAdapterParser() {
9296
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "errorMessageStrategy")).isSameAs(this.ems);
9397
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "retryTemplate")).isSameAs(this.retryTemplate);
9498
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "recoveryCallback")).isSameAs(this.recoveryCallback);
99+
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "onPartitionsAssignedSeekCallback"))
100+
.isSameAs(this.context.getBean("onPartitionsAssignedSeekCallback"));
101+
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "bindSourceRecord", Boolean.class)).isTrue();
102+
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "filterInRetry", Boolean.class)).isTrue();
103+
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "ackDiscarded", Boolean.class)).isTrue();
104+
assertThat(TestUtils.getPropertyValue(this.kafkaListener, "recordFilterStrategy"))
105+
.isSameAs(this.context.getBean("recordFilterStrategy"));
95106
}
96107

97108
@Test

0 commit comments

Comments
 (0)