Skip to content

Conversation

@yuxiqian
Copy link
Member

This closes FLINK-38887.

Currently, there's no way for transform rules to handle nested types (ARRAY, MAP, ROW, and VARIANT introduced by FLINK-38877) as most internal converters refuse to handle these complex types.

This PR splits built-in function implementations into modules for maintainability, and tweaks transform operator to handle nested types.

@yuxiqian
Copy link
Member Author

Kindly ping @lvyanquan to take a look when you have time.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Enables Flink CDC transform rules to work with nested/complex types (ARRAY, MAP, ROW, VARIANT) by refactoring type conversions and splitting built-in functions into smaller modules used by the Janino compilation path.

Changes:

  • Refactors type conversion utilities (Calcite ↔ CDC types, internal ↔ Java objects) to support nested types and VARIANT.
  • Splits previous built-in function utilities into module-based static imports (Arithmetic/Casting/Comparison/Logical/String/Temporal).
  • Updates runtime/unit/integration tests and spec expectations for nested/variant data and updated casting/timezone behaviors.

Reviewed changes

Copilot reviewed 41 out of 41 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java Updates expected Janino translations for TO_DATE and DECIMAL casting.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java Renames test method for built-in functions.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java Adjusts timezone setup and casting error expectations.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java Removes legacy converter (split into more focused converters).
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/CalciteDataTypeConverter.java New Calcite↔CDC type converter used by transform parser (includes nested types).
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataExtractor.java Improves human-readable extraction for nested arrays/maps.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryInternalObjectConverter.java Adds binary encoding path for nested ROW values.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java Switches to new CalciteDataTypeConverter.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java Uses new CalciteDataTypeConverter for type inference.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java Loads module-based built-in functions; updates casting targets and type mapping.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java Uses JavaClassConverter for Janino parameter type inference.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java Uses JavaClassConverter for Janino parameter/return types.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java Updates row type generation using schema’s RowDataType.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java Switches conversion pipeline to JavaObjectConverter + BinaryInternalObjectConverter.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java Updates row type generation using schema’s RowDataType.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java New temporal built-in functions module (timezone handling updated).
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StringFunctions.java New string built-in functions module (incl. VARIANT JSON parsing).
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/LogicalFunctions.java New logical built-in functions module.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java New comparison built-in functions module.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/CastingFunctions.java New casting built-in functions module (BigDecimal + timestamp parsing).
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ArithmeticFunctions.java New arithmetic built-in functions module.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java Removes monolithic built-in utility class (replaced by modules).
flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala Updates example UDF return type to Java LocalDateTime.
flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala Updates example UDF return type to Java Instant.
flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala Updates example UDF return type to BigDecimal.
flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala Updates example UDF return type to BigDecimal.
flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java Updates example UDF return type to Java LocalDateTime.
flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java Updates example UDF return type to Java Instant.
flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java Updates example UDF return type to BigDecimal.
flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java Updates example UDF return type to BigDecimal.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java Updates helper signature to pass data types for nested rendering.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java Adds type-aware rendering via BinaryRecordDataExtractor (nested/bytes friendly).
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkFunction.java Passes schema data types into ValuesDataSinkHelper.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java Passes schema data types into ValuesDataSinkHelper.
flink-cdc-composer/src/test/resources/specs/nested.yaml Enables nested-spec coverage; updates expected output incl. VARIANT.
flink-cdc-composer/src/test/resources/specs/meta.yaml Updates expected outputs for schema metadata and VARIANT printing.
flink-cdc-composer/src/test/resources/specs/basic.yaml Updates expected outputs for wildcard/nested columns incl. VARIANT.
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java Extends test input schema/data with VARIANT values.
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java Makes UDF output assertion order-insensitive.
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java Updates expected error message to include module import expression.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java Refactors datetime utilities to Java time types and updates formatting/diff/add APIs.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant