|
54 | 54 | import org.apache.beam.sdk.schemas.Schema; |
55 | 55 | import org.apache.beam.sdk.schemas.Schema.LogicalType; |
56 | 56 | import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; |
| 57 | +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; |
57 | 58 | import org.apache.beam.sdk.schemas.logicaltypes.FixedString; |
58 | 59 | import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; |
59 | 60 | import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; |
@@ -600,6 +601,8 @@ private static Expression getBeamField( |
600 | 601 | fieldName, |
601 | 602 | Expressions.constant(LocalDateTime.class)), |
602 | 603 | LocalDateTime.class); |
| 604 | + } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { |
| 605 | + value = Expressions.call(expression, "getDecimal", fieldName); |
603 | 606 | } else { |
604 | 607 | throw new UnsupportedOperationException("Unable to get logical type " + identifier); |
605 | 608 | } |
@@ -687,6 +690,8 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType) |
687 | 690 | Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)), |
688 | 691 | Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND))); |
689 | 692 | return nullOr(value, returnValue); |
| 693 | + } else if (FixedPrecisionNumeric.IDENTIFIER.equals(identifier)) { |
| 694 | + return Expressions.convert_(value, BigDecimal.class); |
690 | 695 | } else { |
691 | 696 | throw new UnsupportedOperationException("Unable to convert logical type " + identifier); |
692 | 697 | } |
|
0 commit comments