Skip to content

Commit 20d6116

Browse files
committed
fixed integration test
1 parent e869f86 commit 20d6116

13 files changed

+673
-35
lines changed

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
7878
import org.hypertrace.core.documentstore.postgres.internal.BulkUpdateSubDocsInternalResult;
7979
import org.hypertrace.core.documentstore.postgres.model.DocumentAndId;
80+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
8081
import org.hypertrace.core.documentstore.postgres.subdoc.PostgresSubDocumentUpdater;
8182
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;
8283
import org.postgresql.util.PSQLException;
@@ -101,19 +102,36 @@ public class PostgresCollection implements Collection {
101102

102103
private final PostgresClient client;
103104
private final PostgresTableIdentifier tableIdentifier;
105+
private final PostgresColumnRegistry columnRegistry;
104106
private final PostgresSubDocumentUpdater subDocUpdater;
105107
private final PostgresQueryExecutor queryExecutor;
106108
private final UpdateValidator updateValidator;
107109

108110
public PostgresCollection(final PostgresClient client, final String collectionName) {
109-
this(client, PostgresTableIdentifier.parse(collectionName));
111+
this(client, PostgresTableIdentifier.parse(collectionName), null);
112+
}
113+
114+
public PostgresCollection(
115+
final PostgresClient client,
116+
final String collectionName,
117+
final PostgresColumnRegistry columnRegistry) {
118+
this(client, PostgresTableIdentifier.parse(collectionName), columnRegistry);
110119
}
111120

112121
PostgresCollection(final PostgresClient client, final PostgresTableIdentifier tableIdentifier) {
122+
this(client, tableIdentifier, null);
123+
}
124+
125+
PostgresCollection(
126+
final PostgresClient client,
127+
final PostgresTableIdentifier tableIdentifier,
128+
final PostgresColumnRegistry columnRegistry) {
113129
this.client = client;
114130
this.tableIdentifier = tableIdentifier;
131+
this.columnRegistry = columnRegistry;
115132
this.subDocUpdater =
116-
new PostgresSubDocumentUpdater(new PostgresQueryBuilder(this.tableIdentifier));
133+
new PostgresSubDocumentUpdater(
134+
new PostgresQueryBuilder(this.tableIdentifier, this.columnRegistry));
117135
this.queryExecutor = new PostgresQueryExecutor(this.tableIdentifier);
118136
this.updateValidator = new CommonUpdateValidator();
119137
}
@@ -488,15 +506,16 @@ private CloseableIterator<Document> search(Query query, boolean removeDocumentId
488506
@Override
489507
public CloseableIterator<Document> find(
490508
final org.hypertrace.core.documentstore.query.Query query) {
491-
return queryExecutor.execute(client.getConnection(), query);
509+
return queryExecutor.execute(client.getConnection(), query, null, columnRegistry);
492510
}
493511

494512
@Override
495513
public CloseableIterator<Document> query(
496514
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
497515
String flatStructureCollectionName =
498516
client.getCustomParameters().get(FLAT_STRUCTURE_COLLECTION_KEY);
499-
return queryExecutor.execute(client.getConnection(), query, flatStructureCollectionName);
517+
return queryExecutor.execute(
518+
client.getConnection(), query, flatStructureCollectionName, columnRegistry);
500519
}
501520

502521
@Override
@@ -510,7 +529,7 @@ public Optional<Document> update(
510529
try (final Connection connection = client.getPooledConnection()) {
511530
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser =
512531
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
513-
tableIdentifier, query);
532+
tableIdentifier, query, columnRegistry);
514533
final String selectQuery = parser.buildSelectQueryForUpdate();
515534

516535
try (final PreparedStatement preparedStatement =
@@ -545,7 +564,7 @@ public Optional<Document> update(
545564
.build();
546565

547566
try (final CloseableIterator<Document> iterator =
548-
queryExecutor.execute(connection, findByIdQuery)) {
567+
queryExecutor.execute(connection, findByIdQuery, null, columnRegistry)) {
549568
returnDocument = getFirstDocument(iterator).orElseThrow();
550569
}
551570
} else if (updateOptions.getReturnDocumentType() == NONE) {
@@ -1568,4 +1587,32 @@ private static Optional<JsonNode> mapValueToJsonNode(int columnType, String colu
15681587
return Optional.empty();
15691588
}
15701589
}
1590+
1591+
/**
1592+
* Iterator that extracts clean documents from the 'document' column for queries without
1593+
* selections, removing the wrapper structure.
1594+
*/
1595+
static class PostgresResultIteratorCleanDocument extends PostgresResultIterator {
1596+
1597+
public PostgresResultIteratorCleanDocument(ResultSet resultSet) {
1598+
super(resultSet, true);
1599+
}
1600+
1601+
@Override
1602+
protected Document prepareDocument() throws SQLException, IOException {
1603+
// For queries without selections, extract the document content directly
1604+
String documentString = resultSet.getString(DOCUMENT);
1605+
if (documentString != null) {
1606+
ObjectNode jsonNode = (ObjectNode) MAPPER.readTree(documentString);
1607+
// Remove document ID if needed
1608+
if (shouldRemoveDocumentId()) {
1609+
jsonNode.remove(DOCUMENT_ID);
1610+
}
1611+
return new JSONDocument(MAPPER.writeValueAsString(jsonNode));
1612+
}
1613+
1614+
// Fallback to empty document if no document column
1615+
return new JSONDocument("{}");
1616+
}
1617+
}
15711618
}

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Map;
1616
import java.util.Optional;
1717
import java.util.Set;
18+
import java.util.concurrent.ConcurrentHashMap;
1819
import lombok.NonNull;
1920
import lombok.extern.slf4j.Slf4j;
2021
import org.hypertrace.core.documentstore.Collection;
@@ -24,6 +25,9 @@
2425
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
2526
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
2627
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
28+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
29+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistryImpl;
30+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnType;
2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
2933

@@ -37,6 +41,9 @@ public class PostgresDatastore implements Datastore {
3741
private final String database;
3842
private final DocStoreMetricProvider docStoreMetricProvider;
3943

44+
// Cache for PostgreSQL column registries per table
45+
private final Map<String, PostgresColumnRegistry> registryCache = new ConcurrentHashMap<>();
46+
4047
public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) {
4148
final ConnectionConfig connectionConfig = datastoreConfig.connectionConfig();
4249

@@ -148,7 +155,81 @@ public Collection getCollection(String collectionName) {
148155
if (!tables.contains(collectionName)) {
149156
createCollection(collectionName, null);
150157
}
151-
return new PostgresCollection(client, collectionName);
158+
159+
// Create or get cached registry for this collection
160+
PostgresColumnRegistry registry = createOrGetRegistry(collectionName);
161+
162+
return new PostgresCollection(client, collectionName, registry);
163+
}
164+
165+
/**
166+
* Creates or retrieves a cached PostgresColumnRegistry for the specified collection. The registry
167+
* is cached to avoid repeated database schema queries.
168+
*
169+
* @param collectionName the collection name to create/get registry for
170+
* @return the PostgresColumnRegistry for the collection
171+
*/
172+
private PostgresColumnRegistry createOrGetRegistry(String collectionName) {
173+
return registryCache.computeIfAbsent(
174+
collectionName,
175+
tableName -> {
176+
try {
177+
PostgresColumnRegistry registry =
178+
new PostgresColumnRegistryImpl(client.getConnection(), tableName);
179+
180+
LOGGER.debug(
181+
"Created PostgresColumnRegistry for collection '{}' with {} first-class columns",
182+
tableName,
183+
registry.getAllFirstClassColumns().size());
184+
185+
return registry;
186+
} catch (SQLException e) {
187+
LOGGER.warn(
188+
"Failed to create PostgresColumnRegistry for collection '{}': {}. "
189+
+ "Falling back to JSONB-only behavior.",
190+
tableName,
191+
e.getMessage());
192+
193+
// Return an empty registry that treats all fields as JSONB
194+
return createEmptyRegistry(tableName);
195+
}
196+
});
197+
}
198+
199+
/**
200+
* Creates an empty registry that treats all fields as JSONB fields. This is used as a fallback
201+
* when registry creation fails.
202+
*
203+
* @param tableName the table name
204+
* @return an empty registry
205+
*/
206+
private PostgresColumnRegistry createEmptyRegistry(String tableName) {
207+
return new PostgresColumnRegistry() {
208+
@Override
209+
public boolean isFirstClassColumn(String fieldName) {
210+
return false; // All fields are treated as JSONB
211+
}
212+
213+
@Override
214+
public Optional<PostgresColumnType> getColumnType(String fieldName) {
215+
return Optional.empty();
216+
}
217+
218+
@Override
219+
public Optional<String> getColumnName(String fieldName) {
220+
return Optional.empty();
221+
}
222+
223+
@Override
224+
public Set<String> getAllFirstClassColumns() {
225+
return Set.of(); // No first-class columns
226+
}
227+
228+
@Override
229+
public String getTableName() {
230+
return tableName;
231+
}
232+
};
152233
}
153234

154235
@Override

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryBuilder.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.hypertrace.core.documentstore.model.subdoc.UpdateOperator;
2222
import org.hypertrace.core.documentstore.postgres.Params.Builder;
2323
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
24+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
2425
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAddToListIfAbsentParser;
2526
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAddValueParser;
2627
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAppendToListParser;
@@ -44,12 +45,14 @@ public class PostgresQueryBuilder {
4445
entry(APPEND_TO_LIST, new PostgresAppendToListParser()));
4546

4647
@Getter private final PostgresTableIdentifier tableIdentifier;
48+
private final PostgresColumnRegistry columnRegistry;
4749

4850
public String getSubDocUpdateQuery(
4951
final Query query,
5052
final Collection<SubDocumentUpdate> updates,
5153
final Params.Builder paramBuilder) {
52-
final PostgresQueryParser baseQueryParser = new PostgresQueryParser(tableIdentifier, query);
54+
final PostgresQueryParser baseQueryParser =
55+
new PostgresQueryParser(tableIdentifier, query, columnRegistry);
5356
String selectQuery =
5457
String.format(
5558
"(SELECT %s, %s FROM %s AS t0 %s)",

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
import lombok.extern.slf4j.Slf4j;
1111
import org.hypertrace.core.documentstore.CloseableIterator;
1212
import org.hypertrace.core.documentstore.Document;
13-
import org.hypertrace.core.documentstore.postgres.PostgresCollection.PostgresResultIterator;
1413
import org.hypertrace.core.documentstore.postgres.PostgresCollection.PostgresResultIteratorWithMetaData;
1514
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.PostgresQueryTransformer;
15+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
1616
import org.hypertrace.core.documentstore.query.Query;
1717

1818
@Slf4j
@@ -21,27 +21,38 @@ public class PostgresQueryExecutor {
2121
private final PostgresTableIdentifier tableIdentifier;
2222

2323
public CloseableIterator<Document> execute(final Connection connection, final Query query) {
24-
return execute(connection, query, null);
24+
return execute(connection, query, null, null);
2525
}
2626

2727
public CloseableIterator<Document> execute(
2828
final Connection connection, final Query query, String flatStructureCollectionName) {
29+
return execute(connection, query, flatStructureCollectionName, null);
30+
}
31+
32+
public CloseableIterator<Document> execute(
33+
final Connection connection,
34+
final Query query,
35+
String flatStructureCollectionName,
36+
PostgresColumnRegistry columnRegistry) {
2937
final org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser =
3038
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
31-
tableIdentifier, transformAndLog(query), flatStructureCollectionName);
39+
tableIdentifier, transformAndLog(query), flatStructureCollectionName, columnRegistry);
3240
final String sqlQuery = queryParser.parse();
3341
try {
3442
final PreparedStatement preparedStatement =
3543
buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build(), connection);
3644
log.debug("Executing executeQueryV1 sqlQuery:{}", preparedStatement.toString());
3745
final ResultSet resultSet = preparedStatement.executeQuery();
3846

39-
if ((tableIdentifier.getTableName().equals(flatStructureCollectionName))) {
40-
return new PostgresCollection.PostgresResultIteratorWithBasicTypes(resultSet);
47+
// For queries with selections, use PostgresResultIteratorWithMetaData
48+
// as it properly handles nested field decoding
49+
if (query.getSelections().size() > 0) {
50+
return new PostgresResultIteratorWithMetaData(resultSet);
4151
}
42-
return query.getSelections().size() > 0
43-
? new PostgresResultIteratorWithMetaData(resultSet)
44-
: new PostgresResultIterator(resultSet);
52+
53+
// For queries without selections, use a custom iterator that extracts clean documents
54+
// from the 'document' column instead of returning wrapped results
55+
return new PostgresCollection.PostgresResultIteratorCleanDocument(resultSet);
4556
} catch (SQLException e) {
4657
log.error(
4758
"SQLException querying documents. original query: " + query + ", sql query:" + sqlQuery,

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresSelectTypeExpressionVisitor;
2020
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresSortTypeExpressionVisitor;
2121
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresUnnestFilterTypeExpressionVisitor;
22+
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
2223
import org.hypertrace.core.documentstore.query.Pagination;
2324
import org.hypertrace.core.documentstore.query.Query;
2425

@@ -29,6 +30,7 @@ public class PostgresQueryParser {
2930
@Getter private final PostgresTableIdentifier tableIdentifier;
3031
@Getter private final Query query;
3132
@Getter private final String flatStructureCollectionName;
33+
@Getter private final PostgresColumnRegistry columnRegistry;
3234

3335
@Setter String finalTableName;
3436
@Getter private final Builder paramsBuilder = Params.newBuilder();
@@ -47,15 +49,29 @@ public class PostgresQueryParser {
4749

4850
public PostgresQueryParser(
4951
PostgresTableIdentifier tableIdentifier, Query query, String flatStructureCollectionName) {
52+
this(tableIdentifier, query, flatStructureCollectionName, null);
53+
}
54+
55+
public PostgresQueryParser(
56+
PostgresTableIdentifier tableIdentifier,
57+
Query query,
58+
String flatStructureCollectionName,
59+
PostgresColumnRegistry columnRegistry) {
5060
this.tableIdentifier = tableIdentifier;
5161
this.query = query;
5262
this.flatStructureCollectionName = flatStructureCollectionName;
63+
this.columnRegistry = columnRegistry;
5364
this.finalTableName = tableIdentifier.toString();
5465
toPgColumnTransformer = new FieldToPgColumnTransformer(this);
5566
}
5667

5768
public PostgresQueryParser(PostgresTableIdentifier tableIdentifier, Query query) {
58-
this(tableIdentifier, query, null);
69+
this(tableIdentifier, query, null, null);
70+
}
71+
72+
public PostgresQueryParser(
73+
PostgresTableIdentifier tableIdentifier, Query query, PostgresColumnRegistry columnRegistry) {
74+
this(tableIdentifier, query, null, columnRegistry);
5975
}
6076

6177
public String parse() {

0 commit comments

Comments
 (0)