Skip to content

Commit e36a06f

Browse files
feat: adds postgres impl for new query interface (#87)
* wip branch * feat: adds support of aggregation api only for where clause * adds integration test for one of the operator * reverting comments added for understanding * reverting comments that were added for understanding * rename excpetion strings * Keeping new v1 references fully qualified * Update document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java Co-authored-by: Suresh Prakash <[email protected]> * Update document-store/src/main/java/org/hypertrace/core/documentstore/postgres/utils/PostgresUtils.java Co-authored-by: Suresh Prakash <[email protected]> * fixed spotless issue * addressed comments of adding checks for documents * fixed spotless issue Co-authored-by: Suresh Prakash <[email protected]>
1 parent d40f191 commit e36a06f

File tree

10 files changed

+596
-1
lines changed

10 files changed

+596
-1
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
package org.hypertrace.core.documentstore;
22

3+
import static org.hypertrace.core.documentstore.expression.operators.RelationalOperator.NEQ;
34
import static org.hypertrace.core.documentstore.utils.CreateUpdateTestThread.FAILURE;
45
import static org.hypertrace.core.documentstore.utils.CreateUpdateTestThread.SUCCESS;
6+
import static org.hypertrace.core.documentstore.utils.Utils.convertDocumentToMap;
7+
import static org.hypertrace.core.documentstore.utils.Utils.convertJsonToMap;
8+
import static org.hypertrace.core.documentstore.utils.Utils.createDocumentsFromResource;
9+
import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource;
510
import static org.junit.jupiter.api.Assertions.assertEquals;
611
import static org.junit.jupiter.api.Assertions.assertFalse;
712
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -26,6 +31,9 @@
2631
import org.apache.commons.lang3.tuple.ImmutablePair;
2732
import org.bson.codecs.configuration.CodecConfigurationException;
2833
import org.hypertrace.core.documentstore.Filter.Op;
34+
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
35+
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
36+
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
2937
import org.hypertrace.core.documentstore.mongo.MongoDatastore;
3038
import org.hypertrace.core.documentstore.postgres.PostgresDatastore;
3139
import org.hypertrace.core.documentstore.utils.CreateUpdateTestThread;
@@ -1487,6 +1495,29 @@ public void testSearchIteratorInterface(String dataStoreName) throws IOException
14871495
}
14881496
}
14891497

1498+
@ParameterizedTest
1499+
@MethodSource("databaseContextProvider")
1500+
public void testNewAggregateApiWhereClause(String dataStoreName) throws IOException {
1501+
Map<Key, Document> documents = createDocumentsFromResource("mongo/collection_data.json");
1502+
Datastore datastore = datastoreMap.get(dataStoreName);
1503+
Collection collection = datastore.getCollection(COLLECTION_NAME);
1504+
1505+
// add docs
1506+
boolean result = collection.bulkUpsert(documents);
1507+
Assertions.assertTrue(result);
1508+
1509+
// query docs
1510+
org.hypertrace.core.documentstore.query.Query query =
1511+
org.hypertrace.core.documentstore.query.Query.builder()
1512+
.setFilter(
1513+
RelationalExpression.of(
1514+
IdentifierExpression.of("quantity"), NEQ, ConstantExpression.of(10)))
1515+
.build();
1516+
1517+
Iterator<Document> iterator = collection.aggregate(query);
1518+
assertSizeAndDocsEqual(dataStoreName, iterator, 6, "mongo/simple_filter_quantity_neq_10.json");
1519+
}
1520+
14901521
private Map<String, List<CreateUpdateTestThread>> executeCreateUpdateThreads(
14911522
Collection collection, Operation operation, int numThreads, SingleValueKey documentKey) {
14921523
List<CreateUpdateTestThread> threads = new ArrayList<CreateUpdateTestThread>();
@@ -1577,4 +1608,36 @@ static String getId(String dataStoreName) {
15771608
return "id";
15781609
}
15791610
}
1611+
1612+
private static void assertSizeAndDocsEqual(
1613+
String dataStoreName, Iterator<Document> documents, int expectedSize, String filePath)
1614+
throws IOException {
1615+
String fileContent = readFileFromResource(filePath).orElseThrow();
1616+
List<Map<String, Object>> expectedDocs = convertJsonToMap(fileContent);
1617+
1618+
List<Map<String, Object>> actualDocs = new ArrayList<>();
1619+
int actualSize = 0;
1620+
while (documents.hasNext()) {
1621+
Map<String, Object> doc = convertDocumentToMap(documents.next());
1622+
removesDateRelatedFields(dataStoreName, doc);
1623+
actualDocs.add(doc);
1624+
actualSize++;
1625+
}
1626+
1627+
long count =
1628+
expectedDocs.stream().filter(expectedDoc -> actualDocs.contains(expectedDoc)).count();
1629+
assertEquals(expectedSize, actualSize);
1630+
assertEquals(expectedSize, count);
1631+
}
1632+
1633+
private static void removesDateRelatedFields(String dataStoreName, Map<String, Object> document) {
1634+
if (isMongo(dataStoreName)) {
1635+
document.remove(MONGO_CREATED_TIME_KEY);
1636+
document.remove(MONGO_LAST_UPDATED_TIME_KEY);
1637+
document.remove(MONGO_LAST_UPDATE_TIME_KEY);
1638+
} else if (isPostgress(dataStoreName)) {
1639+
document.remove(POSTGRES_CREATED_AT);
1640+
document.remove(POSTGRES_UPDATED_AT);
1641+
}
1642+
}
15801643
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
[
2+
{
3+
"item": "Soap",
4+
"price": 10,
5+
"quantity": 2,
6+
"date": "2014-03-01T08:00:00Z",
7+
"props": {
8+
"brand": "Dettol",
9+
"size": "M",
10+
"seller": {
11+
"name": "Metro Chemicals Pvt. Ltd.",
12+
"address": {
13+
"city": "Mumbai",
14+
"pincode": 400004
15+
}
16+
}
17+
},
18+
"sales": [
19+
{
20+
"city": "delhi",
21+
"medium": [
22+
{
23+
"type": "distributionChannel",
24+
"volume": 1000
25+
},
26+
{
27+
"type": "retail",
28+
"volume": 500
29+
},
30+
{
31+
"type": "online",
32+
"volume": 1000
33+
}
34+
]
35+
},
36+
{
37+
"city": "pune",
38+
"medium": [
39+
{
40+
"type": "distributionChannel",
41+
"volume": 300
42+
},
43+
{
44+
"type": "online",
45+
"volume": 2000
46+
}
47+
]
48+
}
49+
]
50+
},
51+
{
52+
"item": "Mirror",
53+
"price": 20,
54+
"quantity": 1,
55+
"date": "2014-03-01T09:00:00Z",
56+
"sales": [
57+
{
58+
"city": "delhi",
59+
"medium": []
60+
}
61+
]
62+
},
63+
{
64+
"item": "Shampoo",
65+
"price": 5,
66+
"quantity": 20,
67+
"date": "2014-04-04T11:21:39.736Z",
68+
"sales": []
69+
},
70+
{
71+
"item": "Soap",
72+
"price": 20,
73+
"quantity": 5,
74+
"date": "2014-04-04T21:23:13.331Z",
75+
"props": {
76+
"brand": "Lifebuoy",
77+
"size": "S",
78+
"seller": {
79+
"name": "Hans and Co.",
80+
"address": {
81+
"city": "Kolkata",
82+
"pincode": 700007
83+
}
84+
}
85+
}
86+
},
87+
{
88+
"item": "Comb",
89+
"price": 7.5,
90+
"quantity": 5,
91+
"date": "2015-06-04T05:08:13Z"
92+
},
93+
{
94+
"item": "Soap",
95+
"price": 10,
96+
"quantity": 5,
97+
"date": "2016-02-06T20:20:13Z"
98+
}
99+
]

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,18 @@ public CloseableIterator<Document> find(
287287
@Override
288288
public CloseableIterator<Document> aggregate(
289289
final org.hypertrace.core.documentstore.query.Query query) {
290-
throw new UnsupportedOperationException();
290+
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser =
291+
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(collectionName);
292+
String sqlQuery = queryParser.parse(query);
293+
try {
294+
PreparedStatement preparedStatement =
295+
buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build());
296+
ResultSet resultSet = preparedStatement.executeQuery();
297+
return new PostgresResultIterator(resultSet);
298+
} catch (SQLException e) {
299+
LOGGER.error("SQLException querying documents. query: {}", query, e);
300+
}
301+
return EMPTY_ITERATOR;
291302
}
292303

293304
@Override
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.hypertrace.core.documentstore.postgres.query.v1;
2+
3+
import java.util.List;
4+
import java.util.Optional;
5+
import org.hypertrace.core.documentstore.expression.type.FilterTypeExpression;
6+
import org.hypertrace.core.documentstore.expression.type.GroupTypeExpression;
7+
import org.hypertrace.core.documentstore.postgres.Params;
8+
import org.hypertrace.core.documentstore.postgres.Params.Builder;
9+
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresFilterTypeExpressionVisitor;
10+
import org.hypertrace.core.documentstore.query.Pagination;
11+
import org.hypertrace.core.documentstore.query.Query;
12+
import org.hypertrace.core.documentstore.query.SortingSpec;
13+
14+
public class PostgresQueryParser {
15+
private static String NOT_YET_SUPPORTED = "Not yet supported %s";
16+
private Builder paramsBuilder;
17+
private final String collection;
18+
19+
public PostgresQueryParser(String collection) {
20+
this.collection = collection;
21+
}
22+
23+
public Builder getParamsBuilder() {
24+
return paramsBuilder;
25+
}
26+
27+
public String parse(Query query) {
28+
// prepare selection and form clause
29+
// TODO : add impl for selection + form clause for unwind
30+
StringBuilder sqlBuilder = new StringBuilder(String.format("SELECT * FROM %s", collection));
31+
paramsBuilder = Params.newBuilder();
32+
33+
// where clause
34+
Optional<String> whereFilter = parseFilter(query.getFilter());
35+
if (whereFilter.isPresent()) {
36+
sqlBuilder.append(String.format(" WHERE %s", whereFilter.get()));
37+
}
38+
39+
// group by
40+
Optional<String> groupBy = parseGroupBy(query.getAggregations());
41+
if (groupBy.isPresent()) {
42+
sqlBuilder.append(String.format(" GROUP BY %s", groupBy.get()));
43+
}
44+
45+
// having
46+
Optional<String> having = parseHaving(query.getAggregationFilter());
47+
if (having.isPresent()) {
48+
sqlBuilder.append(String.format(" HAVING %s", having.get()));
49+
}
50+
51+
// order by
52+
Optional<String> orderBy = parseOrderBy(query.getSorts());
53+
if (having.isPresent()) {
54+
sqlBuilder.append(String.format(" ORDER BY %s", orderBy.get()));
55+
}
56+
57+
// offset and limit
58+
Optional<String> pagination = parsePagination(query.getPagination());
59+
if (having.isPresent()) {
60+
sqlBuilder.append(String.format(" %s", pagination.get()));
61+
}
62+
63+
return sqlBuilder.toString();
64+
}
65+
66+
private Optional<String> parseFilter(Optional<FilterTypeExpression> filterTypeExpression) {
67+
return filterTypeExpression.map(
68+
expression -> expression.accept(new PostgresFilterTypeExpressionVisitor(this)));
69+
}
70+
71+
private Optional<String> parseGroupBy(List<GroupTypeExpression> groupTypeExpressionList) {
72+
if (groupTypeExpressionList.size() > 0) {
73+
throw new UnsupportedOperationException(String.format(NOT_YET_SUPPORTED, "group by clause"));
74+
}
75+
return Optional.empty();
76+
}
77+
78+
private Optional<String> parseHaving(Optional<FilterTypeExpression> filterTypeExpression) {
79+
if (filterTypeExpression.isPresent()) {
80+
throw new UnsupportedOperationException(String.format(NOT_YET_SUPPORTED, "having clause"));
81+
}
82+
return Optional.empty();
83+
}
84+
85+
private Optional<String> parseOrderBy(List<SortingSpec> sortingSpecs) {
86+
if (sortingSpecs.size() > 0) {
87+
throw new UnsupportedOperationException(String.format(NOT_YET_SUPPORTED, "order by clause"));
88+
}
89+
return Optional.empty();
90+
}
91+
92+
private Optional<String> parsePagination(Optional<Pagination> pagination) {
93+
if (pagination.isPresent()) {
94+
throw new UnsupportedOperationException(
95+
String.format(NOT_YET_SUPPORTED, "pagination clause"));
96+
}
97+
return Optional.empty();
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.hypertrace.core.documentstore.postgres.query.v1.vistors;
2+
3+
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
4+
5+
public class PostgresConstantExpressionVisitor extends PostgresSelectTypeExpressionVisitor {
6+
7+
@Override
8+
public Object visit(final ConstantExpression expression) {
9+
return expression.getValue();
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.hypertrace.core.documentstore.postgres.query.v1.vistors;
2+
3+
import org.hypertrace.core.documentstore.expression.impl.LogicalExpression;
4+
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
5+
import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;
6+
import org.hypertrace.core.documentstore.expression.type.SelectTypeExpression;
7+
import org.hypertrace.core.documentstore.parser.FilterTypeExpressionVisitor;
8+
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
9+
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;
10+
11+
public class PostgresFilterTypeExpressionVisitor implements FilterTypeExpressionVisitor {
12+
13+
private PostgresQueryParser postgresQueryParser;
14+
15+
public PostgresFilterTypeExpressionVisitor(PostgresQueryParser postgresQueryParser) {
16+
this.postgresQueryParser = postgresQueryParser;
17+
}
18+
19+
@Override
20+
public <T> T visit(final LogicalExpression expression) {
21+
throw new UnsupportedOperationException("Not yet supported logical expression");
22+
}
23+
24+
@Override
25+
public String visit(final RelationalExpression expression) {
26+
SelectTypeExpression lhs = expression.getLhs();
27+
RelationalOperator operator = expression.getOperator();
28+
SelectTypeExpression rhs = expression.getRhs();
29+
30+
// Only an identifier LHS and a constant RHS is supported as of now.
31+
PostgresSelectTypeExpressionVisitor lhsParser = new PostgresIdentifierExpressionVisitor();
32+
PostgresSelectTypeExpressionVisitor rhsParser = new PostgresConstantExpressionVisitor();
33+
34+
String key = lhs.accept(lhsParser);
35+
Object value = rhs.accept(rhsParser);
36+
37+
return PostgresUtils.parseNonCompositeFilter(
38+
key, operator.toString(), value, this.postgresQueryParser.getParamsBuilder());
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.hypertrace.core.documentstore.postgres.query.v1.vistors;
2+
3+
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
4+
5+
public class PostgresIdentifierExpressionVisitor extends PostgresSelectTypeExpressionVisitor {
6+
7+
@Override
8+
public String visit(final IdentifierExpression expression) {
9+
return expression.getName();
10+
}
11+
}

0 commit comments

Comments
 (0)