diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index c72e48ba..981e5b8f 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -6,6 +6,7 @@ import com.mongodb.MongoClientSettings; import com.typesafe.config.Config; import java.time.Duration; +import java.util.Collections; import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode; import org.hypertrace.core.documentstore.model.config.DatabaseType; import org.hypertrace.core.documentstore.model.config.DatastoreConfig; @@ -34,7 +35,8 @@ public DatastoreConfig convert(final Config config) { null, AggregatePipelineMode.DEFAULT_ALWAYS, DataFreshness.SYSTEM_DEFAULT, - Duration.ofMinutes(20)) { + Duration.ofMinutes(20), + Collections.emptyMap()) { public MongoClientSettings toSettings() { final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder() @@ -88,7 +90,8 @@ public DatastoreConfig convert(final Config config) { connectionConfig.database(), connectionConfig.credentials(), connectionConfig.applicationName(), - connectionConfig.connectionPoolConfig()) { + connectionConfig.connectionPoolConfig(), + connectionConfig.customParameters()) { @Override public String toConnectionString() { return config.hasPath("url") diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index db11c43d..af24ba0b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -5,7 +5,10 @@ import com.google.common.base.Preconditions; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import lombok.AccessLevel; import lombok.AllArgsConstructor; @@ -34,18 +37,21 @@ public class ConnectionConfig { @NonNull AggregatePipelineMode aggregationPipelineMode; @NonNull DataFreshness dataFreshness; @NonNull Duration queryTimeout; + @NonNull Map customParameters; public ConnectionConfig( @NonNull List<@NonNull Endpoint> endpoints, @NonNull String database, - @Nullable ConnectionCredentials credentials) { + @Nullable ConnectionCredentials credentials, + Map customParameters) { this( endpoints, database, credentials, AggregatePipelineMode.DEFAULT_ALWAYS, DataFreshness.SYSTEM_DEFAULT, - Duration.ofMinutes(20)); + Duration.ofMinutes(20), + customParameters != null ? customParameters : Collections.emptyMap()); } public static ConnectionConfigBuilder builder() { @@ -63,6 +69,13 @@ public static class ConnectionConfigBuilder { ConnectionCredentials credentials; String applicationName = DEFAULT_APP_NAME; String replicaSet; + Map customParameters = new HashMap<>(); + + public ConnectionConfigBuilder customParameter(String key, String value) { + this.customParameters.put(key, value); + return this; + } + ConnectionPoolConfig connectionPoolConfig; AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS; DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; @@ -96,7 +109,8 @@ public ConnectionConfig build() { connectionPoolConfig, aggregationPipelineMode, dataFreshness, - queryTimeout); + queryTimeout, + customParameters); case POSTGRES: return new PostgresConnectionConfig( @@ -104,7 +118,8 @@ public ConnectionConfig build() { database, credentials, applicationName, - connectionPoolConfig); + connectionPoolConfig, + customParameters); } throw new IllegalArgumentException("Unsupported database type: " + type); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java index 00ebde33..91607ed2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java @@ -11,9 +11,13 @@ import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder; import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder; import org.hypertrace.core.documentstore.model.options.DataFreshness; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Value public class TypesafeConfigDatastoreConfigExtractor { + private static final Logger LOGGER = + LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class); private static final String DEFAULT_HOST_KEY = "host"; private static final String DEFAULT_PORT_KEY = "port"; private static final String DEFAULT_ENDPOINTS_KEY = "endpoints"; @@ -29,6 +33,7 @@ public class TypesafeConfigDatastoreConfigExtractor { private static final String DEFAULT_AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode"; private static final String DEFAULT_DATA_FRESHNESS_KEY = "dataFreshness"; private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout"; + private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams"; @NonNull Config config; DatastoreConfigBuilder datastoreConfigBuilder; @@ -74,7 +79,8 @@ private TypesafeConfigDatastoreConfigExtractor( .poolConnectionSurrenderTimeoutKey(DEFAULT_CONNECTION_IDLE_TIME_KEY) .aggregationPipelineMode(DEFAULT_AGGREGATION_PIPELINE_MODE_KEY) .dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY) - .queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY); + .queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY) + .customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX); } public static TypesafeConfigDatastoreConfigExtractor from( @@ -169,6 +175,27 @@ public TypesafeConfigDatastoreConfigExtractor replicaSetKey(@NonNull final Strin return this; } + public TypesafeConfigDatastoreConfigExtractor customParametersKey(@NonNull final String key) { + if (config.hasPath(key)) { + try { + // Try to extract parameters as an object (Config) + Config paramConfig = config.getConfig(key); + paramConfig + .entrySet() + .forEach( + entry -> { + connectionConfigBuilder.customParameter( + entry.getKey(), paramConfig.getString(entry.getKey())); + }); + } catch (Exception e) { + // If not a Config object, log warning + LOGGER.warn("Custom parameters key '{}' exists but is not a config object", key); + } + } + + return this; + } + public TypesafeConfigDatastoreConfigExtractor poolMaxConnectionsKey(@NonNull final String key) { if (config.hasPath(key)) { connectionPoolConfigBuilder.maxConnections(config.getInt(key)); @@ -228,6 +255,7 @@ public DatastoreConfig extract() { connectionConfigBuilder .connectionPoolConfig(connectionPoolConfigBuilder.build()) .credentials(connectionCredentialsBuilder.build()) + .customParameters(connectionConfigBuilder.customParameters()) .build()) .build(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java index a5b36965..db878bf5 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java @@ -13,6 +13,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -53,14 +54,16 @@ public MongoConnectionConfig( @Nullable final ConnectionPoolConfig connectionPoolConfig, @NonNull final AggregatePipelineMode aggregationPipelineMode, @NonNull final DataFreshness dataFreshness, - @NonNull final Duration queryTimeout) { + @NonNull final Duration queryTimeout, + @NonNull final Map customParameters) { super( ensureAtLeastOneEndpoint(endpoints), getDatabaseOrDefault(database), getCredentialsOrDefault(credentials, database), aggregationPipelineMode, dataFreshness, - queryTimeout); + queryTimeout, + customParameters); this.applicationName = applicationName; this.replicaSetName = replicaSetName; this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java index a867260f..925e4b1e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import javax.annotation.Nullable; @@ -45,11 +46,13 @@ public PostgresConnectionConfig( @Nullable final String database, @Nullable final ConnectionCredentials credentials, @NonNull final String applicationName, - @Nullable final ConnectionPoolConfig connectionPoolConfig) { + @Nullable final ConnectionPoolConfig connectionPoolConfig, + @NonNull final Map customParameters) { super( ensureSingleEndpoint(endpoints), getDatabaseOrDefault(database), - getCredentialsOrDefault(credentials)); + getCredentialsOrDefault(credentials), + customParameters); this.applicationName = applicationName; this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java index 1f99831b..7ce94ac2 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java @@ -6,6 +6,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresDefaults; @@ -51,6 +52,10 @@ public Connection getPooledConnection() throws SQLException { return connectionPool.getConnection(); } + public Map getCustomParameters() { + return connectionConfig.customParameters(); + } + public void close() { if (connection != null) { try { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 72933808..5582889a 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -32,6 +32,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.math.BigInteger; +import java.sql.Array; import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -96,6 +97,7 @@ public class PostgresCollection implements Collection { private static final String CREATED_NOW_ALIAS = "created_now_alias"; private static final CloseableIterator EMPTY_ITERATOR = CloseableIterator.emptyIterator(); + private static final String FLAT_STRUCTURE_COLLECTION_KEY = "flatStructureCollection"; private final PostgresClient client; private final PostgresTableIdentifier tableIdentifier; @@ -492,7 +494,9 @@ public CloseableIterator find( @Override public CloseableIterator query( final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { - return queryExecutor.execute(client.getConnection(), query); + String flatStructureCollectionName = + client.getCustomParameters().get(FLAT_STRUCTURE_COLLECTION_KEY); + return queryExecutor.execute(client.getConnection(), query, flatStructureCollectionName); } @Override @@ -1254,6 +1258,130 @@ private String getSubDocUpdateQuery() { tableIdentifier, DOCUMENT, DOCUMENT, ID); } + static class PostgresResultIteratorWithBasicTypes extends PostgresResultIterator { + + public PostgresResultIteratorWithBasicTypes(ResultSet resultSet) { + super(resultSet); + } + + @Override + public Document next() { + try { + if (!cursorMovedForward) { + resultSet.next(); + } + // reset the cursorMovedForward state, if it was forwarded in hasNext. + cursorMovedForward = false; + return prepareDocument(); + } catch (IOException | SQLException e) { + System.out.println("prepare document failed!"); + closeResultSet(); + return JSONDocument.errorDocument(e.getMessage()); + } + } + + protected Document prepareDocument() throws SQLException, IOException { + ObjectNode jsonNode = MAPPER.createObjectNode(); + + // Get metadata to iterate through all columns + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + + for (int i = 1; i <= columnCount; i++) { + String columnName = metaData.getColumnName(i); + String columnType = metaData.getColumnTypeName(i); + + addColumnToJsonNode(jsonNode, columnName, columnType, i); + } + + // Remove document ID if needed + if (shouldRemoveDocumentId()) { + jsonNode.remove(DOCUMENT_ID); + } + + return new JSONDocument(MAPPER.writeValueAsString(jsonNode)); + } + + private void addColumnToJsonNode( + ObjectNode jsonNode, String columnName, String columnType, int columnIndex) + throws SQLException { + switch (columnType.toLowerCase()) { + case "bool": + case "boolean": + boolean boolValue = resultSet.getBoolean(columnIndex); + if (!resultSet.wasNull()) { + jsonNode.put(columnName, boolValue); + } + break; + + case "int4": + case "integer": + int intValue = resultSet.getInt(columnIndex); + if (!resultSet.wasNull()) { + jsonNode.put(columnName, intValue); + } + break; + + case "int8": + case "bigint": + long longValue = resultSet.getLong(columnIndex); + if (!resultSet.wasNull()) { + jsonNode.put(columnName, longValue); + } + break; + + case "float8": + case "double": + double doubleValue = resultSet.getDouble(columnIndex); + if (!resultSet.wasNull()) { + jsonNode.put(columnName, doubleValue); + } + break; + + case "text": + case "varchar": + String stringValue = resultSet.getString(columnIndex); + if (stringValue != null) { + jsonNode.put(columnName, stringValue); + } + break; + + case "_text": // text array + Array array = resultSet.getArray(columnIndex); + if (array != null) { + String[] stringArray = (String[]) array.getArray(); + ArrayNode arrayNode = MAPPER.createArrayNode(); + for (String item : stringArray) { + arrayNode.add(item); + } + jsonNode.set(columnName, arrayNode); + } + break; + + case "jsonb": + case "json": + String jsonString = resultSet.getString(columnIndex); + if (jsonString != null) { + try { + JsonNode jsonValue = MAPPER.readTree(jsonString); + jsonNode.set(columnName, jsonValue); + } catch (IOException e) { + // Fallback to string if JSON parsing fails + jsonNode.put(columnName, jsonString); + } + } + break; + + default: + Object objectValue = resultSet.getObject(columnIndex); + if (objectValue != null) { + jsonNode.put(columnName, objectValue.toString()); + } + break; + } + } + } + static class PostgresResultIterator implements CloseableIterator { protected final ObjectMapper MAPPER = new ObjectMapper(); @@ -1310,7 +1438,7 @@ protected Document prepareDocument() throws SQLException, IOException { String documentString = resultSet.getString(DOCUMENT); ObjectNode jsonNode = (ObjectNode) MAPPER.readTree(documentString); // internal iterators may need document id - if (removeDocumentId) { + if (shouldRemoveDocumentId()) { jsonNode.remove(DOCUMENT_ID); } // Add Timestamps to Document @@ -1336,6 +1464,10 @@ protected void closeResultSet() { public void close() { closeResultSet(); } + + protected boolean shouldRemoveDocumentId() { + return removeDocumentId; + } } static class PostgresResultIteratorWithMetaData extends PostgresResultIterator { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java index 76a0f11a..389a52b8 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java @@ -21,15 +21,24 @@ public class PostgresQueryExecutor { private final PostgresTableIdentifier tableIdentifier; public CloseableIterator execute(final Connection connection, final Query query) { + return execute(connection, query, null); + } + + public CloseableIterator execute( + final Connection connection, final Query query, String flatStructureCollectionName) { final org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser = new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser( - tableIdentifier, transformAndLog(query)); + tableIdentifier, transformAndLog(query), flatStructureCollectionName); final String sqlQuery = queryParser.parse(); try { final PreparedStatement preparedStatement = buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build(), connection); log.debug("Executing executeQueryV1 sqlQuery:{}", preparedStatement.toString()); final ResultSet resultSet = preparedStatement.executeQuery(); + + if ((tableIdentifier.getTableName().equals(flatStructureCollectionName))) { + return new PostgresCollection.PostgresResultIteratorWithBasicTypes(resultSet); + } return query.getSelections().size() > 0 ? new PostgresResultIteratorWithMetaData(resultSet) : new PostgresResultIterator(resultSet); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresTableIdentifier.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresTableIdentifier.java index 3e38db3a..17f97d1f 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresTableIdentifier.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresTableIdentifier.java @@ -14,6 +14,7 @@ public class PostgresTableIdentifier { @Nullable private final String schema; @Getter @Nonnull private final String quotedTable; + @Getter @Nonnull private final String tableName; public Optional getSchema() { return Optional.ofNullable(this.schema); @@ -26,6 +27,7 @@ public Optional getSchema() { PostgresTableIdentifier(@Nullable String schema, @Nonnull String tableName) { this.schema = schema; this.quotedTable = "\"" + tableName + "\""; + this.tableName = tableName; } public static PostgresTableIdentifier parse(String tableString) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java index 8070f46d..c9a7a55b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParser.java @@ -28,6 +28,7 @@ public class PostgresQueryParser { @Getter private final PostgresTableIdentifier tableIdentifier; @Getter private final Query query; + @Getter private final String flatStructureCollectionName; @Setter String finalTableName; @Getter private final Builder paramsBuilder = Params.newBuilder(); @@ -44,13 +45,19 @@ public class PostgresQueryParser { @Getter private final Map pgColumnNames = new HashMap<>(); @Getter private final FieldToPgColumnTransformer toPgColumnTransformer; - public PostgresQueryParser(PostgresTableIdentifier tableIdentifier, Query query) { + public PostgresQueryParser( + PostgresTableIdentifier tableIdentifier, Query query, String flatStructureCollectionName) { this.tableIdentifier = tableIdentifier; this.query = query; + this.flatStructureCollectionName = flatStructureCollectionName; this.finalTableName = tableIdentifier.toString(); toPgColumnTransformer = new FieldToPgColumnTransformer(this); } + public PostgresQueryParser(PostgresTableIdentifier tableIdentifier, Query query) { + this(tableIdentifier, query, null); + } + public String parse() { StringBuilder sqlBuilder = new StringBuilder(); int startIndexOfSelection = 0; diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParser.java index 190c69ea..16c5ebdd 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParser.java @@ -6,7 +6,8 @@ import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; -class PostgresContainsRelationalFilterParser implements PostgresRelationalFilterParser { +class PostgresContainsRelationalFilterParser + implements PostgresContainsRelationalFilterParserInterface { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParserInterface.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParserInterface.java new file mode 100644 index 00000000..5d0e5c62 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresContainsRelationalFilterParserInterface.java @@ -0,0 +1,12 @@ +package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter; + +/** + * Interface for handling CONTAINS operations in PostgreSQL queries. Implementations can provide + * different strategies for handling containment operations based on the context of the query (e.g., + * first-class fields vs. JSON fields). + */ +public interface PostgresContainsRelationalFilterParserInterface + extends PostgresRelationalFilterParser { + // Interface inherits the parse method from PostgresRelationalFilterParser + // No additional methods required at this time +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParser.java index a5310d1f..872d28bf 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParser.java @@ -5,7 +5,7 @@ import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; import org.hypertrace.core.documentstore.postgres.Params; -class PostgresInRelationalFilterParser implements PostgresRelationalFilterParser { +class PostgresInRelationalFilterParser implements PostgresInRelationalFilterParserInterface { @Override public String parse( diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParserInterface.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParserInterface.java new file mode 100644 index 00000000..f47fed75 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParserInterface.java @@ -0,0 +1,11 @@ +package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter; + +/** + * Interface for handling IN operation filters in PostgreSQL queries. Implementations can provide + * different strategies for handling IN operations based on the context of the query (e.g., + * first-class fields vs. JSON fields). + */ +public interface PostgresInRelationalFilterParserInterface extends PostgresRelationalFilterParser { + // Interface inherits the parse method from PostgresRelationalFilterParser + // No additional methods required at this time +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotContainsRelationalFilterParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotContainsRelationalFilterParser.java index ebbef01b..15eea744 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotContainsRelationalFilterParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotContainsRelationalFilterParser.java @@ -1,19 +1,32 @@ package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter; -import static org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresContainsRelationalFilterParser.prepareJsonValueForContainsOp; - import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresContainsRelationalFilterParserNonJsonField; class PostgresNotContainsRelationalFilterParser implements PostgresRelationalFilterParser { + private static final PostgresContainsRelationalFilterParser jsonContainsParser = + new PostgresContainsRelationalFilterParser(); + private static final PostgresContainsRelationalFilterParserNonJsonField nonJsonContainsParser = + new PostgresContainsRelationalFilterParserNonJsonField(); + @Override public String parse( final RelationalExpression expression, final PostgresRelationalFilterContext context) { final String parsedLhs = expression.getLhs().accept(context.lhsParser()); - final Object parsedRhs = expression.getRhs().accept(context.rhsParser()); - final Object convertedRhs = prepareJsonValueForContainsOp(parsedRhs); - context.getParamsBuilder().addObjectParam(convertedRhs); + String flatStructureCollection = context.getFlatStructureCollectionName(); + boolean isFirstClassField = + flatStructureCollection != null + && flatStructureCollection.equals(context.getTableIdentifier().getTableName()); - return String.format("%s IS NULL OR NOT %s @> ?::jsonb", parsedLhs, parsedLhs); + if (isFirstClassField) { + // Use the non-JSON logic for first-class fields + String containsExpression = nonJsonContainsParser.parse(expression, context); + return String.format("%s IS NULL OR NOT (%s)", parsedLhs, containsExpression); + } else { + // Use the JSON logic for document fields. + jsonContainsParser.parse(expression, context); // This adds the parameter. + return String.format("%s IS NULL OR NOT %s @> ?::jsonb", parsedLhs, parsedLhs); + } } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotInRelationalFilterParser.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotInRelationalFilterParser.java index b168c6a5..92b2759c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotInRelationalFilterParser.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresNotInRelationalFilterParser.java @@ -1,16 +1,32 @@ package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter; import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresInRelationalFilterParserNonJsonField; class PostgresNotInRelationalFilterParser implements PostgresRelationalFilterParser { - private static final PostgresInRelationalFilterParser inRelationalFilterParser = + private static final PostgresInRelationalFilterParserInterface jsonFieldInFilterParser = new PostgresInRelationalFilterParser(); + private static final PostgresInRelationalFilterParserInterface nonJsonFieldInFilterParser = + new PostgresInRelationalFilterParserNonJsonField(); @Override public String parse( final RelationalExpression expression, final PostgresRelationalFilterContext context) { final String parsedLhs = expression.getLhs().accept(context.lhsParser()); - final String parsedInExpression = inRelationalFilterParser.parse(expression, context); + + PostgresInRelationalFilterParserInterface inFilterParser = getInFilterParser(context); + + final String parsedInExpression = inFilterParser.parse(expression, context); return String.format("%s IS NULL OR NOT (%s)", parsedLhs, parsedInExpression); } + + private PostgresInRelationalFilterParserInterface getInFilterParser( + PostgresRelationalFilterContext context) { + String flatStructureCollection = context.getFlatStructureCollectionName(); + boolean isFirstClassField = + flatStructureCollection != null + && flatStructureCollection.equals(context.getTableIdentifier().getTableName()); + + return isFirstClassField ? nonJsonFieldInFilterParser : jsonFieldInFilterParser; + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactory.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactory.java index ca239b67..83b7caad 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactory.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactory.java @@ -1,7 +1,9 @@ package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter; import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; public interface PostgresRelationalFilterParserFactory { - PostgresRelationalFilterParser parser(final RelationalExpression expression); + PostgresRelationalFilterParser parser( + final RelationalExpression expression, final PostgresQueryParser postgresQueryParser); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactoryImpl.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactoryImpl.java index 3584e09c..8b4ef735 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactoryImpl.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresRelationalFilterParserFactoryImpl.java @@ -14,25 +14,55 @@ import java.util.Map; import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; import org.hypertrace.core.documentstore.expression.operators.RelationalOperator; +import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresContainsRelationalFilterParserNonJsonField; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresInRelationalFilterParserNonJsonField; public class PostgresRelationalFilterParserFactoryImpl implements PostgresRelationalFilterParserFactory { private static final Map parserMap = Maps.immutableEnumMap( Map.ofEntries( - entry(CONTAINS, new PostgresContainsRelationalFilterParser()), + // CONTAINS is conditionally chosen between JSON and non-JSON versions entry(NOT_CONTAINS, new PostgresNotContainsRelationalFilterParser()), entry(EXISTS, new PostgresExistsRelationalFilterParser()), entry(NOT_EXISTS, new PostgresNotExistsRelationalFilterParser()), - entry(IN, new PostgresInRelationalFilterParser()), + // IN are conditionally chosen between JSON and non-JSON versions entry(NOT_IN, new PostgresNotInRelationalFilterParser()), entry(LIKE, new PostgresLikeRelationalFilterParser()), entry(STARTS_WITH, new PostgresStartsWithRelationalFilterParser()))); + + // IN filter parsers + private static final PostgresInRelationalFilterParserInterface jsonFieldInFilterParser = + new PostgresInRelationalFilterParser(); + private static final PostgresInRelationalFilterParserInterface nonJsonFieldInFilterParser = + new PostgresInRelationalFilterParserNonJsonField(); + + // CONTAINS parser implementations + private static final PostgresContainsRelationalFilterParserInterface jsonFieldContainsParser = + new PostgresContainsRelationalFilterParser(); + private static final PostgresContainsRelationalFilterParserInterface nonJsonFieldContainsParser = + new PostgresContainsRelationalFilterParserNonJsonField(); + private static final PostgresStandardRelationalFilterParser postgresStandardRelationalFilterParser = new PostgresStandardRelationalFilterParser(); @Override - public PostgresRelationalFilterParser parser(final RelationalExpression expression) { + public PostgresRelationalFilterParser parser( + final RelationalExpression expression, final PostgresQueryParser postgresQueryParser) { + + String flatStructureCollection = postgresQueryParser.getFlatStructureCollectionName(); + boolean isFirstClassField = + flatStructureCollection != null + && flatStructureCollection.equals( + postgresQueryParser.getTableIdentifier().getTableName()); + + if (expression.getOperator() == CONTAINS) { + return isFirstClassField ? nonJsonFieldContainsParser : jsonFieldContainsParser; + } else if (expression.getOperator() == IN) { + return isFirstClassField ? nonJsonFieldInFilterParser : jsonFieldInFilterParser; + } + return parserMap.getOrDefault(expression.getOperator(), postgresStandardRelationalFilterParser); } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresContainsRelationalFilterParserNonJsonField.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresContainsRelationalFilterParserNonJsonField.java new file mode 100644 index 00000000..c330c7de --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresContainsRelationalFilterParserNonJsonField.java @@ -0,0 +1,40 @@ +package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field; + +import java.util.Collection; +import java.util.Collections; +import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresContainsRelationalFilterParserInterface; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresRelationalFilterParser; + +/** + * Implementation of CONTAINS operator for non-JSON fields (regular PostgreSQL arrays). Uses the + * PostgreSQL array containment operator (@>) for checking if one array contains another. + * + *

This class is optimized for first-class array columns rather than JSON document fields. + */ +public class PostgresContainsRelationalFilterParserNonJsonField + implements PostgresContainsRelationalFilterParserInterface { + + @Override + public String parse( + final RelationalExpression expression, + final PostgresRelationalFilterParser.PostgresRelationalFilterContext context) { + final String parsedLhs = expression.getLhs().accept(context.lhsParser()); + final Object parsedRhs = expression.getRhs().accept(context.rhsParser()); + + Object normalizedRhs = normalizeValue(parsedRhs); + context.getParamsBuilder().addObjectParam(normalizedRhs); + + return String.format("%s @> ARRAY[?]::text[]", parsedLhs); + } + + private Object normalizeValue(final Object value) { + if (value == null) { + return null; + } else if (value instanceof Collection) { + return value; + } else { + return Collections.singletonList(value); + } + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresInRelationalFilterParserNonJsonField.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresInRelationalFilterParserNonJsonField.java new file mode 100644 index 00000000..fd5a5f4d --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresInRelationalFilterParserNonJsonField.java @@ -0,0 +1,44 @@ +package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field; + +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.hypertrace.core.documentstore.expression.impl.RelationalExpression; +import org.hypertrace.core.documentstore.postgres.Params; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresInRelationalFilterParserInterface; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresRelationalFilterParser; + +/** + * Implementation of PostgresInRelationalFilterParserInterface for handling IN operations on + * first-class fields (non-JSON columns), using the standard IN clause syntax. + */ +public class PostgresInRelationalFilterParserNonJsonField + implements PostgresInRelationalFilterParserInterface { + + @Override + public String parse( + final RelationalExpression expression, + final PostgresRelationalFilterParser.PostgresRelationalFilterContext context) { + final String parsedLhs = expression.getLhs().accept(context.lhsParser()); + final Iterable parsedRhs = expression.getRhs().accept(context.rhsParser()); + + return prepareFilterStringForInOperator(parsedLhs, parsedRhs, context.getParamsBuilder()); + } + + private String prepareFilterStringForInOperator( + final String parsedLhs, + final Iterable parsedRhs, + final Params.Builder paramsBuilder) { + + String placeholders = + StreamSupport.stream(parsedRhs.spliterator(), false) + .map( + value -> { + paramsBuilder.addObjectParam(value); + return "?"; + }) + .collect(Collectors.joining(", ")); + + // return String.format("%s IN (%s)", parsedLhs, placeholders); + return String.format("ARRAY[%s]::text[] && ARRAY[%s]::text[]", parsedLhs, placeholders); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/transformer/FieldToPgColumnTransformer.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/transformer/FieldToPgColumnTransformer.java index 2cb4c351..7e5769ef 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/transformer/FieldToPgColumnTransformer.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/transformer/FieldToPgColumnTransformer.java @@ -16,6 +16,13 @@ public FieldToPgColumnTransformer(PostgresQueryParser postgresQueryParser) { } public FieldToPgColumn transform(String orgFieldName) { + // TODO: Forcing to map to the first class fields + String flatStructureCollection = postgresQueryParser.getFlatStructureCollectionName(); + if (flatStructureCollection != null + && flatStructureCollection.equals( + postgresQueryParser.getTableIdentifier().getTableName())) { + return new FieldToPgColumn(null, PostgresUtils.wrapFieldNamesWithDoubleQuotes(orgFieldName)); + } Optional parentField = postgresQueryParser.getPgColumnNames().keySet().stream() .filter(orgFieldName::startsWith) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/vistors/PostgresFilterTypeExpressionVisitor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/vistors/PostgresFilterTypeExpressionVisitor.java index 9e017c34..7875119e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/vistors/PostgresFilterTypeExpressionVisitor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/vistors/PostgresFilterTypeExpressionVisitor.java @@ -79,7 +79,7 @@ public String visit(final RelationalExpression expression) { .build(); return new PostgresRelationalFilterParserFactoryImpl() - .parser(expression) + .parser(expression, postgresQueryParser) .parse(expression, context); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/utils/PostgresUtils.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/utils/PostgresUtils.java index 261c4b3f..24b8ffc6 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/utils/PostgresUtils.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/utils/PostgresUtils.java @@ -547,6 +547,10 @@ public static String wrapAliasWithDoubleQuotes(String fieldName) { return "\"" + fieldName + "\""; } + public static String wrapFieldNamesWithDoubleQuotes(String fieldName) { + return "\"" + fieldName + "\""; + } + public static String formatSubDocPath(String subDocPath) { return "{" + subDocPath.replaceAll(DOC_PATH_SEPARATOR, ",") + "}"; } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index 222dc731..1d95702a 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -321,6 +321,32 @@ void testBuildPostgresUsingDefaultKeys() { assertEquals(expected, config); } + @Test + void testBuildPostgresUsingDefaultKeysAndCustomParams() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithDefaultKeysAndCustomParamsForPostgres(), TYPE_KEY) + .extract() + .connectionConfig(); + final ConnectionConfig expected = + ConnectionConfig.builder() + .type(DatabaseType.POSTGRES) + .addEndpoint(Endpoint.builder().host(host).port(port).build()) + .database(database) + .credentials(ConnectionCredentials.builder().username(user).password(password).build()) + .applicationName(appName) + .connectionPoolConfig( + ConnectionPoolConfig.builder() + .maxConnections(maxConnections) + .connectionAccessTimeout(accessTimeout) + .connectionSurrenderTimeout(surrenderTimeout) + .build()) + .customParameter("flatStructureCollection", "earth") + .build(); + + assertEquals(expected, config); + } + private Config buildConfigMap() { return ConfigFactory.parseMap( Map.ofEntries( @@ -355,6 +381,22 @@ private Config buildConfigMapWithDefaultKeysForPostgres() { entry("connectionIdleTime", surrenderTimeout))); } + private Config buildConfigMapWithDefaultKeysAndCustomParamsForPostgres() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + entry("customParams.flatStructureCollection", "earth"))); + } + private Config buildConfigMapUsingDefaultKeysForMongo() { return ConfigFactory.parseMap( Map.ofEntries( diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParserTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParserTest.java index 3451f211..816d9211 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParserTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/query/v1/PostgresQueryParserTest.java @@ -638,6 +638,7 @@ void testFindWithSortingAndPagination() { .setPagination(pagination) .build(); + // json field PostgresQueryParser postgresQueryParser = new PostgresQueryParser(TEST_TABLE, PostgresQueryTransformer.transform(query)); String sql = postgresQueryParser.parse(); @@ -661,6 +662,21 @@ void testFindWithSortingAndPagination() { assertEquals("Bottle", params.getObjectParams().get(7)); assertEquals(1, params.getObjectParams().get(9)); assertEquals(3, params.getObjectParams().get(10)); + + // non-json field + postgresQueryParser = + new PostgresQueryParser( + TEST_TABLE, PostgresQueryTransformer.transform(query), "testCollection"); + assertEquals( + "SELECT \"item\" AS \"item\", " + + "\"price\" AS \"price\", " + + "\"quantity\" AS \"quantity\", " + + "\"date\" AS \"date\" " + + "FROM \"testCollection\" " + + "WHERE ARRAY[\"item\"]::text[] && ARRAY[?, ?, ?, ?]::text[] " + + "ORDER BY \"quantity\" DESC NULLS LAST,\"item\" ASC NULLS FIRST " + + "OFFSET ? LIMIT ?", + postgresQueryParser.parse()); } @Test