Skip to content

Commit 7d7c273

Browse files
authored
GH-3241: MetadataStoreSelector - compare old/new (#3242)
Resolves: #3241 * Docs and XML namespace support.
1 parent 1f2a334 commit 7d7c273

File tree

10 files changed

+244
-20
lines changed

10 files changed

+244
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/xml/IdempotentReceiverInterceptorParser.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ protected AbstractBeanDefinition parseInternal(Element element, ParserContext pa
5959
boolean hasValueStrategy = StringUtils.hasText(valueStrategy);
6060
String valueExpression = element.getAttribute("value-expression");
6161
boolean hasValueExpression = StringUtils.hasText(valueExpression);
62+
String compareValues = element.getAttribute("compare-values");
63+
boolean hasCompareValues = StringUtils.hasText(compareValues);
6264

6365
String endpoints = element.getAttribute("endpoint");
6466

@@ -68,10 +70,10 @@ protected AbstractBeanDefinition parseInternal(Element element, ParserContext pa
6870
}
6971

7072
if (hasSelector && (hasStore || hasKeyStrategy || hasKeyExpression || hasValueStrategy // NOSONAR complexity
71-
|| hasValueExpression)) {
73+
|| hasValueExpression || hasCompareValues)) {
7274
parserContext.getReaderContext().error("The 'selector' attribute is mutually exclusive with " +
73-
"'metadata-store', 'key-strategy', 'key-expression', 'value-strategy' " +
74-
"or 'value-expression'", source);
75+
"'metadata-store', 'key-strategy', 'key-expression', 'value-strategy', " +
76+
"'value-expression', and 'compare-values'", source);
7577
}
7678

7779
if (hasKeyStrategy && hasKeyExpression) {
@@ -133,6 +135,7 @@ else if (hasValueExpression) {
133135
else {
134136
selectorBuilder.addConstructorArgValue(new RootBeanDefinition(SimpleMetadataStore.class));
135137
}
138+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(selectorBuilder, element, "compare-values");
136139
selectorBeanDefinition = selectorBuilder.getBeanDefinition();
137140
}
138141

spring-integration-core/src/main/java/org/springframework/integration/selector/MetadataStoreSelector.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,13 @@
1616

1717
package org.springframework.integration.selector;
1818

19+
import java.util.function.BiPredicate;
20+
1921
import org.springframework.integration.core.MessageSelector;
2022
import org.springframework.integration.handler.MessageProcessor;
2123
import org.springframework.integration.metadata.ConcurrentMetadataStore;
2224
import org.springframework.integration.metadata.SimpleMetadataStore;
25+
import org.springframework.lang.Nullable;
2326
import org.springframework.messaging.Message;
2427
import org.springframework.util.Assert;
2528

@@ -58,6 +61,9 @@ public class MetadataStoreSelector implements MessageSelector {
5861

5962
private final MessageProcessor<String> valueStrategy;
6063

64+
@Nullable
65+
private BiPredicate<String, String> compareValues;
66+
6167
public MetadataStoreSelector(MessageProcessor<String> keyStrategy) {
6268
this(keyStrategy, (MessageProcessor<String>) null);
6369
}
@@ -79,6 +85,26 @@ public MetadataStoreSelector(MessageProcessor<String> keyStrategy, MessageProces
7985
this.valueStrategy = valueStrategy;
8086
}
8187

88+
/**
89+
* Set a {@link BiPredicate} to compare old and new values in the metadata store for
90+
* the key. The first parameter is the old value; return true if we should accept this
91+
* message and replace the old value with the new value.
92+
* @param compareValues the {@link BiPredicate}.
93+
* @since 5.3
94+
*/
95+
public void setCompareValues(@Nullable BiPredicate<String, String> compareValues) {
96+
this.compareValues = compareValues;
97+
}
98+
99+
/**
100+
* Fluent version of {@link #setCompareValues(BiPredicate)}.
101+
* @param compareValues the {@link BiPredicate}.
102+
* @return this.
103+
*/
104+
public MetadataStoreSelector compareValues(@Nullable BiPredicate<String, String> compareValues) {
105+
setCompareValues(compareValues);
106+
return this;
107+
}
82108

83109
@Override
84110
public boolean accept(Message<?> message) {
@@ -88,7 +114,21 @@ public boolean accept(Message<?> message) {
88114
? this.valueStrategy.processMessage(message)
89115
: (timestamp == null ? "0" : Long.toString(timestamp));
90116

91-
return this.metadataStore.putIfAbsent(key, value) == null;
117+
if (this.compareValues == null) {
118+
return this.metadataStore.putIfAbsent(key, value) == null;
119+
}
120+
else {
121+
synchronized (this) {
122+
String oldValue = this.metadataStore.get(key);
123+
if (oldValue == null) {
124+
return this.metadataStore.putIfAbsent(key, value) == null;
125+
}
126+
if (this.compareValues.test(oldValue, value)) {
127+
return this.metadataStore.replace(key, oldValue, value);
128+
}
129+
return false;
130+
}
131+
}
92132
}
93133

94134
}

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4853,6 +4853,19 @@ The list of component name patterns you want to track (e.g., tracked-components
48534853
]]></xsd:documentation>
48544854
</xsd:annotation>
48554855
</xsd:attribute>
4856+
<xsd:attribute name="compare-values">
4857+
<xsd:annotation>
4858+
<xsd:appinfo>
4859+
<tool:annotation kind="ref">
4860+
<tool:expected-type type="java.util.function.BiPredicate"/>
4861+
</tool:annotation>
4862+
</xsd:appinfo>
4863+
<xsd:documentation><![CDATA[
4864+
A 'BiPredicate<String, String>' which is called if a value exists to determine whether the message
4865+
should be accepted and the old value replaced with the new value in the metadata store.
4866+
]]></xsd:documentation>
4867+
</xsd:annotation>
4868+
</xsd:attribute>
48564869
<xsd:attribute name="discard-channel" type="xsd:string">
48574870
<xsd:annotation>
48584871
<xsd:appinfo>

spring-integration-core/src/test/java/org/springframework/integration/config/xml/IdempotentReceiverParserTests-context.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
endpoint="foo"
2828
key-strategy="keyStrategy"
2929
value-strategy="valueStrategy"
30+
compare-values="valueComparator"
3031
discard-channel="nullChannel"
3132
throw-exception-on-rejection="true"/>
3233

34+
<beans:bean id="valueComparator"
35+
class="org.springframework.integration.config.xml.IdempotentReceiverParserTests$AlwaysAccept"/>
36+
3337
<beans:bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
3438

3539
<util:properties id="properties">

spring-integration-core/src/test/java/org/springframework/integration/config/xml/IdempotentReceiverParserTests.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Properties;
27+
import java.util.function.BiPredicate;
2728

2829
import org.junit.Test;
2930
import org.junit.runner.RunWith;
@@ -77,6 +78,9 @@ public class IdempotentReceiverParserTests {
7778
@Autowired
7879
private MessageProcessor<String> valueStrategy;
7980

81+
@Autowired
82+
private AlwaysAccept alwaysAccept;
83+
8084
@Autowired
8185
@Qualifier("nullChannel")
8286
private MessageChannel nullChannel;
@@ -94,8 +98,8 @@ public void testSelectorInterceptor() {
9498
assertThat(getPropertyValue(this.selectorInterceptor, "throwExceptionOnRejection", Boolean.class)).isFalse();
9599
@SuppressWarnings("unchecked")
96100
Map<String, List<String>> idempotentEndpoints =
97-
(Map<String, List<String>>) getPropertyValue(this.idempotentReceiverAutoProxyCreator,
98-
"idempotentEndpoints", Map.class);
101+
getPropertyValue(this.idempotentReceiverAutoProxyCreator,
102+
"idempotentEndpoints", Map.class);
99103
List<String> endpoints = idempotentEndpoints.get("selectorInterceptor");
100104
assertThat(endpoints).isNotNull();
101105
assertThat(endpoints.isEmpty()).isFalse();
@@ -110,10 +114,11 @@ public void testStrategyInterceptor() {
110114
assertThat(messageSelector).isInstanceOf(MetadataStoreSelector.class);
111115
assertThat(getPropertyValue(messageSelector, "keyStrategy")).isSameAs(this.keyStrategy);
112116
assertThat(getPropertyValue(messageSelector, "valueStrategy")).isSameAs(this.valueStrategy);
117+
assertThat(getPropertyValue(messageSelector, "compareValues")).isSameAs(this.alwaysAccept);
113118
@SuppressWarnings("unchecked")
114119
Map<String, List<String>> idempotentEndpoints =
115-
(Map<String, List<String>>) getPropertyValue(this.idempotentReceiverAutoProxyCreator,
116-
"idempotentEndpoints", Map.class);
120+
getPropertyValue(this.idempotentReceiverAutoProxyCreator,
121+
"idempotentEndpoints", Map.class);
117122
List<String> endpoints = idempotentEndpoints.get("strategyInterceptor");
118123
assertThat(endpoints).isNotNull();
119124
assertThat(endpoints.isEmpty()).isFalse();
@@ -130,8 +135,8 @@ public void testExpressionInterceptor() {
130135
assertThat(keyStrategy.toString()).contains("headers.foo");
131136
@SuppressWarnings("unchecked")
132137
Map<String, List<String>> idempotentEndpoints =
133-
(Map<String, List<String>>) getPropertyValue(this.idempotentReceiverAutoProxyCreator,
134-
"idempotentEndpoints", Map.class);
138+
getPropertyValue(this.idempotentReceiverAutoProxyCreator,
139+
"idempotentEndpoints", Map.class);
135140
List<String> endpoints = idempotentEndpoints.get("expressionInterceptor");
136141
assertThat(endpoints).isNotNull();
137142
assertThat(endpoints.isEmpty()).isFalse();
@@ -172,7 +177,8 @@ public void testSelectorAndStore() throws Exception {
172177
catch (BeanDefinitionParsingException e) {
173178
assertThat(e.getMessage())
174179
.contains("The 'selector' attribute is mutually exclusive with 'metadata-store', " +
175-
"'key-strategy', 'key-expression', 'value-strategy' or 'value-expression'");
180+
"'key-strategy', 'key-expression', 'value-strategy', 'value-expression', and "
181+
+ "'compare-values'");
176182
}
177183
}
178184

@@ -185,7 +191,8 @@ public void testSelectorAndKeyStrategy() throws Exception {
185191
catch (BeanDefinitionParsingException e) {
186192
assertThat(e.getMessage())
187193
.contains("The 'selector' attribute is mutually exclusive with 'metadata-store', " +
188-
"'key-strategy', 'key-expression', 'value-strategy' or 'value-expression'");
194+
"'key-strategy', 'key-expression', 'value-strategy', 'value-expression', and "
195+
+ "'compare-values'");
189196
}
190197
}
191198

@@ -198,7 +205,8 @@ public void testSelectorAndKeyExpression() throws Exception {
198205
catch (BeanDefinitionParsingException e) {
199206
assertThat(e.getMessage())
200207
.contains("The 'selector' attribute is mutually exclusive with 'metadata-store', " +
201-
"'key-strategy', 'key-expression', 'value-strategy' or 'value-expression'");
208+
"'key-strategy', 'key-expression', 'value-strategy', 'value-expression', and "
209+
+ "'compare-values'");
202210
}
203211
}
204212

@@ -211,7 +219,8 @@ public void testSelectorAndValueStrategy() throws Exception {
211219
catch (BeanDefinitionParsingException e) {
212220
assertThat(e.getMessage())
213221
.contains("The 'selector' attribute is mutually exclusive with 'metadata-store', " +
214-
"'key-strategy', 'key-expression', 'value-strategy' or 'value-expression'");
222+
"'key-strategy', 'key-expression', 'value-strategy', 'value-expression', and "
223+
+ "'compare-values'");
215224
}
216225
}
217226

@@ -224,7 +233,8 @@ public void testSelectorAndValueExpression() throws Exception {
224233
catch (BeanDefinitionParsingException e) {
225234
assertThat(e.getMessage())
226235
.contains("The 'selector' attribute is mutually exclusive with 'metadata-store', " +
227-
"'key-strategy', 'key-expression', 'value-strategy' or 'value-expression'");
236+
"'key-strategy', 'key-expression', 'value-strategy', 'value-expression', and "
237+
+ "'compare-values'");
228238
}
229239
}
230240

@@ -268,4 +278,13 @@ private ApplicationContext bootStrap(String configProperty) throws Exception {
268278
return ac;
269279
}
270280

281+
public static class AlwaysAccept implements BiPredicate<String, String> {
282+
283+
@Override
284+
public boolean test(String t, String u) {
285+
return true;
286+
}
287+
288+
}
289+
271290
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.selector;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.integration.metadata.SimpleMetadataStore;
24+
import org.springframework.messaging.support.MessageBuilder;
25+
26+
/**
27+
* @author Gary Russell
28+
* @since 5.3
29+
*
30+
*/
31+
public class MetadataStoreSelectorTests {
32+
33+
@Test
34+
void lineNumbers() {
35+
SimpleMetadataStore store = new SimpleMetadataStore();
36+
store.put("file", "5");
37+
MetadataStoreSelector selector = new MetadataStoreSelector(
38+
msg -> "file", msg -> msg.getHeaders().get("lineNum").toString(), store);
39+
selector.setCompareValues((oldValue, newValue) -> Integer.parseInt(oldValue) < Integer.parseInt(newValue));
40+
for (int i = 0; i < 6; i++) {
41+
assertThat(selector.accept(MessageBuilder.withPayload("foo").setHeader("lineNum", i).build()))
42+
.isEqualTo(Boolean.FALSE);
43+
assertThat(store.get("file")).isEqualTo("5");
44+
}
45+
assertThat(selector.accept(MessageBuilder.withPayload("foo").setHeader("lineNum", 6).build()))
46+
.isEqualTo(Boolean.TRUE);
47+
assertThat(store.get("file")).isEqualTo("6");
48+
}
49+
50+
}

0 commit comments

Comments
 (0)