@@ -1137,6 +1137,104 @@ void testTransformWithTimestamps(String timezone) throws Exception {
11371137 "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null, null], op=INSERT, meta=()}" );
11381138 }
11391139
1140+ /** This tests if transform variant functions works as expected. */
1141+ @ Test
1142+ void testTransformWithVariantFunctions () throws Exception {
1143+ FlinkPipelineComposer composer = FlinkPipelineComposer .ofMiniCluster ();
1144+
1145+ // Setup value source
1146+ Configuration sourceConfig = new Configuration ();
1147+ sourceConfig .set (
1148+ ValuesDataSourceOptions .EVENT_SET_ID ,
1149+ ValuesDataSourceHelper .EventSetId .CUSTOM_SOURCE_EVENTS );
1150+
1151+ TableId myTable = TableId .tableId ("default_namespace" , "default_schema" , "mytable1" );
1152+ Schema tableSchema =
1153+ Schema .newBuilder ()
1154+ .physicalColumn ("id" , DataTypes .INT ().notNull ())
1155+ .physicalColumn ("message" , DataTypes .STRING ())
1156+ .physicalColumn ("duplicatedMessage" , DataTypes .STRING ())
1157+ .physicalColumn ("invalidMessage" , DataTypes .STRING ())
1158+ .primaryKey ("id" )
1159+ .build ();
1160+
1161+ BinaryRecordDataGenerator generator =
1162+ new BinaryRecordDataGenerator (
1163+ tableSchema .getColumnDataTypes ().toArray (new DataType [0 ]));
1164+ List <Event > events =
1165+ Arrays .asList (
1166+ new CreateTableEvent (myTable , tableSchema ),
1167+ DataChangeEvent .insertEvent (
1168+ myTable ,
1169+ generator .generate (
1170+ new Object [] {
1171+ 1 ,
1172+ BinaryStringData .fromString (
1173+ "{\" name\" :\" Bob\" ,\" age\" :30,\" is_active\" :true,\" email\" :\" zhangsan@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" address\" :{\" street\" :\" MainSt\" ,\" city\" :\" Beijing\" ,\" zip\" :\" 100000\" }}" ),
1174+ BinaryStringData .fromString (
1175+ "{\" name\" :\" Bob\" ,\" name\" :\" Bob\" ,\" age\" :30,\" is_active\" :true,\" email\" :\" zhangsan@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" address\" :{\" street\" :\" MainSt\" ,\" city\" :\" Beijing\" ,\" zip\" :\" 100000\" }}" ),
1176+ BinaryStringData .fromString ("invalidJson" ),
1177+ })),
1178+ DataChangeEvent .insertEvent (
1179+ myTable ,
1180+ generator .generate (
1181+ new Object [] {
1182+ 2 ,
1183+ BinaryStringData .fromString (
1184+ "{\" name\" :\" Mark\" ,\" age\" :40,\" is_active\" :true,\" email\" :\" lisi@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" address\" :{\" street\" :\" MainSt\" ,\" city\" :\" Beijing\" ,\" zip\" :\" 100000\" }}" ),
1185+ BinaryStringData .fromString (
1186+ "{\" name\" :\" Mark\" ,\" name\" :\" Mark\" ,\" age\" :40,\" is_active\" :true,\" email\" :\" lisi@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" address\" :{\" street\" :\" MainSt\" ,\" city\" :\" Beijing\" ,\" zip\" :\" 100000\" }}" ),
1187+ BinaryStringData .fromString ("invalidJson" ),
1188+ })),
1189+ DataChangeEvent .insertEvent (
1190+ myTable , generator .generate (new Object [] {3 , null , null , null })));
1191+
1192+ ValuesDataSourceHelper .setSourceEvents (Collections .singletonList (events ));
1193+
1194+ SourceDef sourceDef =
1195+ new SourceDef (ValuesDataFactory .IDENTIFIER , "Value Source" , sourceConfig );
1196+
1197+ // Setup value sink
1198+ Configuration sinkConfig = new Configuration ();
1199+ sinkConfig .set (ValuesDataSinkOptions .MATERIALIZED_IN_MEMORY , true );
1200+ SinkDef sinkDef = new SinkDef (ValuesDataFactory .IDENTIFIER , "Value Sink" , sinkConfig );
1201+
1202+ // Setup pipeline
1203+ Configuration pipelineConfig = new Configuration ();
1204+ pipelineConfig .set (PipelineOptions .PIPELINE_PARALLELISM , 1 );
1205+ PipelineDef pipelineDef =
1206+ new PipelineDef (
1207+ sourceDef ,
1208+ sinkDef ,
1209+ Collections .emptyList (),
1210+ Collections .singletonList (
1211+ new TransformDef (
1212+ "default_namespace.default_schema.\\ .*" ,
1213+ "id, parse_json(message) as variantVal, parse_json(duplicatedMessage, true) as duplicatedVariantVal, try_parse_json(message) as variantVal2, try_parse_json(duplicatedMessage, true) as duplicatedVariantVal2, try_parse_json(invalidMessage) as invalidVariantVal" ,
1214+ null ,
1215+ null ,
1216+ null ,
1217+ null ,
1218+ null ,
1219+ null )),
1220+ Collections .emptyList (),
1221+ pipelineConfig );
1222+
1223+ // Execute the pipeline
1224+ PipelineExecution execution = composer .compose (pipelineDef );
1225+ execution .execute ();
1226+
1227+ // Check the order and content of all received events
1228+ String [] outputEvents = outCaptor .toString ().trim ().split ("\n " );
1229+
1230+ Assertions .assertThat (outputEvents )
1231+ .containsExactly (
1232+ "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`variantVal` VARIANT,`duplicatedVariantVal` VARIANT,`variantVal2` VARIANT,`duplicatedVariantVal2` VARIANT,`invalidVariantVal` VARIANT}, primaryKeys=id, options=()}" ,
1233+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :30,\" email\" :\" zhangsan@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Bob\" }, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :30,\" email\" :\" zhangsan@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Bob\" }, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :30,\" email\" :\" zhangsan@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Bob\" }, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :30,\" email\" :\" zhangsan@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Bob\" }, null], op=INSERT, meta=()}" ,
1234+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :40,\" email\" :\" lisi@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Mark\" }, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :40,\" email\" :\" lisi@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Mark\" }, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :40,\" email\" :\" lisi@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Mark\" }, {\" address\" :{\" city\" :\" Beijing\" ,\" street\" :\" MainSt\" ,\" zip\" :\" 100000\" },\" age\" :40,\" email\" :\" lisi@example.com\" ,\" hobbies\" :[\" reading\" ,\" coding\" ,\" traveling\" ],\" is_active\" :true,\" name\" :\" Mark\" }, null], op=INSERT, meta=()}" ,
1235+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null], op=INSERT, meta=()}" );
1236+ }
1237+
11401238 @ ParameterizedTest
11411239 @ EnumSource
11421240 void testTransformMergingIncompatibleRules (ValuesDataSink .SinkApi apiVersion ) {
0 commit comments