Skip to content

Commit f0e2bef

Browse files
authored
Refactor Postgres Field Transformers and Collections (#236)
1 parent 3cc020b commit f0e2bef

22 files changed

+1123
-174
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java

Lines changed: 470 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"statements": [
3+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n1, 'Soap', 10, 2, '2014-03-01T08:00:00Z',\n'{\"hygiene\", \"personal-care\", \"premium\"}',\n'{\"colors\": [\"Blue\", \"Green\"], \"brand\": \"Dettol\", \"size\": \"M\", \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL\n)",
4+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n2, 'Mirror', 20, 1, '2014-03-01T09:00:00Z',\n'{\"home-decor\", \"reflective\", \"glass\"}',\nNULL,\nNULL\n)",
5+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n3, 'Shampoo', 5, 10, '2014-03-15T09:00:00Z',\n'{\"hair-care\", \"personal-care\", \"premium\", \"herbal\"}',\n'{\"colors\": [\"Black\"], \"brand\": \"Sunsilk\", \"size\": \"L\", \"seller\": {\"name\": \"Metro Chemicals Pvt. Ltd.\", \"address\": {\"city\": \"Mumbai\", \"pincode\": 400004}}}',\nNULL\n)",
6+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n4, 'Shampoo', 5, 20, '2014-04-04T11:21:39.736Z',\n'{\"hair-care\", \"budget\", \"bulk\"}',\nNULL,\nNULL\n)",
7+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n5, 'Soap', 20, 5, '2014-04-04T21:23:13.331Z',\n'{\"hygiene\", \"antibacterial\", \"family-pack\"}',\n'{\"colors\": [\"Orange\", \"Blue\"], \"brand\": \"Lifebuoy\", \"size\": \"S\", \"seller\": {\"name\": \"Hans and Co.\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL\n)",
8+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n6, 'Comb', 7.5, 5, '2015-06-04T05:08:13Z',\n'{\"grooming\", \"plastic\", \"essential\"}',\nNULL,\nNULL\n)",
9+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n7, 'Comb', 7.5, 10, '2015-09-10T08:43:00Z',\n'{\"grooming\", \"bulk\", \"wholesale\"}',\n'{\"colors\": [], \"seller\": {\"name\": \"Go Go Plastics\", \"address\": {\"city\": \"Kolkata\", \"pincode\": 700007}}}',\nNULL\n)",
10+
"INSERT INTO \"myTestFlat\" (\n\"_id\", \"item\", \"price\", \"quantity\", \"date\", \"tags\", \"props\", \"sales\"\n) VALUES (\n8, 'Soap', 10, 5, '2016-02-06T20:20:13Z',\n'{\"hygiene\", \"budget\", \"basic\"}',\nNULL,\nNULL\n)"
11+
]
12+
}

document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider;
66

77
public interface Datastore {
8+
89
Set<String> listCollections();
910

1011
boolean createCollection(String collectionName, Map<String, String> options);
@@ -19,4 +20,19 @@ public interface Datastore {
1920
DocStoreMetricProvider getDocStoreMetricProvider();
2021

2122
void close();
23+
24+
/**
25+
* Returns a collection with the given name and type. A type can be used to specify different
26+
* storage modes of the collection. For example, a collection can have all top-level fields or a
27+
* single JSON column that contains all the fields. Both collections are handled differently in
28+
* this case.
29+
*
30+
* @param collectionName name of the collection
31+
* @param documentType type of the collection. For PG, we support FLAT and Legacy (for backward
32+
* compatibility)
33+
* @return the corresponding collection impl
34+
*/
35+
default Collection getCollectionForType(String collectionName, DocumentType documentType) {
36+
throw new UnsupportedOperationException("Unsupported collection type: " + documentType);
37+
}
2238
}

document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoDatastore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.bson.Document;
1616
import org.hypertrace.core.documentstore.Collection;
1717
import org.hypertrace.core.documentstore.Datastore;
18+
import org.hypertrace.core.documentstore.DocumentType;
1819
import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider;
1920
import org.hypertrace.core.documentstore.metric.mongo.MongoDocStoreMetricProvider;
2021
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
@@ -25,6 +26,7 @@
2526

2627
@Slf4j
2728
public class MongoDatastore implements Datastore {
29+
2830
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDatastore.class);
2931

3032
private final ConnectionConfig connectionConfig;
@@ -78,6 +80,16 @@ public boolean deleteCollection(String collectionName) {
7880
return true;
7981
}
8082

83+
@Override
84+
public Collection getCollectionForType(String collectionName, DocumentType documentType) {
85+
// We support nested mongo docs
86+
if (documentType == DocumentType.NESTED) {
87+
return getCollection(collectionName);
88+
}
89+
throw new IllegalArgumentException(
90+
"Unsupported document type: " + documentType + " for Mongo collection");
91+
}
92+
8193
@Override
8294
public Collection getCollection(String collectionName) {
8395
return new MongoCollection(
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.hypertrace.core.documentstore.postgres;
2+
3+
import org.hypertrace.core.documentstore.CloseableIterator;
4+
import org.hypertrace.core.documentstore.Document;
5+
import org.hypertrace.core.documentstore.model.options.QueryOptions;
6+
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
7+
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer;
8+
import org.hypertrace.core.documentstore.query.Query;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* PostgreSQL collection implementation for flat documents. All fields are stored as top-level
14+
* PostgreSQL columns.
15+
*/
16+
// todo: Throw unsupported op exception for all write methods
17+
public class FlatPostgresCollection extends PostgresCollection {
18+
19+
private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class);
20+
21+
FlatPostgresCollection(final PostgresClient client, final String collectionName) {
22+
super(client, collectionName);
23+
}
24+
25+
@Override
26+
public CloseableIterator<Document> query(
27+
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
28+
PostgresQueryParser queryParser = createParser(query);
29+
return queryWithParser(query, queryParser);
30+
}
31+
32+
@Override
33+
public CloseableIterator<Document> find(
34+
final org.hypertrace.core.documentstore.query.Query query) {
35+
PostgresQueryParser queryParser = createParser(query);
36+
return queryWithParser(query, queryParser);
37+
}
38+
39+
@Override
40+
public long count(
41+
org.hypertrace.core.documentstore.query.Query query, QueryOptions queryOptions) {
42+
PostgresQueryParser queryParser =
43+
new PostgresQueryParser(
44+
tableIdentifier,
45+
query,
46+
new org.hypertrace.core.documentstore.postgres.query.v1.transformer
47+
.FlatPostgresFieldTransformer());
48+
return countWithParser(query, queryParser);
49+
}
50+
51+
private PostgresQueryParser createParser(Query query) {
52+
return new PostgresQueryParser(
53+
tableIdentifier,
54+
PostgresQueryExecutor.transformAndLog(query),
55+
new FlatPostgresFieldTransformer());
56+
}
57+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.hypertrace.core.documentstore.postgres;
2+
3+
import org.hypertrace.core.documentstore.CloseableIterator;
4+
import org.hypertrace.core.documentstore.Document;
5+
import org.hypertrace.core.documentstore.model.options.QueryOptions;
6+
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
7+
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.NestedPostgresColTransformer;
8+
import org.hypertrace.core.documentstore.query.Query;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* PostgreSQL collection implementation for legacy document storage mode. Fields are stored within
14+
* JSONB columns.
15+
*/
16+
public class NestedPostgresCollection extends PostgresCollection {
17+
18+
private static final Logger LOGGER = LoggerFactory.getLogger(NestedPostgresCollection.class);
19+
20+
NestedPostgresCollection(final PostgresClient client, final String collectionName) {
21+
super(client, collectionName);
22+
}
23+
24+
@Override
25+
public CloseableIterator<Document> query(
26+
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
27+
PostgresQueryParser queryParser = createParser(query);
28+
return queryWithParser(query, queryParser);
29+
}
30+
31+
@Override
32+
public CloseableIterator<Document> find(
33+
final org.hypertrace.core.documentstore.query.Query query) {
34+
PostgresQueryParser queryParser = createParser(query);
35+
return queryWithParser(query, queryParser);
36+
}
37+
38+
@Override
39+
public long count(
40+
org.hypertrace.core.documentstore.query.Query query, QueryOptions queryOptions) {
41+
PostgresQueryParser queryParser = createParser(query);
42+
return countWithParser(query, queryParser);
43+
}
44+
45+
private PostgresQueryParser createParser(Query query) {
46+
return new PostgresQueryParser(
47+
tableIdentifier,
48+
PostgresQueryExecutor.transformAndLog(query),
49+
new NestedPostgresColTransformer());
50+
}
51+
}

0 commit comments

Comments
 (0)