Skip to content

Commit 230c320

Browse files
christophstroblmp911de
authored andcommitted
DATAMONGO-2622 - Add support for $unionWith aggregation stage.
We now support the $unionWith aggregation stage via the UnionWithOperation that performs a union of two collections by combining pipeline results, potentially containing duplicates, into a single result set that is handed over to the next stage. In order to remove duplicates it is possible to append a GroupOperation right after UnionWithOperation. If the UnionWithOperation uses a pipeline to process documents, field names within the pipeline will be treated as is. In order to map domain type property names to actual field names (considering potential org.springframework.data.mongodb.core.mapping.Field annotations) make sure the enclosing aggregation is a TypedAggregation and provide the target type for the $unionWith stage via mapFieldsTo(Class). Original pull request: #886.
1 parent 4548d07 commit 230c320

File tree

7 files changed

+366
-7
lines changed

7 files changed

+366
-7
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/AggregationUtil.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
2929
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
3030
import org.springframework.data.mongodb.core.aggregation.CountOperation;
31+
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
3132
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
3233
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
3334
import org.springframework.data.mongodb.core.convert.QueryMapper;
@@ -75,12 +76,17 @@ AggregationOperationContext prepareAggregationContext(Aggregation aggregation,
7576
return context;
7677
}
7778

78-
if (aggregation instanceof TypedAggregation) {
79-
return new TypeBasedAggregationOperationContext(((TypedAggregation) aggregation).getInputType(), mappingContext,
80-
queryMapper);
79+
if (!(aggregation instanceof TypedAggregation)) {
80+
return Aggregation.DEFAULT_CONTEXT;
8181
}
8282

83-
return Aggregation.DEFAULT_CONTEXT;
83+
Class<?> inputType = ((TypedAggregation) aggregation).getInputType();
84+
85+
if (aggregation.getPipeline().requiresRelaxedChecking()) {
86+
return new RelaxedTypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper);
87+
}
88+
89+
return new TypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper);
8490
}
8591

8692
/**

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,9 +1977,7 @@ public <O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, Stri
19771977

19781978
Assert.notNull(aggregation, "Aggregation pipeline must not be null!");
19791979

1980-
AggregationOperationContext context = new TypeBasedAggregationOperationContext(aggregation.getInputType(),
1981-
mappingContext, queryMapper);
1982-
return aggregate(aggregation, inputCollectionName, outputType, context);
1980+
return aggregate(aggregation, inputCollectionName, outputType, null);
19831981
}
19841982

19851983
/* (non-Javadoc)

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public AggregationPipeline() {
4545
* @param aggregationOperations must not be {@literal null}.
4646
*/
4747
public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
48+
49+
Assert.notNull(aggregationOperations, "AggregationOperations must not be null!");
4850
pipeline = new ArrayList<>(aggregationOperations);
4951
}
5052

@@ -108,4 +110,37 @@ void verify() {
108110
private boolean isLast(AggregationOperation aggregationOperation) {
109111
return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
110112
}
113+
114+
/**
115+
* @return {@literal true} if field names might get computed by one of the pipeline stages, that the
116+
* {@link AggregationOperationContext} might not be aware of. A strongly typed context might fail to resolve
117+
* field references, so if {@literal true} usage of a {@link RelaxedTypeBasedAggregationOperationContext}
118+
* might be the better choice.
119+
* @since 3.1
120+
*/
121+
public boolean requiresRelaxedChecking() {
122+
return pipelineContainsValueOfType(UnionWithOperation.class);
123+
}
124+
125+
/**
126+
* @return {@literal true} if the pipeline does not contain any stages.
127+
* @since 3.1
128+
*/
129+
public boolean isEmpty() {
130+
return pipeline.isEmpty();
131+
}
132+
133+
private boolean pipelineContainsValueOfType(Class<?> type) {
134+
135+
if (isEmpty()) {
136+
return false;
137+
}
138+
139+
for (Object element : pipeline) {
140+
if (type.isInstance(element)) {
141+
return true;
142+
}
143+
}
144+
return false;
145+
}
111146
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/ExposedFieldsAggregationOperationContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,12 @@ protected FieldReference resolveExposedField(@Nullable Field field, String name)
149149
}
150150
return null;
151151
}
152+
153+
/**
154+
* @return obtain the root context used to resolve references.
155+
* @since 3.1
156+
*/
157+
AggregationOperationContext getRootContext() {
158+
return rootContext;
159+
}
152160
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypeBasedAggregationOperationContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,19 @@ public Fields getFields(Class<?> type) {
133133
*/
134134
@Override
135135
public AggregationOperationContext continueOnMissingFieldReference() {
136+
return continueOnMissingFieldReference(type);
137+
}
138+
139+
/**
140+
* This toggle allows the {@link AggregationOperationContext context} to use any given field name without checking for
141+
* its existence. Typically the {@link AggregationOperationContext} fails when referencing unknown fields, those that
142+
* are not present in one of the previous stages or the input source, throughout the pipeline.
143+
*
144+
* @param type The domain type to map fields to.
145+
* @return a more relaxed {@link AggregationOperationContext}.
146+
* @since 3.1
147+
*/
148+
public AggregationOperationContext continueOnMissingFieldReference(Class<?> type) {
136149
return new RelaxedTypeBasedAggregationOperationContext(type, mappingContext, mapper);
137150
}
138151

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core.aggregation;
17+
18+
import java.util.Arrays;
19+
import java.util.List;
20+
21+
import org.bson.Document;
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* The <a href="https://docs.mongodb.com/master/reference/operator/aggregation/unionWith/">$unionWith</a> aggregation
27+
* stage (available since MongoDB 4.4) performs a union of two collections by combining pipeline results, potentially
28+
* containing duplicates, into a single result set that is handed over to the next stage. <br />
29+
* In order to remove duplicates it is possible to append a {@link GroupOperation} right after
30+
* {@link UnionWithOperation}.
31+
* <p />
32+
* If the {@link UnionWithOperation} uses a
33+
* <a href="https://docs.mongodb.com/master/reference/operator/aggregation/unionWith/#unionwith-pipeline">pipeline</a>
34+
* to process documents, field names within the pipeline will be treated as is. In order to map domain type property
35+
* names to actual field names (considering potential {@link org.springframework.data.mongodb.core.mapping.Field}
36+
* annotations) make sure the enclosing aggregation is a {@link TypedAggregation} and provide the target type for the
37+
* {@code $unionWith} stage via {@link #mapFieldsTo(Class)}.
38+
*
39+
* @author Christoph Strobl
40+
* @see <a href="https://docs.mongodb.com/master/reference/operator/aggregation/unionWith/">Aggregation Pipeline Stage:
41+
* $unionWith</a>
42+
* @since 3.1
43+
*/
44+
public class UnionWithOperation implements AggregationOperation {
45+
46+
private final String collection;
47+
48+
@Nullable //
49+
private final AggregationPipeline pipeline;
50+
51+
@Nullable //
52+
private final Class<?> domainType;
53+
54+
public UnionWithOperation(String collection, @Nullable AggregationPipeline pipeline, @Nullable Class<?> domainType) {
55+
56+
this.collection = collection;
57+
this.pipeline = pipeline;
58+
this.domainType = domainType;
59+
}
60+
61+
/**
62+
* Set the name of the collection from which pipeline results should be included in the result set.<br />
63+
* The collection name is used to set the {@code coll} parameter of {@code $unionWith}.
64+
*
65+
* @param collection the MongoDB collection name. Must not be {@literal null}.
66+
* @return new instance of {@link UnionWithOperation}.
67+
* @throws IllegalArgumentException if the required argument is {@literal null}.
68+
*/
69+
public static UnionWithOperation unionWith(String collection) {
70+
71+
Assert.notNull(collection, "Collection must not be null!");
72+
return new UnionWithOperation(collection, null, null);
73+
}
74+
75+
/**
76+
* Set the {@link AggregationPipeline} to apply to the specified collection. The pipeline corresponds to the optional
77+
* {@code pipeline} field of the {@code $unionWith} aggregation stage and is used to compute the documents going into
78+
* the result set.
79+
*
80+
* @param pipeline the {@link AggregationPipeline} that computes the documents. Must not be {@literal null}.
81+
* @return new instance of {@link UnionWithOperation}.
82+
* @throws IllegalArgumentException if the required argument is {@literal null}.
83+
*/
84+
public UnionWithOperation pipeline(AggregationPipeline pipeline) {
85+
return new UnionWithOperation(collection, pipeline, domainType);
86+
}
87+
88+
/**
89+
* Set the aggregation pipeline stages to apply to the specified collection. The pipeline corresponds to the optional
90+
* {@code pipeline} field of the {@code $unionWith} aggregation stage and is used to compute the documents going into
91+
* the result set.
92+
*
93+
* @param aggregationStages the aggregation pipeline stages that compute the documents. Must not be {@literal null}.
94+
* @return new instance of {@link UnionWithOperation}.
95+
* @throws IllegalArgumentException if the required argument is {@literal null}.
96+
*/
97+
public UnionWithOperation pipeline(List<AggregationOperation> aggregationStages) {
98+
return new UnionWithOperation(collection, new AggregationPipeline(aggregationStages), domainType);
99+
}
100+
101+
/**
102+
* Set the aggregation pipeline stages to apply to the specified collection. The pipeline corresponds to the optional
103+
* {@code pipeline} field of the {@code $unionWith} aggregation stage and is used to compute the documents going into
104+
* the result set.
105+
*
106+
* @param aggregationStages the aggregation pipeline stages that compute the documents. Must not be {@literal null}.
107+
* @return new instance of {@link UnionWithOperation}.
108+
* @throws IllegalArgumentException if the required argument is {@literal null}.
109+
*/
110+
public UnionWithOperation pipeline(AggregationOperation... aggregationStages) {
111+
return new UnionWithOperation(collection, new AggregationPipeline(Arrays.asList(aggregationStages)), domainType);
112+
}
113+
114+
/**
115+
* Set domain type used for field name mapping of property references used by the {@link AggregationPipeline}.
116+
* Remember to also use a {@link TypedAggregation} in the outer pipeline.<br />
117+
* If not set, field names used within {@link AggregationOperation pipeline operations} are taken as is.
118+
*
119+
* @param domainType the domain type to map field names used in pipeline operations to. Must not be {@literal null}.
120+
* @return new instance of {@link UnionWithOperation}.
121+
* @throws IllegalArgumentException if the required argument is {@literal null}.
122+
*/
123+
public UnionWithOperation mapFieldsTo(Class<?> domainType) {
124+
125+
Assert.notNull(domainType, "DomainType must not be null!");
126+
return new UnionWithOperation(collection, pipeline, domainType);
127+
}
128+
129+
/*
130+
* (non-Javadoc)
131+
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperation#toDocument(org.springframework.data.mongodb.core.aggregation.AggregationOperationContext)
132+
*/
133+
@Override
134+
public Document toDocument(AggregationOperationContext context) {
135+
136+
Document $unionWith = new Document("coll", collection);
137+
if (pipeline == null || pipeline.isEmpty()) {
138+
return new Document(getOperator(), $unionWith);
139+
}
140+
141+
$unionWith.append("pipeline", pipeline.toDocuments(computeContext(context)));
142+
return new Document(getOperator(), $unionWith);
143+
}
144+
145+
private AggregationOperationContext computeContext(AggregationOperationContext source) {
146+
147+
if (domainType == null) {
148+
return Aggregation.DEFAULT_CONTEXT;
149+
}
150+
151+
if (source instanceof TypeBasedAggregationOperationContext) {
152+
return ((TypeBasedAggregationOperationContext) source).continueOnMissingFieldReference(domainType);
153+
}
154+
155+
if (source instanceof ExposedFieldsAggregationOperationContext) {
156+
return computeContext(((ExposedFieldsAggregationOperationContext) source).getRootContext());
157+
}
158+
159+
return source;
160+
}
161+
162+
/*
163+
* (non-Javadoc)
164+
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperation#getOperator()
165+
*/
166+
@Override
167+
public String getOperator() {
168+
return "$unionWith";
169+
}
170+
}

0 commit comments

Comments
 (0)