[FLINK-38879][pipeline-connector][paimon] Add support for creating and writing table with variant type. #4228
Conversation
Hi @yunfengzhou-hub, maybe you can help review this, as you are more familiar with the Paimon sink.
Pull request overview
This pull request adds support for creating and writing tables with the VARIANT data type in the Paimon connector for Flink CDC.
Changes:
- Introduced a new `TypeUtils` utility class to handle type conversions between the CDC and Paimon type systems, with special handling for VARIANT types
- Added a field getter implementation for the VARIANT type in `PaimonWriterHelper` to convert between CDC's `BinaryVariant` and Paimon's `GenericVariant`
- Refactored existing type conversion code to use the new `TypeUtils` class for consistency and to support the VARIANT type
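To illustrate the field-getter change described above, here is a minimal, self-contained sketch of the pattern: a per-position getter that null-checks a field and then converts the value into the sink's variant representation. All names below (`CdcRecord`, `FieldGetter`, `toSinkVariant`) are hypothetical stand-ins for illustration only, not the actual Flink CDC `BinaryVariant` or Paimon `GenericVariant` APIs.

```java
// Hypothetical sketch only: the interfaces below are illustrative stand-ins,
// not the real Flink CDC or Paimon classes.
public class VariantFieldGetterSketch {

    /** Stand-in for a CDC row whose fields are accessed by position. */
    interface CdcRecord {
        Object getField(int pos);
        boolean isNullAt(int pos);
    }

    /** Stand-in for the sink-side per-field accessor contract. */
    interface FieldGetter {
        Object getFieldOrNull(CdcRecord record);
    }

    /**
     * Illustrative conversion: wrap the raw value to mimic a representation
     * change (the real code converts a CDC variant into the sink's variant class).
     */
    static Object toSinkVariant(Object cdcValue) {
        return "variant(" + cdcValue + ")";
    }

    /** Build a getter for one column: null-check first, then convert. */
    static FieldGetter createVariantGetter(int pos) {
        return record -> record.isNullAt(pos) ? null : toSinkVariant(record.getField(pos));
    }

    public static void main(String[] args) {
        CdcRecord row = new CdcRecord() {
            public Object getField(int pos) { return "{\"k\":1}"; }
            public boolean isNullAt(int pos) { return false; }
        };
        // prints variant({"k":1})
        System.out.println(createVariantGetter(0).getFieldOrNull(row));
    }
}
```

The null-check-then-convert shape mirrors how per-type field getters are typically composed in row-based sinks, so adding a new type usually means adding one more converter branch.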
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| TypeUtils.java | New utility class providing bidirectional type conversion between CDC and Paimon types with special handling for VARIANT |
| PaimonWriterHelper.java | Added VARIANT field getter and updated schema deduction to use TypeUtils for type conversions |
| SchemaChangeProvider.java | Refactored to use TypeUtils for consistent type conversion, eliminating duplicated conversion logic |
| PaimonMetadataApplier.java | Refactored to use TypeUtils for type conversion when applying schema changes |
| PaimonWriterHelperTest.java | Added comprehensive tests for VARIANT type handling in data conversion and schema deduction |
| PaimonMetadataApplierTest.java | Added tests for creating tables and adding columns with VARIANT type |
| PaimonHashFunctionTest.java | Added VARIANT type to hash function tests to ensure proper handling in partitioning logic |
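The `TypeUtils` class above is described as a bidirectional type converter. As a rough, hypothetical sketch of that idea (type names are plain strings here; the real class operates on CDC and Paimon type objects, and the mappings shown are assumptions for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a bidirectional type-name mapping in the spirit of
// the TypeUtils class described above; not the actual implementation.
public class TypeMappingSketch {
    private static final Map<String, String> CDC_TO_PAIMON = new HashMap<>();
    private static final Map<String, String> PAIMON_TO_CDC = new HashMap<>();

    static {
        register("STRING", "VARCHAR");        // illustrative mapping
        register("VARIANT", "VARIANT");       // VARIANT maps one-to-one
    }

    private static void register(String cdc, String paimon) {
        CDC_TO_PAIMON.put(cdc, paimon);
        PAIMON_TO_CDC.put(paimon, cdc);
    }

    static String toPaimonType(String cdcType) {
        String t = CDC_TO_PAIMON.get(cdcType);
        if (t == null) throw new IllegalArgumentException("Unsupported CDC type: " + cdcType);
        return t;
    }

    static String toCdcType(String paimonType) {
        String t = PAIMON_TO_CDC.get(paimonType);
        if (t == null) throw new IllegalArgumentException("Unsupported Paimon type: " + paimonType);
        return t;
    }

    public static void main(String[] args) {
        System.out.println(toPaimonType("VARIANT"));              // prints VARIANT
        System.out.println(toCdcType(toPaimonType("STRING")));    // round-trips to STRING
    }
}
```

Centralizing both directions in one place is what lets `PaimonWriterHelper`, `SchemaChangeProvider`, and `PaimonMetadataApplier` share a single conversion path instead of duplicating switch logic.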
yunfengzhou-hub left a comment:
Thanks for the PR. Left some comments as below.
Comments on:
- ...nector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java (resolved)
- ...-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java (outdated; resolved)
yunfengzhou-hub left a comment:
+1
> Add support for creating and writing table with variant type.
Why not add end-to-end (e2e) tests?
Because the Flink version we're currently using is 1.20, and Flink SQL in this version does not yet support querying the VARIANT type. Therefore, we cannot use the catalog in Flink SQL to query and verify the results.
Support for this will only be available after we upgrade to Flink 2.2.