Skip to content

Commit b1cb906

Browse files
Add MongoDbMessageSource UPDATE option (#3493)
* Add MongoDbMessageSource UPDATE option * Extract `AbstractMongoDbMessageSource` with common options and methods for both `MongoDbMessageSource` and `ReactiveMongoDbMessageSource` * Add an `updateExpression` option into MongoDb source implementations * Implement respective `update` logic after fetching the data from the collection * Cover both reactive and blocking updates with tests * Add `MongoDbMessageSourceSpec` into Java DSL for MongoDb channel adapters * Expose an `update` XML attribute for the `<int-mongo:inbound-channel-adapter>` * Upgrade MongoDb driver for latest Spring Data compatibility * Document a new feature * Upgrade `mongodb.adoc` for code block switch whenever it is appropriate * * Add a Kotlin sample for `MongoDb.outboundGateway()` DSL * Apply suggestions from code review Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent b4290dd commit b1cb906

File tree

17 files changed

+840
-449
lines changed

17 files changed

+840
-449
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ ext {
8585
mailVersion = '1.6.5'
8686
micrometerVersion = '1.6.3'
8787
mockitoVersion = '3.7.0'
88-
mongoDriverVersion = '4.1.1'
88+
mongoDriverVersion = '4.2.0'
8989
mysqlVersion = '8.0.22'
9090
pahoMqttClientVersion = '1.2.5'
9191
postgresVersion = '42.2.18'

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/config/MongoDbInboundChannelAdapterParser.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,11 +25,14 @@
2525
import org.springframework.integration.config.xml.AbstractPollingInboundChannelAdapterParser;
2626
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
2727
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;
28+
2829
/**
29-
* Parser for Mongodb store inbound adapters
30+
* Parser for MongoDb store inbound adapters
3031
*
3132
* @author Amol Nayak
3233
* @author Oleg Zhurakousky
34+
* @author Artem Bilan
35+
*
3336
* @since 2.2
3437
*/
3538
public class MongoDbInboundChannelAdapterParser extends AbstractPollingInboundChannelAdapterParser {
@@ -48,6 +51,11 @@ protected BeanMetadataElement parseSource(Element element, ParserContext parserC
4851

4952
builder.addConstructorArgValue(queryExpressionDef);
5053

54+
BeanDefinition expressionDef =
55+
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("update", "update-expression",
56+
parserContext, element, false);
57+
builder.addPropertyValue("updateExpression", expressionDef);
58+
5159
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "entity-class");
5260
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "expect-single-result");
5361

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mongodb.dsl;
18+
19+
import java.util.function.Supplier;
20+
21+
import org.springframework.data.mongodb.core.convert.MongoConverter;
22+
import org.springframework.data.mongodb.core.query.Update;
23+
import org.springframework.expression.Expression;
24+
import org.springframework.expression.common.LiteralExpression;
25+
import org.springframework.integration.dsl.MessageSourceSpec;
26+
import org.springframework.integration.expression.SupplierExpression;
27+
import org.springframework.integration.expression.ValueExpression;
28+
import org.springframework.integration.mongodb.inbound.AbstractMongoDbMessageSource;
29+
30+
/**
31+
* A {@link MessageSourceSpec} extension for common MongoDB sources options.
32+
*
33+
* @author Artem Bilan
34+
*
35+
* @since 5.5
36+
*/
37+
public class AbstractMongoDbMessageSourceSpec<S extends AbstractMongoDbMessageSourceSpec<S, H>,
38+
H extends AbstractMongoDbMessageSource<?>>
39+
extends MessageSourceSpec<S, H> {
40+
41+
/**
42+
* Allow you to set the type of the entityClass that will be passed to the the MongoDB query method.
43+
* Default is {@link com.mongodb.DBObject}.
44+
* @param entityClass The entity class.
45+
* @return the spec
46+
* @see AbstractMongoDbMessageSource#setEntityClass(Class)
47+
*/
48+
public S entityClass(Class<?> entityClass) {
49+
this.target.setEntityClass(entityClass);
50+
return _this();
51+
}
52+
53+
/**
54+
* Allow you to manage which find* method to invoke.
55+
* @param expectSingleResult true if a single result is expected.
56+
* @return the spec
57+
* @see AbstractMongoDbMessageSource#setExpectSingleResult(boolean)
58+
*/
59+
public S expectSingleResult(boolean expectSingleResult) {
60+
this.target.setExpectSingleResult(expectSingleResult);
61+
return _this();
62+
}
63+
64+
/**
65+
* Configure a collection name to query against.
66+
* @param collectionName the name of the MongoDb collection
67+
* @return the spec
68+
*/
69+
public S collectionName(String collectionName) {
70+
return collectionNameExpression(new LiteralExpression(collectionName));
71+
}
72+
73+
/**
74+
* Configure a SpEL expression to evaluation a collection name on each {@code receive()} call.
75+
* @param collectionNameExpression the SpEL expression for name of the MongoDb collection
76+
* @return the spec
77+
*/
78+
public S collectionNameExpression(String collectionNameExpression) {
79+
return collectionNameExpression(PARSER.parseExpression(collectionNameExpression));
80+
}
81+
82+
/**
83+
* Configure a {@link Supplier} to obtain a collection name on each {@code receive()} call.
84+
* @param collectionNameSupplier the {@link Supplier} for name of the MongoDb collection
85+
* @return the spec
86+
*/
87+
public S collectionNameSupplier(Supplier<String> collectionNameSupplier) {
88+
return collectionNameExpression(new SupplierExpression<>(collectionNameSupplier));
89+
}
90+
91+
/**
92+
* Configure a SpEL expression to evaluation a collection name on each {@code receive()} call.
93+
* @param collectionNameExpression the SpEL expression for name of the MongoDb collection
94+
* @return the spec
95+
* @see AbstractMongoDbMessageSource#setCollectionNameExpression(Expression)
96+
*/
97+
public S collectionNameExpression(Expression collectionNameExpression) {
98+
this.target.setCollectionNameExpression(collectionNameExpression);
99+
return _this();
100+
}
101+
102+
/**
103+
* Configure a custom {@link MongoConverter} used to assist in deserialization
104+
* data read from MongoDb.
105+
* @param mongoConverter The mongo converter.
106+
* @return the spec
107+
* @see AbstractMongoDbMessageSource#setMongoConverter(MongoConverter)
108+
*/
109+
public S mongoConverter(MongoConverter mongoConverter) {
110+
this.target.setMongoConverter(mongoConverter);
111+
return _this();
112+
}
113+
114+
/**
115+
* Configure a MongoDB update.
116+
* @param update the MongoDB update.
117+
* @return the spec
118+
*/
119+
public S update(String update) {
120+
return update(new LiteralExpression(update));
121+
}
122+
123+
/**
124+
* Configure a MongoDB update.
125+
* @param update the MongoDB update.
126+
* @return the spec
127+
*/
128+
public S update(Update update) {
129+
return update(new ValueExpression<>(update));
130+
}
131+
132+
/**
133+
* Configure a {@link Supplier} to produce a MongoDB update on each receive call.
134+
* @param updateSupplier the {@link Supplier} for MongoDB update.
135+
* @return the spec
136+
*/
137+
public S updateSupplier(Supplier<Update> updateSupplier) {
138+
return update(new SupplierExpression<>(updateSupplier));
139+
}
140+
141+
/**
142+
* Configure a SpEL expression to evaluate a MongoDB update.
143+
* @param updateExpression the expression to evaluate a MongoDB update.
144+
* @return the spec
145+
*/
146+
public S update(Expression updateExpression) {
147+
this.target.setUpdateExpression(updateExpression);
148+
return _this();
149+
}
150+
151+
}

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -154,6 +154,54 @@ public static MongoDbChangeStreamMessageProducerSpec changeStreamInboundChannelA
154154
return new MongoDbChangeStreamMessageProducerSpec(new MongoDbChangeStreamMessageProducer(mongoOperations));
155155
}
156156

157+
/**
158+
* Create a {@link MongoDbMessageSourceSpec} builder instance
159+
* based on the provided {@link MongoDatabaseFactory}.
160+
* @param mongoDbFactory the {@link MongoDatabaseFactory} to use.
161+
* @param query the MongoDb query
162+
* @return the {@link MongoDbMessageSourceSpec} instance
163+
* @since 5.5
164+
*/
165+
public static MongoDbMessageSourceSpec inboundChannelAdapter(MongoDatabaseFactory mongoDbFactory, String query) {
166+
return new MongoDbMessageSourceSpec(mongoDbFactory, new LiteralExpression(query));
167+
}
168+
169+
/**
170+
* Create a {@link MongoDbMessageSourceSpec} builder instance
171+
* based on the provided {@link MongoDatabaseFactory}.
172+
* @param mongoDbFactory the {@link MongoDatabaseFactory} to use.
173+
* @param query the MongoDb query DSL object
174+
* @return the {@link MongoDbMessageSourceSpec} instance
175+
* @since 5.5
176+
*/
177+
public static MongoDbMessageSourceSpec inboundChannelAdapter(MongoDatabaseFactory mongoDbFactory, Query query) {
178+
return new MongoDbMessageSourceSpec(mongoDbFactory, new ValueExpression<>(query));
179+
}
180+
181+
/**
182+
* Create a {@link MongoDbMessageSourceSpec} builder instance
183+
* based on the provided {@link MongoOperations}.
184+
* @param mongoTemplate the {@link MongoOperations} to use.
185+
* @param query the MongoDb query
186+
* @return the {@link MongoDbMessageSourceSpec} instance
187+
* @since 5.5
188+
*/
189+
public static MongoDbMessageSourceSpec inboundChannelAdapter(MongoOperations mongoTemplate, String query) {
190+
return new MongoDbMessageSourceSpec(mongoTemplate, new LiteralExpression(query));
191+
}
192+
193+
/**
194+
* Create a {@link MongoDbMessageSourceSpec} builder instance
195+
* based on the provided {@link MongoOperations}.
196+
* @param mongoTemplate the {@link MongoOperations} to use.
197+
* @param query the MongoDb query DSL object
198+
* @return the {@link MongoDbMessageSourceSpec} instance
199+
* @since 5.5
200+
*/
201+
public static MongoDbMessageSourceSpec reactiveInboundChannelAdapter(MongoOperations mongoTemplate, Query query) {
202+
return new MongoDbMessageSourceSpec(mongoTemplate, new ValueExpression<>(query));
203+
}
204+
157205
private MongoDb() {
158206
}
159207

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mongodb.dsl;
18+
19+
import org.springframework.data.mongodb.MongoDatabaseFactory;
20+
import org.springframework.data.mongodb.core.MongoOperations;
21+
import org.springframework.expression.Expression;
22+
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;
23+
24+
/**
25+
* A {@link AbstractMongoDbMessageSourceSpec} implementation for a {@link MongoDbMessageSource}.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.5
30+
*/
31+
public class MongoDbMessageSourceSpec
32+
extends AbstractMongoDbMessageSourceSpec<MongoDbMessageSourceSpec, MongoDbMessageSource> {
33+
34+
protected MongoDbMessageSourceSpec(MongoDatabaseFactory mongoDatabaseFactory, Expression queryExpression) {
35+
this.target = new MongoDbMessageSource(mongoDatabaseFactory, queryExpression);
36+
}
37+
38+
protected MongoDbMessageSourceSpec(MongoOperations mongoTemplate, Expression queryExpression) {
39+
this.target = new MongoDbMessageSource(mongoTemplate, queryExpression);
40+
}
41+
42+
}

0 commit comments

Comments
 (0)