|
57 | 57 | import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; |
58 | 58 | import org.apache.beam.sdk.schemas.logicaltypes.FixedString; |
59 | 59 | import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; |
| 60 | +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; |
| 61 | +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; |
60 | 62 | import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; |
61 | 63 | import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; |
62 | 64 | import org.apache.beam.sdk.schemas.logicaltypes.VariableString; |
@@ -416,6 +418,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu |
416 | 418 | String identifier = logicalType.getIdentifier(); |
417 | 419 | if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { |
418 | 420 | return Instant.ofEpochMilli(((Number) value).longValue()); |
| 421 | + } else if (MicrosInstant.IDENTIFIER.equals(identifier) |
| 422 | + || NanosInstant.IDENTIFIER.equals(identifier)) { |
| 423 | + // Portable instant logical types: treat calcite numeric as epoch milliseconds. |
| 424 | + return Instant.ofEpochMilli(((Number) value).longValue()); |
419 | 425 | } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { |
420 | 426 | if (value instanceof Date) { |
421 | 427 | value = SqlFunctions.toInt((Date) value); |
@@ -574,6 +580,17 @@ private static Expression getBeamField( |
574 | 580 | value = Expressions.call(expression, "getBytes", fieldName); |
575 | 581 | } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { |
576 | 582 | value = Expressions.call(expression, "getDateTime", fieldName); |
| 583 | + } else if (MicrosInstant.IDENTIFIER.equals(identifier) |
| 584 | + || NanosInstant.IDENTIFIER.equals(identifier)) { |
| 585 | + // Instant-like logical types: retrieve as Instant |
| 586 | + value = |
| 587 | + Expressions.convert_( |
| 588 | + Expressions.call( |
| 589 | + expression, |
| 590 | + "getLogicalTypeValue", |
| 591 | + fieldName, |
| 592 | + Expressions.constant(java.time.Instant.class)), |
| 593 | + java.time.Instant.class); |
577 | 594 | } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { |
578 | 595 | value = |
579 | 596 | Expressions.convert_( |
@@ -601,6 +618,59 @@ private static Expression getBeamField( |
601 | 618 | fieldName, |
602 | 619 | Expressions.constant(LocalDateTime.class)), |
603 | 620 | LocalDateTime.class); |
| 621 | + } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { |
| 622 | + // For pass-through logical types, read the underlying base type using the |
| 623 | + // corresponding Row getter. |
| 624 | + FieldType baseType = fieldType.getLogicalType().getBaseType(); |
| 625 | + switch (baseType.getTypeName()) { |
| 626 | + case BYTE: |
| 627 | + value = Expressions.call(expression, "getByte", fieldName); |
| 628 | + break; |
| 629 | + case INT16: |
| 630 | + value = Expressions.call(expression, "getInt16", fieldName); |
| 631 | + break; |
| 632 | + case INT32: |
| 633 | + value = Expressions.call(expression, "getInt32", fieldName); |
| 634 | + break; |
| 635 | + case INT64: |
| 636 | + value = Expressions.call(expression, "getInt64", fieldName); |
| 637 | + break; |
| 638 | + case DECIMAL: |
| 639 | + value = Expressions.call(expression, "getDecimal", fieldName); |
| 640 | + break; |
| 641 | + case FLOAT: |
| 642 | + value = Expressions.call(expression, "getFloat", fieldName); |
| 643 | + break; |
| 644 | + case DOUBLE: |
| 645 | + value = Expressions.call(expression, "getDouble", fieldName); |
| 646 | + break; |
| 647 | + case STRING: |
| 648 | + value = Expressions.call(expression, "getString", fieldName); |
| 649 | + break; |
| 650 | + case DATETIME: |
| 651 | + value = Expressions.call(expression, "getDateTime", fieldName); |
| 652 | + break; |
| 653 | + case BOOLEAN: |
| 654 | + value = Expressions.call(expression, "getBoolean", fieldName); |
| 655 | + break; |
| 656 | + case BYTES: |
| 657 | + value = Expressions.call(expression, "getBytes", fieldName); |
| 658 | + break; |
| 659 | + case ARRAY: |
| 660 | + value = Expressions.call(expression, "getArray", fieldName); |
| 661 | + break; |
| 662 | + case MAP: |
| 663 | + value = Expressions.call(expression, "getMap", fieldName); |
| 664 | + break; |
| 665 | + case ROW: |
| 666 | + value = Expressions.call(expression, "getRow", fieldName); |
| 667 | + break; |
| 668 | + case ITERABLE: |
| 669 | + value = Expressions.call(expression, "getIterable", fieldName); |
| 670 | + break; |
| 671 | + default: |
| 672 | + throw new UnsupportedOperationException("Unable to get logical type " + identifier); |
| 673 | + } |
604 | 674 | } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { |
605 | 675 | value = Expressions.call(expression, "getDecimal", fieldName); |
606 | 676 | } else { |
@@ -661,6 +731,11 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) |
661 | 731 | } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) { |
662 | 732 | return nullOr( |
663 | 733 | value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis")); |
| 734 | + } else if (MicrosInstant.IDENTIFIER.equals(identifier) |
| 735 | + || NanosInstant.IDENTIFIER.equals(identifier)) { |
| 736 | + // Convert java.time.Instant to epoch milliseconds for Calcite |
| 737 | + return nullOr( |
| 738 | + value, Expressions.call(Expressions.convert_(value, java.time.Instant.class), "toEpochMilli")); |
664 | 739 | } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { |
665 | 740 | return nullOr( |
666 | 741 | value, |
@@ -690,8 +765,44 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) |
690 | 765 | Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)), |
691 | 766 | Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND))); |
692 | 767 | return nullOr(value, returnValue); |
693 | | - } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { |
694 | | - return Expressions.convert_(value, BigDecimal.class); |
| 768 | + } else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) { |
| 769 | + // For pass-through logical types, convert underlying base type to Calcite value |
| 770 | + FieldType baseType = fieldType.getLogicalType().getBaseType(); |
| 771 | + switch (baseType.getTypeName()) { |
| 772 | + case BYTE: |
| 773 | + return Expressions.convert_(value, Byte.class); |
| 774 | + case INT16: |
| 775 | + return Expressions.convert_(value, Short.class); |
| 776 | + case INT32: |
| 777 | + return Expressions.convert_(value, Integer.class); |
| 778 | + case INT64: |
| 779 | + return Expressions.convert_(value, Long.class); |
| 780 | + case DECIMAL: |
| 781 | + return Expressions.convert_(value, BigDecimal.class); |
| 782 | + case FLOAT: |
| 783 | + return Expressions.convert_(value, Float.class); |
| 784 | + case DOUBLE: |
| 785 | + return Expressions.convert_(value, Double.class); |
| 786 | + case STRING: |
| 787 | + return Expressions.convert_(value, String.class); |
| 788 | + case BOOLEAN: |
| 789 | + return Expressions.convert_(value, Boolean.class); |
| 790 | + case DATETIME: |
| 791 | + return nullOr( |
| 792 | + value, |
| 793 | + Expressions.call(Expressions.convert_(value, AbstractInstant.class), "getMillis")); |
| 794 | + case BYTES: |
| 795 | + return nullOr( |
| 796 | + value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class))); |
| 797 | + case ARRAY: |
| 798 | + return nullOr(value, toCalciteList(value, baseType.getCollectionElementType())); |
| 799 | + case MAP: |
| 800 | + return nullOr(value, toCalciteMap(value, baseType.getMapValueType())); |
| 801 | + case ROW: |
| 802 | + return nullOr(value, toCalciteRow(value, baseType.getRowSchema())); |
| 803 | + default: |
| 804 | + throw new UnsupportedOperationException("Unable to convert logical type " + identifier); |
| 805 | + } |
695 | 806 | } else { |
696 | 807 | throw new UnsupportedOperationException("Unable to convert logical type " + identifier); |
697 | 808 | } |
|
0 commit comments