From d70b782695f1d037cac340da5bb5cff8cde4f796 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 10 Jan 2026 16:37:29 +0530 Subject: [PATCH 1/6] rebase --- .../extensions/sql/impl/rel/BeamCalcRel.java | 115 +++++++++++++++++- 1 file changed, 113 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 044e75574391..7181ab439efa 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -57,6 +57,8 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; @@ -416,6 +418,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu String identifier = logicalType.getIdentifier(); if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { return Instant.ofEpochMilli(((Number) value).longValue()); + } else if (MicrosInstant.IDENTIFIER.equals(identifier) + || NanosInstant.IDENTIFIER.equals(identifier)) { + // Portable instant logical types: treat calcite numeric as epoch milliseconds. + return Instant.ofEpochMilli(((Number) value).longValue()); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { if (value instanceof Date) { value = SqlFunctions.toInt((Date) value); @@ -574,6 +580,17 @@ private static Expression getBeamField( value = Expressions.call(expression, "getBytes", fieldName); } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { value = Expressions.call(expression, "getDateTime", fieldName); + } else if (MicrosInstant.IDENTIFIER.equals(identifier) + || NanosInstant.IDENTIFIER.equals(identifier)) { + // Instant-like logical types: retrieve as Instant + value = + Expressions.convert_( + Expressions.call( + expression, + "getLogicalTypeValue", + fieldName, + Expressions.constant(java.time.Instant.class)), + java.time.Instant.class); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { value = Expressions.convert_( @@ -601,6 +618,59 @@ private static Expression getBeamField( fieldName, Expressions.constant(LocalDateTime.class)), LocalDateTime.class); + } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { + // For pass-through logical types, read the underlying base type using the + // corresponding Row getter. + FieldType baseType = fieldType.getLogicalType().getBaseType(); + switch (baseType.getTypeName()) { + case BYTE: + value = Expressions.call(expression, "getByte", fieldName); + break; + case INT16: + value = Expressions.call(expression, "getInt16", fieldName); + break; + case INT32: + value = Expressions.call(expression, "getInt32", fieldName); + break; + case INT64: + value = Expressions.call(expression, "getInt64", fieldName); + break; + case DECIMAL: + value = Expressions.call(expression, "getDecimal", fieldName); + break; + case FLOAT: + value = Expressions.call(expression, "getFloat", fieldName); + break; + case DOUBLE: + value = Expressions.call(expression, "getDouble", fieldName); + break; + case STRING: + value = Expressions.call(expression, "getString", fieldName); + break; + case DATETIME: + value = Expressions.call(expression, "getDateTime", fieldName); + break; + case BOOLEAN: + value = Expressions.call(expression, "getBoolean", fieldName); + break; + case BYTES: + value = Expressions.call(expression, "getBytes", fieldName); + break; + case ARRAY: + value = Expressions.call(expression, "getArray", fieldName); + break; + case MAP: + value = Expressions.call(expression, "getMap", fieldName); + break; + case ROW: + value = Expressions.call(expression, "getRow", fieldName); + break; + case ITERABLE: + value = Expressions.call(expression, "getIterable", fieldName); + break; + default: + throw new UnsupportedOperationException("Unable to get logical type " + identifier); + } } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { value = Expressions.call(expression, "getDecimal", fieldName); } else { @@ -661,6 +731,11 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { return nullOr( value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis")); + } else if (MicrosInstant.IDENTIFIER.equals(identifier) + || NanosInstant.IDENTIFIER.equals(identifier)) { + // Convert java.time.Instant to epoch milliseconds for Calcite + return nullOr( + value, Expressions.call(Expressions.convert_(value, java.time.Instant.class), "toEpochMilli")); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { return nullOr( value, @@ -690,8 +765,44 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)), Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND))); return nullOr(value, returnValue); - } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { - return Expressions.convert_(value, BigDecimal.class); + } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { + // For pass-through logical types, convert underlying base type to Calcite value + FieldType baseType = fieldType.getLogicalType().getBaseType(); + switch (baseType.getTypeName()) { + case BYTE: + return Expressions.convert_(value, Byte.class); + case INT16: + return Expressions.convert_(value, Short.class); + case INT32: + return Expressions.convert_(value, Integer.class); + case INT64: + return Expressions.convert_(value, Long.class); + case DECIMAL: + return Expressions.convert_(value, BigDecimal.class); + case FLOAT: + return Expressions.convert_(value, Float.class); + case DOUBLE: + return Expressions.convert_(value, Double.class); + case STRING: + return Expressions.convert_(value, String.class); + case BOOLEAN: + return Expressions.convert_(value, Boolean.class); + case DATETIME: + return nullOr( + value, + Expressions.call(Expressions.convert_(value, AbstractInstant.class), "getMillis")); + case BYTES: + return nullOr( + value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); + case ARRAY: + return nullOr(value, toCalciteList(value, baseType.getCollectionElementType())); + case MAP: + return nullOr(value, toCalciteMap(value, baseType.getMapValueType())); + case ROW: + return nullOr(value, toCalciteRow(value, baseType.getRowSchema())); + default: + throw new UnsupportedOperationException("Unable to convert logical type " + identifier); + } } else { throw new UnsupportedOperationException("Unable to convert logical type " + identifier); } From 6d572e4ac9faafe0e5a0382ff409e9149ff051db Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 10 Jan 2026 16:51:59 +0530 Subject: [PATCH 2/6] reverting minor change --- .../apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 7181ab439efa..751b276d87cc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -43,6 +43,8 @@ import java.util.TimeZone; import java.util.TreeSet; import java.util.stream.Collectors; + +import org.apache.beam.model.pipeline.v1.SchemaApi.FieldType; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.JavaUdfLoader; @@ -765,6 +767,8 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)), Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND))); return nullOr(value, returnValue); + } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { + return Expressions.convert_(value, BigDecimal.class); } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { // For pass-through logical types, convert underlying base type to Calcite value FieldType baseType = fieldType.getLogicalType().getBaseType(); From 179a8f87b28a2a137467bb86f80faa731abce90b Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 10 Jan 2026 17:24:15 +0530 Subject: [PATCH 3/6] spotless --- .../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 751b276d87cc..023c1ce63339 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -43,7 +43,6 @@ import java.util.TimeZone; import java.util.TreeSet; import java.util.stream.Collectors; - import org.apache.beam.model.pipeline.v1.SchemaApi.FieldType; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; @@ -58,9 +57,9 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; -import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; @@ -737,7 +736,9 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) || NanosInstant.IDENTIFIER.equals(identifier)) { // Convert java.time.Instant to epoch milliseconds for Calcite return nullOr( - value, Expressions.call(Expressions.convert_(value, java.time.Instant.class), "toEpochMilli")); + value, + Expressions.call( + Expressions.convert_(value, java.time.Instant.class), "toEpochMilli")); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { return nullOr( value, @@ -794,10 +795,12 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) case DATETIME: return nullOr( value, - Expressions.call(Expressions.convert_(value, AbstractInstant.class), "getMillis")); + Expressions.call( + Expressions.convert_(value, AbstractInstant.class), "getMillis")); case BYTES: return nullOr( - value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); + value, + Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); case ARRAY: return nullOr(value, toCalciteList(value, baseType.getCollectionElementType())); case MAP: @@ -805,7 +808,8 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) case ROW: return nullOr(value, toCalciteRow(value, baseType.getRowSchema())); default: - throw new UnsupportedOperationException("Unable to convert logical type " + identifier); + throw new UnsupportedOperationException( + "Unable to convert logical type " + identifier); } } else { throw new UnsupportedOperationException("Unable to convert logical type " + identifier); From a19b39b2bf2f3cb4132ffb3b3fff62f4d5d7bd11 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 10 Jan 2026 17:36:40 +0530 Subject: [PATCH 4/6] remove redundant import --- .../org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 023c1ce63339..07c388968606 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -43,7 +43,6 @@ import java.util.TimeZone; import java.util.TreeSet; import java.util.stream.Collectors; -import org.apache.beam.model.pipeline.v1.SchemaApi.FieldType; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.JavaUdfLoader; From 288216d5415b7b8a3985cba20e1659b1934b4076 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 10 Jan 2026 18:58:41 +0530 Subject: [PATCH 5/6] adressing gemini comments --- .../extensions/sql/impl/rel/BeamCalcRel.java | 98 +------------------ 1 file changed, 2 insertions(+), 96 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 07c388968606..2159428c6830 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -421,7 +421,7 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu } else if (MicrosInstant.IDENTIFIER.equals(identifier) || NanosInstant.IDENTIFIER.equals(identifier)) { // Portable instant logical types: treat calcite numeric as epoch milliseconds. - return Instant.ofEpochMilli(((Number) value).longValue()); + return java.time.Instant.ofEpochMilli(((Number) value).longValue()); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { if (value instanceof Date) { value = SqlFunctions.toInt((Date) value); @@ -618,60 +618,7 @@ private static Expression getBeamField( fieldName, Expressions.constant(LocalDateTime.class)), LocalDateTime.class); - } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { - // For pass-through logical types, read the underlying base type using the - // corresponding Row getter. - FieldType baseType = fieldType.getLogicalType().getBaseType(); - switch (baseType.getTypeName()) { - case BYTE: - value = Expressions.call(expression, "getByte", fieldName); - break; - case INT16: - value = Expressions.call(expression, "getInt16", fieldName); - break; - case INT32: - value = Expressions.call(expression, "getInt32", fieldName); - break; - case INT64: - value = Expressions.call(expression, "getInt64", fieldName); - break; - case DECIMAL: - value = Expressions.call(expression, "getDecimal", fieldName); - break; - case FLOAT: - value = Expressions.call(expression, "getFloat", fieldName); - break; - case DOUBLE: - value = Expressions.call(expression, "getDouble", fieldName); - break; - case STRING: - value = Expressions.call(expression, "getString", fieldName); - break; - case DATETIME: - value = Expressions.call(expression, "getDateTime", fieldName); - break; - case BOOLEAN: - value = Expressions.call(expression, "getBoolean", fieldName); - break; - case BYTES: - value = Expressions.call(expression, "getBytes", fieldName); - break; - case ARRAY: - value = Expressions.call(expression, "getArray", fieldName); - break; - case MAP: - value = Expressions.call(expression, "getMap", fieldName); - break; - case ROW: - value = Expressions.call(expression, "getRow", fieldName); - break; - case ITERABLE: - value = Expressions.call(expression, "getIterable", fieldName); - break; - default: - throw new UnsupportedOperationException("Unable to get logical type " + identifier); - } - } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { + } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { value = Expressions.call(expression, "getDecimal", fieldName); } else { throw new UnsupportedOperationException("Unable to get logical type " + identifier); @@ -769,47 +716,6 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) return nullOr(value, returnValue); } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { return Expressions.convert_(value, BigDecimal.class); - } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { - // For pass-through logical types, convert underlying base type to Calcite value - FieldType baseType = fieldType.getLogicalType().getBaseType(); - switch (baseType.getTypeName()) { - case BYTE: - return Expressions.convert_(value, Byte.class); - case INT16: - return Expressions.convert_(value, Short.class); - case INT32: - return Expressions.convert_(value, Integer.class); - case INT64: - return Expressions.convert_(value, Long.class); - case DECIMAL: - return Expressions.convert_(value, BigDecimal.class); - case FLOAT: - return Expressions.convert_(value, Float.class); - case DOUBLE: - return Expressions.convert_(value, Double.class); - case STRING: - return Expressions.convert_(value, String.class); - case BOOLEAN: - return Expressions.convert_(value, Boolean.class); - case DATETIME: - return nullOr( - value, - Expressions.call( - Expressions.convert_(value, AbstractInstant.class), "getMillis")); - case BYTES: - return nullOr( - value, - Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); - case ARRAY: - return nullOr(value, toCalciteList(value, baseType.getCollectionElementType())); - case MAP: - return nullOr(value, toCalciteMap(value, baseType.getMapValueType())); - case ROW: - return nullOr(value, toCalciteRow(value, baseType.getRowSchema())); - default: - throw new UnsupportedOperationException( - "Unable to convert logical type " + identifier); - } } else { throw new UnsupportedOperationException("Unable to convert logical type " + identifier); } From 6b4b41550b83894c43470b1c4511b2bf39b58b22 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 10 Jan 2026 19:03:07 +0530 Subject: [PATCH 6/6] spotless --- .../apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 2159428c6830..6c8adc006df9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -618,7 +618,7 @@ private static Expression getBeamField( fieldName, Expressions.constant(LocalDateTime.class)), LocalDateTime.class); - } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { + } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { value = Expressions.call(expression, "getDecimal", fieldName); } else { throw new UnsupportedOperationException("Unable to get logical type " + identifier);