Skip to content

Commit 683b702

Browse files
authored
SubDocUpdate Parsers for FlatPostgresCollection (#279)
1 parent cfc9c08 commit 683b702

18 files changed

+2232
-1009
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 1039 additions & 595 deletions
Large diffs are not rendered by default.

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/MongoFlatPgConsistencyTest.java

Lines changed: 749 additions & 0 deletions
Large diffs are not rendered by default.

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java

Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package org.hypertrace.core.documentstore.postgres;
22

3+
import static java.util.Map.entry;
34
import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.AFTER_UPDATE;
45
import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.BEFORE_UPDATE;
56
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.ADD;
7+
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.ADD_TO_LIST_IF_ABSENT;
8+
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.APPEND_TO_LIST;
9+
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.REMOVE_ALL_FROM_LIST;
610
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.SET;
11+
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.UNSET;
712

813
import com.fasterxml.jackson.databind.JsonNode;
914
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -47,10 +52,14 @@
4752
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
4853
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer;
4954
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.LegacyFilterToQueryFilterTransformer;
50-
import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext;
51-
import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocAddOperatorParser;
52-
import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocSetOperatorParser;
53-
import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocUpdateOperatorParser;
55+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAddToListIfAbsentParser;
56+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAddValueParser;
57+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAppendToListParser;
58+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresRemoveAllFromListParser;
59+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresSetValueParser;
60+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresUnsetPathParser;
61+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresUpdateOperationParser;
62+
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresUpdateOperationParser.UpdateParserInput;
5463
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;
5564
import org.hypertrace.core.documentstore.query.Query;
5665
import org.postgresql.util.PSQLException;
@@ -74,11 +83,14 @@ public class FlatPostgresCollection extends PostgresCollection {
7483
private static final String MISSING_COLUMN_STRATEGY_CONFIG = "missingColumnStrategy";
7584
private static final String DEFAULT_PRIMARY_KEY_COLUMN = "key";
7685

77-
private static final Map<UpdateOperator, FlatCollectionSubDocUpdateOperatorParser>
78-
SUB_DOC_UPDATE_PARSERS =
79-
Map.of(
80-
SET, new FlatCollectionSubDocSetOperatorParser(),
81-
ADD, new FlatCollectionSubDocAddOperatorParser());
86+
private static final Map<UpdateOperator, PostgresUpdateOperationParser> UPDATE_PARSER_MAP =
87+
Map.ofEntries(
88+
entry(SET, new PostgresSetValueParser()),
89+
entry(UNSET, new PostgresUnsetPathParser()),
90+
entry(ADD, new PostgresAddValueParser()),
91+
entry(REMOVE_ALL_FROM_LIST, new PostgresRemoveAllFromListParser()),
92+
entry(ADD_TO_LIST_IF_ABSENT, new PostgresAddToListIfAbsentParser()),
93+
entry(APPEND_TO_LIST, new PostgresAppendToListParser()));
8294

8395
private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry;
8496

@@ -624,7 +636,7 @@ private Map<String, String> resolvePathsToColumns(
624636
UpdateOperator operator = update.getOperator();
625637

626638
Preconditions.checkArgument(
627-
SUB_DOC_UPDATE_PARSERS.containsKey(operator), "Unsupported UPDATE operator: " + operator);
639+
UPDATE_PARSER_MAP.containsKey(operator), "Unsupported UPDATE operator: " + operator);
628640

629641
String path = update.getSubDocument().getPath();
630642
Optional<String> columnName = resolveColumnName(path, tableName);
@@ -744,20 +756,40 @@ private void executeUpdate(
744756
PostgresColumnMetadata colMeta =
745757
schemaRegistry.getColumnOrRefresh(tableName, columnName).orElseThrow();
746758

747-
FlatUpdateContext context =
748-
FlatUpdateContext.builder()
749-
.columnName(columnName)
750-
// get the nested path. So for example, if colName is `customAttr` and full path is
751-
// `customAttr.props`, then the nested path is `props`.
752-
.nestedPath(getNestedPath(path, columnName))
753-
.columnType(colMeta.getPostgresType())
754-
.value(update.getSubDocumentValue())
755-
.params(params)
756-
.build();
757-
758-
FlatCollectionSubDocUpdateOperatorParser operatorParser =
759-
SUB_DOC_UPDATE_PARSERS.get(update.getOperator());
760-
String fragment = operatorParser.parse(context);
759+
String[] nestedPath = getNestedPath(path, columnName);
760+
boolean isTopLevel = nestedPath.length == 0;
761+
UpdateOperator operator = update.getOperator();
762+
763+
Params.Builder paramsBuilder = Params.newBuilder();
764+
PostgresUpdateOperationParser unifiedParser = UPDATE_PARSER_MAP.get(operator);
765+
766+
String fragment;
767+
768+
if (isTopLevel) {
769+
UpdateParserInput input =
770+
UpdateParserInput.builder()
771+
.baseField(columnName)
772+
.path(new String[0])
773+
.update(update)
774+
.paramsBuilder(paramsBuilder)
775+
.columnType(colMeta.getPostgresType())
776+
.build();
777+
fragment = unifiedParser.parseNonJsonbField(input);
778+
} else {
779+
// parseInternal() returns just the value expression
780+
UpdateParserInput jsonbInput =
781+
UpdateParserInput.builder()
782+
.baseField(String.format("\"%s\"", columnName))
783+
.path(nestedPath)
784+
.update(update)
785+
.paramsBuilder(paramsBuilder)
786+
.columnType(colMeta.getPostgresType())
787+
.build();
788+
String valueExpr = unifiedParser.parseInternal(jsonbInput);
789+
fragment = String.format("\"%s\" = %s", columnName, valueExpr);
790+
}
791+
// Transfer params from builder to our list
792+
params.addAll(paramsBuilder.build().getObjectParams().values());
761793
setFragments.add(fragment);
762794
}
763795

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/PostgresInRelationalFilterParserJsonArray.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.hypertrace.core.documentstore.expression.impl.JsonIdentifierExpression;
77
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
88
import org.hypertrace.core.documentstore.postgres.Params;
9+
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
910

1011
/**
1112
* Optimized parser for IN operations on JSON array fields with type-specific casting.
@@ -119,7 +120,7 @@ private String prepareFilterStringForArrayInOperator(
119120
final Params.Builder paramsBuilder) {
120121

121122
// Determine the appropriate type cast for jsonb_build_array elements
122-
String typeCast = getTypeCastForArray(fieldType);
123+
String typeCast = PostgresDataType.getJsonArrayElementTypeCast(fieldType);
123124

124125
// For JSON arrays, we use the @> containment operator
125126
// Check if ANY of the RHS values is contained in the LHS array
@@ -137,26 +138,4 @@ private String prepareFilterStringForArrayInOperator(
137138
? String.format("(%s)", orConditions)
138139
: orConditions;
139140
}
140-
141-
/**
142-
* Returns the PostgreSQL type cast string for jsonb_build_array elements based on array type.
143-
*
144-
* @param fieldType The JSON field type (must not be null)
145-
* @return Type cast string (e.g., "::text", "::numeric")
146-
*/
147-
private String getTypeCastForArray(JsonFieldType fieldType) {
148-
switch (fieldType) {
149-
case STRING_ARRAY:
150-
return "::text";
151-
case NUMBER_ARRAY:
152-
return "::numeric";
153-
case BOOLEAN_ARRAY:
154-
return "::boolean";
155-
case OBJECT_ARRAY:
156-
return "::jsonb";
157-
default:
158-
throw new IllegalArgumentException(
159-
"Unsupported array type: " + fieldType + ". Expected *_ARRAY types.");
160-
}
161-
}
162141
}

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/parser/filter/nonjson/field/PostgresDataType.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field;
22

33
import org.hypertrace.core.documentstore.expression.impl.DataType;
4+
import org.hypertrace.core.documentstore.expression.impl.JsonFieldType;
45

56
/**
67
* PostgreSQL-specific data types with their SQL type strings.
@@ -40,6 +41,31 @@ public String getArraySqlType() {
4041
return sqlType + "[]";
4142
}
4243

44+
public String getTypeCast() {
45+
return sqlType == null ? "" : "::" + sqlType;
46+
}
47+
48+
public String getArrayTypeCast() {
49+
return sqlType == null ? "" : "::" + sqlType + "[]";
50+
}
51+
52+
public static PostgresDataType fromJavaValue(Object value) {
53+
if (value instanceof String) {
54+
return TEXT;
55+
} else if (value instanceof Integer) {
56+
return INTEGER;
57+
} else if (value instanceof Long) {
58+
return BIGINT;
59+
} else if (value instanceof Float) {
60+
return REAL;
61+
} else if (value instanceof Double) {
62+
return DOUBLE_PRECISION;
63+
} else if (value instanceof Boolean) {
64+
return BOOLEAN;
65+
}
66+
return UNKNOWN;
67+
}
68+
4369
/**
4470
* Maps a generic DataType to its PostgreSQL equivalent.
4571
*
@@ -70,4 +96,27 @@ public static PostgresDataType fromDataType(DataType dataType) {
7096
throw new IllegalArgumentException("Unknown DataType: " + dataType);
7197
}
7298
}
99+
100+
/**
101+
* Returns the PostgreSQL type cast string for JSONB array element types.
102+
*
103+
* @param fieldType the JSON field type (must be an array type)
104+
* @return Type cast string (e.g., "::text", "::numeric", "::boolean", "::jsonb")
105+
* @throws IllegalArgumentException if fieldType is not a supported array type
106+
*/
107+
public static String getJsonArrayElementTypeCast(JsonFieldType fieldType) {
108+
switch (fieldType) {
109+
case STRING_ARRAY:
110+
return "::text";
111+
case NUMBER_ARRAY:
112+
return "::numeric";
113+
case BOOLEAN_ARRAY:
114+
return "::boolean";
115+
case OBJECT_ARRAY:
116+
return "::jsonb";
117+
default:
118+
throw new IllegalArgumentException(
119+
"Unsupported array type: " + fieldType + ". Expected *_ARRAY types.");
120+
}
121+
}
73122
}

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/query/v1/vistors/PostgresFilterTypeExpressionVisitor.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.hypertrace.core.documentstore.postgres.query.v1.parser.builder.PostgresSelectExpressionParserBuilderImpl;
3131
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresRelationalFilterParser.PostgresRelationalFilterContext;
3232
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.PostgresRelationalFilterParserFactoryImpl;
33+
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
3334

3435
public class PostgresFilterTypeExpressionVisitor implements FilterTypeExpressionVisitor {
3536

@@ -260,26 +261,20 @@ private String inferArrayTypeCastFromFilter(FilterTypeExpression filter) {
260261
if (filter instanceof RelationalExpression) {
261262
RelationalExpression relExpr = (RelationalExpression) filter;
262263

263-
// The visitor returns a string representation, but we need the actual value
264264
// Try to get the constant value directly if it's a ConstantExpression
265265
if (relExpr.getRhs() instanceof ConstantExpression) {
266266
ConstantExpression constExpr = (ConstantExpression) relExpr.getRhs();
267267
Object value = constExpr.getValue();
268268

269-
if (value instanceof String) {
270-
return "::text[]";
271-
} else if (value instanceof Integer || value instanceof Long) {
272-
return "::bigint[]";
273-
} else if (value instanceof Double || value instanceof Float) {
274-
return "::double precision[]";
275-
} else if (value instanceof Boolean) {
276-
return "::boolean[]";
269+
PostgresDataType pgType = PostgresDataType.fromJavaValue(value);
270+
if (pgType != PostgresDataType.UNKNOWN) {
271+
return pgType.getArrayTypeCast();
277272
}
278273
}
279274
}
280275

281276
// Default to text[] if we can't infer the type
282-
return "::text[]";
277+
return PostgresDataType.TEXT.getArrayTypeCast();
283278
}
284279

285280
private String getFilterStringForAnyOperator(final DocumentArrayFilterExpression expression) {

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/update/FlatUpdateContext.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)