diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 044e75574391..6c8adc006df9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -56,6 +56,8 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; @@ -416,6 +418,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu String identifier = logicalType.getIdentifier(); if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { return Instant.ofEpochMilli(((Number) value).longValue()); + } else if (MicrosInstant.IDENTIFIER.equals(identifier) + || NanosInstant.IDENTIFIER.equals(identifier)) { + // Portable instant logical types: treat calcite numeric as epoch milliseconds. + return java.time.Instant.ofEpochMilli(((Number) value).longValue()); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { if (value instanceof Date) { value = SqlFunctions.toInt((Date) value); @@ -574,6 +580,17 @@ private static Expression getBeamField( value = Expressions.call(expression, "getBytes", fieldName); } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { value = Expressions.call(expression, "getDateTime", fieldName); + } else if (MicrosInstant.IDENTIFIER.equals(identifier) + || NanosInstant.IDENTIFIER.equals(identifier)) { + // Instant-like logical types: retrieve as Instant + value = + Expressions.convert_( + Expressions.call( + expression, + "getLogicalTypeValue", + fieldName, + Expressions.constant(java.time.Instant.class)), + java.time.Instant.class); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { value = Expressions.convert_( @@ -661,6 +678,13 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { return nullOr( value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis")); + } else if (MicrosInstant.IDENTIFIER.equals(identifier) + || NanosInstant.IDENTIFIER.equals(identifier)) { + // Convert java.time.Instant to epoch milliseconds for Calcite + return nullOr( + value, + Expressions.call( + Expressions.convert_(value, java.time.Instant.class), "toEpochMilli")); } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { return nullOr( value,