Merged
Changes from 29 commits

Commits (44 total)
674d2ee
WIP
suddendust Sep 22, 2025
f6cb67a
Revert "WIP"
suddendust Sep 22, 2025
6e7795d
Reapply "WIP"
suddendust Sep 22, 2025
957a897
Revert "Reapply "WIP""
suddendust Sep 22, 2025
502c373
Reapply "Reapply "WIP""
suddendust Sep 22, 2025
0544358
WIP
suddendust Sep 22, 2025
82dc37f
WIP
suddendust Sep 22, 2025
c0ed72a
Add default impl for `getDocumentType`
suddendust Sep 22, 2025
e9f25da
WIP
suddendust Sep 22, 2025
e6be8b9
Merge branch 'pg_subdocument' of github.com:suddendust/document-store…
suddendust Sep 22, 2025
ee7678f
Rollback unnecessary changes
suddendust Sep 22, 2025
63b0dec
Added default DocumentType in Document.java
suddendust Sep 22, 2025
7f91cf0
Address comments
suddendust Sep 23, 2025
7376b56
Merge branch 'main' into pg_subdocument
suddendust Sep 24, 2025
ea2aefd
Address comments
suddendust Sep 24, 2025
e3ccb5b
Merge remote-tracking branch 'myfork/pg_subdocument' into pg_subdocument
suddendust Sep 24, 2025
3e3e6cf
Address docs
suddendust Sep 24, 2025
e6a6f7e
Adds more test cases in JSONDocumentTest.java
suddendust Sep 24, 2025
483eb16
Spotless
suddendust Sep 24, 2025
e5926ea
Added more test cases
suddendust Sep 24, 2025
b261463
Spotless
suddendust Sep 24, 2025
7cf8e17
Add test case for PostgresResultIteratorWithBasicTypes
suddendust Sep 24, 2025
9c474db
Merge branch 'main' into pg_subdocument
suddendust Sep 24, 2025
cb2ed18
WIP
suddendust Sep 24, 2025
06fdc8f
Merge branch 'pg_subdocument' into poc
suddendust Sep 24, 2025
fbce388
WIP
suddendust Sep 24, 2025
50b09b7
Merge branch 'main' of github.com:hypertrace/document-store into refa…
suddendust Sep 24, 2025
0f8df71
WIP
suddendust Sep 24, 2025
7f7defe
WIP
suddendust Sep 24, 2025
13e591e
Added test cases
suddendust Sep 26, 2025
e1074b9
Added 1 more integration test case to ensure result consistency b/w f…
suddendust Sep 26, 2025
77e1473
Added test cases for unnest
suddendust Sep 26, 2025
70763bb
Merge branch 'main' into refactor
suddendust Oct 2, 2025
7f76f74
Address comments except integration test
suddendust Oct 2, 2025
40946ad
Revert "Address comments except integration test"
suddendust Oct 2, 2025
41a405f
Merge branch 'refactor' of github.com:suddendust/document-store into …
suddendust Oct 2, 2025
1962554
Reapply "Address comments except integration test"
suddendust Oct 2, 2025
1903a4a
WIP
suddendust Oct 2, 2025
c3d0a32
Test cases working
suddendust Oct 2, 2025
0fa2a62
WIP
suddendust Oct 2, 2025
b6a4cf6
WIP
suddendust Oct 2, 2025
d2fef95
Add more test cases
suddendust Oct 2, 2025
de09a90
Add more test cases
suddendust Oct 2, 2025
4f5e40d
WIP
suddendust Oct 2, 2025
@@ -5,6 +5,7 @@
import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider;

public interface Datastore {

Set<String> listCollections();

boolean createCollection(String collectionName, Map<String, String> options);
@@ -19,4 +20,19 @@ public interface Datastore {
DocStoreMetricProvider getDocStoreMetricProvider();

void close();

/**
* Returns a collection with the given name and type. The type selects the collection's storage
* mode. For example, a collection can store every field as a top-level column, or keep all
* fields in a single JSON column; the two modes are handled by different implementations.
*
* @param collectionName name of the collection
* @param documentType type of the collection. For PG, we support FLAT and NESTED (the legacy
*     mode, kept for backward compatibility)
* @return the corresponding collection implementation
*/
default Collection getCollectionForType(String collectionName, DocumentType documentType) {


Can you please add comments on the other methods as well? For example, what would the behavior of getCollection() and createCollection() be?


I do not see a corresponding implementation change in MongoDatastore.
If we upgrade document-store in entity-service, is the expectation that we would use getCollection for a MongoDB collection and getCollectionForType for a Postgres collection?

Contributor


+1
How is this supposed to be used by the clients/consumers?

Contributor Author


> is the expectation that we would use getCollection for a MongoDB collection and getCollectionForType for a Postgres collection?

That's the expectation right now, yes. Now that I think of it, a better approach would perhaps be, for Mongo, to return an instance of MongoCollection for type NESTED and to throw an IllegalArgumentException otherwise.

Contributor Author


@suresh-prakash This API is intended primarily for SQL datastores like PG, in which we can store both flat and nested documents. For Mongo, since we only support nested documents, it'll throw an IllegalArgumentException for FLAT doc types (as per this PR).

Contributor


Maybe I am still a bit confused here. The consumers are the ones choosing between Postgres FLAT and NESTED, so why would the clients ask the library which one is being used? I mean, the clients would already have this information, right?

throw new UnsupportedOperationException("Unsupported collection type: " + documentType);
}
}
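Based on the thread above, the intended dispatch can be sketched as a standalone toy. The class names returned here mirror the ones added in this PR, but the structure is an illustrative stand-in, and the Mongo behavior reflects what the author describes in the comments rather than merged code:

```java
class GetCollectionForTypeSketch {
  enum DocumentType { FLAT, NESTED }

  // Stand-in for PostgresDatastore.getCollectionForType: PG supports both modes.
  static String postgresCollectionFor(DocumentType type) {
    switch (type) {
      case FLAT:
        return "FlatPostgresCollection";
      case NESTED:
        return "NestedPostgresCollection";
      default:
        throw new IllegalArgumentException("Unknown collection type: " + type);
    }
  }

  // Stand-in for the Mongo behavior proposed in the discussion: NESTED only,
  // IllegalArgumentException for anything else.
  static String mongoCollectionFor(DocumentType type) {
    if (type == DocumentType.NESTED) {
      return "MongoCollection";
    }
    throw new IllegalArgumentException("Mongo only supports NESTED documents: " + type);
  }

  public static void main(String[] args) {
    System.out.println(postgresCollectionFor(DocumentType.FLAT)); // FlatPostgresCollection
  }
}
```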
@@ -0,0 +1,47 @@
package org.hypertrace.core.documentstore.postgres;

import org.hypertrace.core.documentstore.CloseableIterator;
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
import org.hypertrace.core.documentstore.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* PostgreSQL collection implementation for flat documents. All fields are stored as top-level
* PostgreSQL columns.
*/
public class FlatPostgresCollection extends PostgresCollection {

private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class);

FlatPostgresCollection(final PostgresClient client, final String collectionName) {
super(client, collectionName);
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
PostgresQueryParser queryParser = createParser(query);
return queryWithParser(query, queryParser);
}

@Override
public CloseableIterator<Document> find(
final org.hypertrace.core.documentstore.query.Query query) {
PostgresQueryParser queryParser = createParser(query);
return queryWithParser(query, queryParser);
}

@Override
public long count(
org.hypertrace.core.documentstore.query.Query query, QueryOptions queryOptions) {
PostgresQueryParser queryParser = new PostgresQueryParser(tableIdentifier, query);
return countWithParser(query, queryParser);
}

private PostgresQueryParser createParser(Query query) {
return new PostgresQueryParser(tableIdentifier, PostgresQueryExecutor.transformAndLog(query));


Do we need to create this object every time?

Contributor Author


Yes, the parser maintains state at a per-query level. This is how it works today as well.

}
}
@@ -0,0 +1,51 @@
package org.hypertrace.core.documentstore.postgres;

import org.hypertrace.core.documentstore.CloseableIterator;
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.NestedPostgresColTransformer;
import org.hypertrace.core.documentstore.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* PostgreSQL collection implementation for the legacy document storage mode. Fields are stored
* within a single JSONB column.
*/
public class NestedPostgresCollection extends PostgresCollection {

private static final Logger LOGGER = LoggerFactory.getLogger(NestedPostgresCollection.class);

NestedPostgresCollection(final PostgresClient client, final String collectionName) {
super(client, collectionName);
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
PostgresQueryParser queryParser = createParser(query);
return queryWithParser(query, queryParser);
}

@Override
public CloseableIterator<Document> find(
final org.hypertrace.core.documentstore.query.Query query) {
PostgresQueryParser queryParser = createParser(query);
return queryWithParser(query, queryParser);
}

@Override
public long count(
org.hypertrace.core.documentstore.query.Query query, QueryOptions queryOptions) {
PostgresQueryParser queryParser = createParser(query);
return countWithParser(query, queryParser);
}

private PostgresQueryParser createParser(Query query) {
return new PostgresQueryParser(
tableIdentifier,
PostgresQueryExecutor.transformAndLog(query),
new NestedPostgresColTransformer());
}
}
@@ -72,20 +72,20 @@
import org.hypertrace.core.documentstore.commons.UpdateValidator;
import org.hypertrace.core.documentstore.expression.impl.KeyExpression;
import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.model.options.ReturnDocumentType;
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.postgres.internal.BulkUpdateSubDocsInternalResult;
import org.hypertrace.core.documentstore.postgres.model.DocumentAndId;
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.NestedPostgresColTransformer;
import org.hypertrace.core.documentstore.postgres.subdoc.PostgresSubDocumentUpdater;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Provides {@link Collection} implementation on Postgres using jsonb format */
public class PostgresCollection implements Collection {
public abstract class PostgresCollection implements Collection {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCollection.class);
public static final String ID = "id";
@@ -98,12 +98,11 @@ public class PostgresCollection {
private static final String CREATED_NOW_ALIAS = "created_now_alias";
private static final CloseableIterator<Document> EMPTY_ITERATOR =
CloseableIterator.emptyIterator();
private static final String FLAT_STRUCTURE_COLLECTION_KEY = "flatStructureCollection";

private final PostgresClient client;
private final PostgresTableIdentifier tableIdentifier;
protected final PostgresClient client;
protected final PostgresTableIdentifier tableIdentifier;
private final PostgresSubDocumentUpdater subDocUpdater;
private final PostgresQueryExecutor queryExecutor;
protected final PostgresQueryExecutor queryExecutor;
private final UpdateValidator updateValidator;

public PostgresCollection(final PostgresClient client, final String collectionName) {
@@ -115,7 +114,7 @@ public PostgresCollection(final PostgresClient client, final String collectionNa
this.tableIdentifier = tableIdentifier;
this.subDocUpdater =
new PostgresSubDocumentUpdater(new PostgresQueryBuilder(this.tableIdentifier));
this.queryExecutor = new PostgresQueryExecutor(this.tableIdentifier);
this.queryExecutor = new PostgresQueryExecutor();
this.updateValidator = new CommonUpdateValidator();
}

@@ -480,26 +479,12 @@ private CloseableIterator<Document> search(Query query, boolean removeDocumentId
return closeableIterator;
} catch (SQLException e) {
LOGGER.error(
"SQLException in querying documents - query: {}, sqlQuery:{}", query, pgSqlQuery, e);
"SQLException in querying documents - query: {}, sqlQuery: {}", query, pgSqlQuery, e);
}

return EMPTY_ITERATOR;
}

@Override
public CloseableIterator<Document> find(
final org.hypertrace.core.documentstore.query.Query query) {
return queryExecutor.execute(client.getConnection(), query);
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
String flatStructureCollectionName =
client.getCustomParameters().get(FLAT_STRUCTURE_COLLECTION_KEY);
return queryExecutor.execute(client.getConnection(), query, flatStructureCollectionName);
}

@Override
public Optional<Document> update(
final org.hypertrace.core.documentstore.query.Query query,
@@ -511,7 +496,7 @@ public Optional<Document> update(
try (final Connection connection = client.getPooledConnection()) {
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser =
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
tableIdentifier, query);
tableIdentifier, query, new NestedPostgresColTransformer());
final String selectQuery = parser.buildSelectQueryForUpdate();

try (final PreparedStatement preparedStatement =
@@ -546,7 +531,10 @@ public Optional<Document> update(
.build();

try (final CloseableIterator<Document> iterator =
queryExecutor.execute(connection, findByIdQuery)) {
queryWithParser(
findByIdQuery,
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
tableIdentifier, query))) {
returnDocument = getFirstDocument(iterator).orElseThrow();
}
} else if (updateOptions.getReturnDocumentType() == NONE) {
@@ -610,28 +598,6 @@ public CloseableIterator<Document> bulkUpdate(
}
}

@Override
public long count(
org.hypertrace.core.documentstore.query.Query query, QueryOptions queryOptions) {
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser =
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
tableIdentifier, query);
String subQuery = queryParser.parse();
String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(count)", subQuery);
try {
PreparedStatement preparedStatement =
queryExecutor.buildPreparedStatement(
sqlQuery, queryParser.getParamsBuilder().build(), client.getConnection());
ResultSet resultSet = preparedStatement.executeQuery();
resultSet.next();
return resultSet.getLong(1);
} catch (SQLException e) {
LOGGER.error(
"SQLException querying documents. original query: {}, sql query:", query, sqlQuery, e);
throw new RuntimeException(e);
}
}

@Override
public boolean delete(Key key) {
String deleteSQL = String.format("DELETE FROM %s WHERE %s = ?", tableIdentifier, ID);
@@ -773,7 +739,7 @@ public long total(Query query) {
count = resultSet.getLong(1);
}
} catch (SQLException e) {
LOGGER.error("SQLException querying documents. query: {}", query, e);
LOGGER.error("SQLException querying documents. query: {}, sqlQuery: {}", query, sqlQuery, e);
}
return count;
}
@@ -856,6 +822,43 @@ public void drop() {
}
}

protected long countWithParser(
org.hypertrace.core.documentstore.query.Query query,
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) {
String subQuery = queryParser.parse();
String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(countWithParser)", subQuery);
try {
PreparedStatement preparedStatement =
queryExecutor.buildPreparedStatement(
sqlQuery, queryParser.getParamsBuilder().build(), client.getConnection());
ResultSet resultSet = preparedStatement.executeQuery();
resultSet.next();
return resultSet.getLong(1);
} catch (SQLException e) {
LOGGER.error(
"SQLException querying documents. Original query: {}, sql query: {}", query, sqlQuery, e);
throw new RuntimeException(e);
}
}

protected CloseableIterator<Document> queryWithParser(
org.hypertrace.core.documentstore.query.Query query,
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) {
try {
ResultSet resultSet = queryExecutor.execute(client.getConnection(), queryParser);

if (queryParser.getPgColTransformer() instanceof NestedPostgresColTransformer) {
return !query.getSelections().isEmpty()
? new PostgresResultIteratorWithMetaData(resultSet)
: new PostgresResultIterator(resultSet);
} else {
return new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private Map<String, String> getDocIdToTenantIdMap(BulkArrayValueUpdateRequest request) {
return request.getKeys().stream()
.collect(
@@ -1369,7 +1372,13 @@ private void addColumnToJsonNode(
if (jsonString != null) {
try {
JsonNode jsonValue = MAPPER.readTree(jsonString);
jsonNode.set(columnName, jsonValue);
// Handle like MetaData iterator - check for encoded nested fields
if (PostgresUtils.isEncodedNestedField(columnName)) {
handleNestedField(
PostgresUtils.decodeAliasForNestedField(columnName), jsonNode, jsonValue);
} else {
jsonNode.set(columnName, jsonValue);
}
} catch (IOException e) {
// Fallback to string if JSON parsing fails
jsonNode.put(columnName, jsonString);
@@ -1385,6 +1394,25 @@
break;
}
}

private void handleNestedField(String columnName, ObjectNode rootNode, JsonNode leafNodeValue) {
List<String> keys = PostgresUtils.splitNestedField(columnName);
// Find the leaf node or create one for adding property value
ObjectNode curNode = rootNode;
for (int l = 0; l < keys.size() - 1; l++) {
String key = keys.get(l);
JsonNode node = curNode.get(key);
if (node == null || !node.isObject()) {
ObjectNode newNode = MAPPER.createObjectNode();
curNode.set(key, newNode);
curNode = newNode;
} else {
curNode = (ObjectNode) node;
}
}
String leafKey = keys.get(keys.size() - 1);
curNode.set(leafKey, leafNodeValue);
}
}

static class PostgresResultIterator implements CloseableIterator<Document> {
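The handleNestedField method added above walks the dot-separated key path, creating intermediate objects as needed, and sets the value at the leaf. A minimal sketch of the same reconstruction with plain Java maps (the exact encoding used by PostgresUtils.splitNestedField is not shown in this diff, so a simple dot split is assumed here):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NestedFieldSketch {
  // Mirrors PostgresCollection.handleNestedField: for every key except the
  // last, descend into (or create) an intermediate object; then set the leaf.
  @SuppressWarnings("unchecked")
  static void setNested(Map<String, Object> root, String dottedKey, Object value) {
    List<String> keys = List.of(dottedKey.split("\\."));
    Map<String, Object> cur = root;
    for (int i = 0; i < keys.size() - 1; i++) {
      Object node = cur.get(keys.get(i));
      if (!(node instanceof Map)) {
        // Missing or non-object node: replace with a fresh intermediate object
        Map<String, Object> child = new HashMap<>();
        cur.put(keys.get(i), child);
        cur = child;
      } else {
        cur = (Map<String, Object>) node;
      }
    }
    cur.put(keys.get(keys.size() - 1), value);
  }

  public static void main(String[] args) {
    Map<String, Object> doc = new HashMap<>();
    setNested(doc, "address.city", "Pune");
    setNested(doc, "address.zip", "411001");
    System.out.println(doc);
  }
}
```

This is why a flat column named for an encoded nested field (e.g. an alias decoding to `address.city`) ends up as `{"address": {"city": ...}}` in the returned document rather than as a top-level key containing a dot.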
@@ -19,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.documentstore.Collection;
import org.hypertrace.core.documentstore.Datastore;
import org.hypertrace.core.documentstore.DocumentType;
import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider;
import org.hypertrace.core.documentstore.metric.postgres.PostgresDocStoreMetricProvider;
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
@@ -142,13 +143,33 @@ public boolean deleteCollection(String collectionName) {
return false;
}

/**
* For PG, this method returns the legacy (nested) collection.
*
* @param collectionName the legacy collection name
* @return the legacy PG collection
*/
@Override
public Collection getCollection(String collectionName) {
Set<String> tables = listCollections();
if (!tables.contains(collectionName)) {
createCollection(collectionName, null);
}
return new PostgresCollection(client, collectionName);
return new NestedPostgresCollection(client, collectionName);
}

@Override
public Collection getCollectionForType(String collectionName, DocumentType documentType) {
switch (documentType) {
case FLAT:
{
return new FlatPostgresCollection(client, collectionName);
}
case NESTED:
return getCollection(collectionName);
default:
throw new IllegalArgumentException("Unknown collection type: " + documentType);
}
}

@Override
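PostgresDatastore.getCollection above follows a get-or-create pattern: if the table is not in listCollections(), it is created before the collection handle is returned. The pattern can be sketched with an in-memory stand-in for the table listing (names and return values here are illustrative only):

```java
import java.util.HashSet;
import java.util.Set;

class GetOrCreateSketch {
  // Stand-in for listCollections(): the set of tables known to exist.
  private final Set<String> tables = new HashSet<>();

  // Mirrors PostgresDatastore.getCollection: create the backing table on
  // demand, then hand back the legacy (nested) collection handle.
  String getCollection(String name) {
    if (!tables.contains(name)) {
      tables.add(name); // stand-in for createCollection(name, null)
    }
    return "NestedPostgresCollection:" + name;
  }

  public static void main(String[] args) {
    GetOrCreateSketch ds = new GetOrCreateSketch();
    System.out.println(ds.getCollection("profiles")); // NestedPostgresCollection:profiles
  }
}
```

Repeated calls with the same name are idempotent: the table is created at most once, and the same kind of handle is returned each time.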