Skip to content

Commit cf37932

Browse files
author
Rishabh Singh
authored
Support for unnesting array fields (#71)
* Add interface for From clause * Add unnest expression * From visitor * Parser * Fix integ test * Update test * minor change * Add docs * Add integration test under MongoQueryExecutorIntegrationTest * Update docs * address review comments
1 parent 21bd250 commit cf37932

File tree

12 files changed

+317
-29
lines changed

12 files changed

+317
-29
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutorIntegrationTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
4343
import org.hypertrace.core.documentstore.expression.impl.LogicalExpression;
4444
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
45+
import org.hypertrace.core.documentstore.expression.impl.UnnestExpression;
4546
import org.hypertrace.core.documentstore.query.Filter;
4647
import org.hypertrace.core.documentstore.query.Pagination;
4748
import org.hypertrace.core.documentstore.query.Query;
@@ -477,6 +478,24 @@ public void testDistinctCount() throws IOException {
477478
assertDocsEqual(resultDocs, "mongo/distinct_count_response.json");
478479
}
479480

481+
@Test
482+
public void testUnnestAndAggregate() throws IOException {
483+
org.hypertrace.core.documentstore.query.Query query =
484+
org.hypertrace.core.documentstore.query.Query.builder()
485+
.addSelection(IdentifierExpression.of("sales.medium.type"))
486+
.addAggregation(IdentifierExpression.of("sales.medium.type"))
487+
.addSelection(
488+
AggregateExpression.of(SUM, IdentifierExpression.of("sales.medium.volume")),
489+
"totalSales")
490+
.addFromClause(UnnestExpression.of(IdentifierExpression.of("sales")))
491+
.addFromClause(UnnestExpression.of(IdentifierExpression.of("sales.medium")))
492+
.addSort(IdentifierExpression.of("totalSales"), DESC)
493+
.build();
494+
495+
Iterator<Document> iterator = collection.aggregate(query);
496+
assertDocsEqual(iterator, "mongo/aggregate_on_nested_array_reponse.json");
497+
}
498+
480499
private static void assertDocsEqual(Iterator<Document> documents, String filePath)
481500
throws IOException {
482501
String fileContent = readFileFromResource(filePath).orElseThrow();
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[
2+
{
3+
"sales": {
4+
"medium": {
5+
"type": "online"
6+
}
7+
},
8+
"totalSales": 9000
9+
},
10+
{
11+
"sales": {
12+
"medium": {
13+
"type": "distributionChannel"
14+
}
15+
},
16+
"totalSales": 5000
17+
},
18+
{
19+
"sales": {
20+
"medium": {
21+
"type": "retail"
22+
}
23+
},
24+
"totalSales": 1500
25+
}
26+
]

document-store/src/integrationTest/resources/mongo/collection_data.json

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,39 @@
1515
"pincode": 400004
1616
}
1717
}
18-
}
18+
},
19+
"sales": [
20+
{
21+
"city": "delhi",
22+
"medium": [
23+
{
24+
"type": "distributionChannel",
25+
"volume": 1000
26+
},
27+
{
28+
"type": "retail",
29+
"volume": 500
30+
},
31+
{
32+
"type": "online",
33+
"volume": 1000
34+
}
35+
]
36+
},
37+
{
38+
"city": "pune",
39+
"medium": [
40+
{
41+
"type": "distributionChannel",
42+
"volume": 300
43+
},
44+
{
45+
"type": "online",
46+
"volume": 2000
47+
}
48+
]
49+
}
50+
]
1951
},
2052
{
2153
"_id": 2,
@@ -40,7 +72,43 @@
4072
"pincode": 400004
4173
}
4274
}
43-
}
75+
},
76+
"sales": [
77+
{
78+
"city": "delhi",
79+
"medium": [
80+
{
81+
"type": "distributionChannel",
82+
"volume": 3000
83+
},
84+
{
85+
"type": "retail",
86+
"volume": 500
87+
},
88+
{
89+
"type": "online",
90+
"volume": 1000
91+
}
92+
]
93+
},
94+
{
95+
"city": "mumbai",
96+
"medium": [
97+
{
98+
"type": "distributionChannel",
99+
"volume": 700
100+
},
101+
{
102+
"type": "retail",
103+
"volume": 500
104+
},
105+
{
106+
"type": "online",
107+
"volume": 5000
108+
}
109+
]
110+
}
111+
]
44112
},
45113
{
46114
"_id": 4,
@@ -97,4 +165,4 @@
97165
"quantity": 5,
98166
"date": "2016-02-06T20:20:13Z"
99167
}
100-
]
168+
]
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.hypertrace.core.documentstore.expression.impl;
2+
3+
import com.google.common.base.Preconditions;
4+
import lombok.AccessLevel;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Value;
7+
import org.hypertrace.core.documentstore.expression.type.FromTypeExpression;
8+
import org.hypertrace.core.documentstore.parser.FromTypeExpressionVisitor;
9+
10+
/**
11+
* This expression allows expanding an array field to a set of rows
12+
*
13+
* <p><code>
14+
* UnnestExpression.of(IdentifierExpression.of("array_col")) </code>
15+
*/
16+
@Value
17+
@AllArgsConstructor(access = AccessLevel.PRIVATE)
18+
public class UnnestExpression implements FromTypeExpression {
19+
20+
IdentifierExpression identifierExpression;
21+
22+
public static UnnestExpression of(final IdentifierExpression identifierExpression) {
23+
Preconditions.checkArgument(identifierExpression != null, "expression is null");
24+
return new UnnestExpression(identifierExpression);
25+
}
26+
27+
@Override
28+
public <T> T accept(FromTypeExpressionVisitor visitor) {
29+
return visitor.visit(this);
30+
}
31+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.hypertrace.core.documentstore.expression.type;
2+
3+
import org.hypertrace.core.documentstore.parser.FromTypeExpressionVisitor;
4+
5+
/**
6+
* Expression to retrieve rows from the referenced tables Implementations can perform table
7+
* functions, join, lateral subqueries
8+
*/
9+
public interface FromTypeExpression {
10+
<T> T accept(final FromTypeExpressionVisitor visitor);
11+
}

document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoQueryExecutor.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.hypertrace.core.documentstore.mongo;
22

3+
import static java.util.Collections.singleton;
34
import static java.util.function.Predicate.not;
45
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.applyPagination;
56
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.getLimitClause;
@@ -16,19 +17,33 @@
1617
import com.mongodb.client.AggregateIterable;
1718
import com.mongodb.client.FindIterable;
1819
import com.mongodb.client.MongoCursor;
20+
import java.util.Collection;
1921
import java.util.List;
22+
import java.util.function.Function;
2023
import java.util.stream.Collectors;
21-
import java.util.stream.Stream;
2224
import lombok.AllArgsConstructor;
2325
import lombok.extern.slf4j.Slf4j;
2426
import org.bson.conversions.Bson;
27+
import org.hypertrace.core.documentstore.mongo.parser.MongoFromTypeExpressionParser;
2528
import org.hypertrace.core.documentstore.mongo.query.transformer.MongoQueryTransformer;
2629
import org.hypertrace.core.documentstore.query.Pagination;
2730
import org.hypertrace.core.documentstore.query.Query;
2831

2932
@Slf4j
3033
@AllArgsConstructor
3134
public class MongoQueryExecutor {
35+
private static final List<Function<Query, Collection<BasicDBObject>>>
36+
AGGREGATE_PIPELINE_FUNCTIONS =
37+
List.of(
38+
query -> singleton(getFilterClause(query, Query::getFilter)),
39+
MongoFromTypeExpressionParser::getFromClauses,
40+
query -> singleton(getGroupClause(query)),
41+
query -> singleton(getProjectClause(query)),
42+
query -> singleton(getFilterClause(query, Query::getAggregationFilter)),
43+
query -> singleton(getSortClause(query)),
44+
query -> singleton(getSkipClause(query)),
45+
query -> singleton(getLimitClause(query)));
46+
3247
final com.mongodb.client.MongoCollection<BasicDBObject> collection;
3348

3449
public MongoCursor<BasicDBObject> find(final Query query) {
@@ -52,26 +67,9 @@ public MongoCursor<BasicDBObject> find(final Query query) {
5267
public MongoCursor<BasicDBObject> aggregate(final Query originalQuery) {
5368
Query query = transformAndLog(originalQuery);
5469

55-
BasicDBObject filterClause = getFilterClause(query, Query::getFilter);
56-
BasicDBObject groupFilterClause = getFilterClause(query, Query::getAggregationFilter);
57-
58-
BasicDBObject groupClause = getGroupClause(query);
59-
BasicDBObject sortClause = getSortClause(query);
60-
61-
BasicDBObject skipClause = getSkipClause(query);
62-
BasicDBObject limitClause = getLimitClause(query);
63-
64-
BasicDBObject projectClause = getProjectClause(query);
65-
6670
List<BasicDBObject> pipeline =
67-
Stream.of(
68-
filterClause,
69-
groupClause,
70-
projectClause,
71-
groupFilterClause,
72-
sortClause,
73-
skipClause,
74-
limitClause)
71+
AGGREGATE_PIPELINE_FUNCTIONS.stream()
72+
.flatMap(function -> function.apply(query).stream())
7573
.filter(not(BasicDBObject::isEmpty))
7674
.collect(Collectors.toList());
7775

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.hypertrace.core.documentstore.mongo.parser;
2+
3+
import com.mongodb.BasicDBObject;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.stream.Collectors;
7+
import org.hypertrace.core.documentstore.expression.impl.UnnestExpression;
8+
import org.hypertrace.core.documentstore.parser.FromTypeExpressionVisitor;
9+
import org.hypertrace.core.documentstore.query.Query;
10+
11+
public class MongoFromTypeExpressionParser implements FromTypeExpressionVisitor {
12+
13+
private static final String PATH_KEY = "path";
14+
private static final String UNWIND_OPERATOR = "$unwind";
15+
16+
private static final MongoIdentifierPrefixingParser mongoIdentifierPrefixingParser =
17+
new MongoIdentifierPrefixingParser(new MongoIdentifierExpressionParser());
18+
19+
@SuppressWarnings("unchecked")
20+
@Override
21+
public BasicDBObject visit(UnnestExpression unnestExpression) {
22+
String parsedIdentifierExpression =
23+
mongoIdentifierPrefixingParser.visit(unnestExpression.getIdentifierExpression());
24+
return new BasicDBObject(UNWIND_OPERATOR, Map.of(PATH_KEY, parsedIdentifierExpression));
25+
}
26+
27+
public static List<BasicDBObject> getFromClauses(final Query query) {
28+
MongoFromTypeExpressionParser mongoFromTypeExpressionParser =
29+
new MongoFromTypeExpressionParser();
30+
return query.getFromTypeExpressions().stream()
31+
.map(
32+
fromTypeExpression ->
33+
(BasicDBObject) fromTypeExpression.accept(mongoFromTypeExpressionParser))
34+
.collect(Collectors.toList());
35+
}
36+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.hypertrace.core.documentstore.parser;
2+
3+
import org.hypertrace.core.documentstore.expression.impl.UnnestExpression;
4+
5+
public interface FromTypeExpressionVisitor {
6+
<T> T visit(UnnestExpression unnestExpression);
7+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.hypertrace.core.documentstore.query;
2+
3+
import com.google.common.base.Preconditions;
4+
import java.util.List;
5+
import java.util.Objects;
6+
import lombok.AccessLevel;
7+
import lombok.AllArgsConstructor;
8+
import lombok.Builder;
9+
import lombok.Singular;
10+
import lombok.Value;
11+
import org.hypertrace.core.documentstore.expression.type.FromTypeExpression;
12+
13+
@Value
14+
@Builder(toBuilder = true)
15+
@AllArgsConstructor(access = AccessLevel.PRIVATE)
16+
public class FromClause {
17+
@Singular List<FromTypeExpression> fromTypeExpressions;
18+
19+
public static class FromClauseBuilder {
20+
public FromClause build() {
21+
Preconditions.checkArgument(!fromTypeExpressions.isEmpty(), "expressions is empty");
22+
Preconditions.checkArgument(
23+
fromTypeExpressions.stream().noneMatch(Objects::isNull),
24+
"One or more expressions is null");
25+
return new FromClause(fromTypeExpressions);
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)