1818
1919package org .apache .paimon .flink .action .cdc .format .aliyun ;
2020
21+ import org .apache .paimon .data .BinaryString ;
2122import org .apache .paimon .flink .action .cdc .CdcSourceRecord ;
23+ import org .apache .paimon .flink .action .cdc .ComputedColumn ;
24+ import org .apache .paimon .flink .action .cdc .ComputedColumnUtils ;
2225import org .apache .paimon .flink .action .cdc .TypeMapping ;
2326import org .apache .paimon .flink .action .cdc .kafka .KafkaActionITCaseBase ;
2427import org .apache .paimon .flink .action .cdc .watermark .MessageQueueCdcTimestampExtractor ;
2528import org .apache .paimon .flink .sink .cdc .CdcRecord ;
2629import org .apache .paimon .flink .sink .cdc .RichCdcMultiplexRecord ;
2730import org .apache .paimon .schema .Schema ;
2831import org .apache .paimon .types .RowKind ;
32+ import org .apache .paimon .utils .BinaryStringUtils ;
2933
3034import org .apache .paimon .shade .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3135import org .apache .paimon .shade .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
4347import java .util .Arrays ;
4448import java .util .Collections ;
4549import java .util .List ;
50+ import java .util .Map ;
4651
4752/** Test for AliyunJsonRecordParser. */
4853public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
@@ -51,14 +56,20 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
5156 private static List <String > insertList = new ArrayList <>();
5257 private static List <String > updateList = new ArrayList <>();
5358 private static List <String > deleteList = new ArrayList <>();
59+ private static List <ComputedColumn > computedColumns = new ArrayList <>();
5460
5561 private static ObjectMapper objMapper = new ObjectMapper ();
5662
63+ String dateTimeRegex = "\\ d{4}-\\ d{2}-\\ d{2}T\\ d{2}:\\ d{2}:\\ d{2}\\ .\\ d{3}" ;
64+
5765 @ Before
5866 public void setup () {
5967 String insertRes = "kafka/aliyun/table/event/event-insert.txt" ;
6068 String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt" ;
6169 String deleteRes = "kafka/aliyun/table/event/event-delete.txt" ;
70+
71+ String [] computedColumnArgs = {"etl_create_time=now()" , "etl_update_time=now()" };
72+
6273 URL url ;
6374 try {
6475 url = AliyunJsonRecordParserTest .class .getClassLoader ().getResource (insertRes );
@@ -76,17 +87,22 @@ public void setup() {
7687 .filter (this ::isRecordLine )
7788 .forEach (e -> deleteList .add (e ));
7889
90+ computedColumns =
91+ ComputedColumnUtils .buildComputedColumns (
92+ Arrays .asList (computedColumnArgs ), Collections .emptyList ());
93+
7994 } catch (Exception e ) {
8095 log .error ("Fail to init aliyun-json cases" , e );
8196 }
8297 }
8398
8499 @ Test
85100 public void extractInsertRecord () throws Exception {
101+
86102 AliyunRecordParser parser =
87- new AliyunRecordParser (TypeMapping .defaultMapping (), Collections . emptyList () );
103+ new AliyunRecordParser (TypeMapping .defaultMapping (), computedColumns );
88104 for (String json : insertList ) {
89- // 将json解析为JsonNode对象
105+
90106 JsonNode rootNode = objMapper .readValue (json , JsonNode .class );
91107 CdcSourceRecord cdcRecord = new CdcSourceRecord (rootNode );
92108 Schema schema = parser .buildSchema (cdcRecord );
@@ -106,15 +122,31 @@ public void extractInsertRecord() throws Exception {
106122
107123 MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor ();
108124 Assert .assertTrue (extractor .extractTimestamp (cdcRecord ) > 0 );
125+
126+ Map <String , String > data = records .get (0 ).toRichCdcRecord ().toCdcRecord ().data ();
127+ String createTime = data .get ("etl_create_time" );
128+ String updateTime = data .get ("etl_update_time" );
129+
130+ // Mock the real timestamp string which retrieved from store and convert through paimon
131+ // Timestamp
132+ createTime =
133+ BinaryStringUtils .toTimestamp (BinaryString .fromString (createTime ), 6 )
134+ .toString ();
135+ updateTime =
136+ BinaryStringUtils .toTimestamp (BinaryString .fromString (updateTime ), 6 )
137+ .toString ();
138+
139+ Assert .assertTrue (createTime .matches (dateTimeRegex ));
140+ Assert .assertTrue (updateTime .matches (dateTimeRegex ));
109141 }
110142 }
111143
112144 @ Test
113145 public void extractUpdateRecord () throws Exception {
114146 AliyunRecordParser parser =
115- new AliyunRecordParser (TypeMapping .defaultMapping (), Collections . emptyList () );
147+ new AliyunRecordParser (TypeMapping .defaultMapping (), computedColumns );
116148 for (String json : updateList ) {
117- // 将json解析为JsonNode对象
149+
118150 JsonNode jsonNode = objMapper .readValue (json , JsonNode .class );
119151 CdcSourceRecord cdcRecord = new CdcSourceRecord (jsonNode );
120152 Schema schema = parser .buildSchema (cdcRecord );
@@ -134,15 +166,26 @@ public void extractUpdateRecord() throws Exception {
134166
135167 MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor ();
136168 Assert .assertTrue (extractor .extractTimestamp (cdcRecord ) > 0 );
169+
170+ Map <String , String > data = records .get (0 ).toRichCdcRecord ().toCdcRecord ().data ();
171+ String createTime = data .get ("etl_create_time" );
172+ String updateTime = data .get ("etl_update_time" );
173+ Assert .assertNotNull (createTime );
174+
175+ updateTime =
176+ BinaryStringUtils .toTimestamp (BinaryString .fromString (updateTime ), 6 )
177+ .toString ();
178+
179+ Assert .assertTrue (updateTime .matches (dateTimeRegex ));
137180 }
138181 }
139182
140183 @ Test
141184 public void extractDeleteRecord () throws Exception {
142185 AliyunRecordParser parser =
143- new AliyunRecordParser (TypeMapping .defaultMapping (), Collections . emptyList () );
186+ new AliyunRecordParser (TypeMapping .defaultMapping (), computedColumns );
144187 for (String json : deleteList ) {
145- // 将json解析为JsonNode对象
188+
146189 JsonNode jsonNode = objMapper .readValue (json , JsonNode .class );
147190 CdcSourceRecord cdcRecord = new CdcSourceRecord (jsonNode );
148191 Schema schema = parser .buildSchema (cdcRecord );
@@ -162,6 +205,17 @@ public void extractDeleteRecord() throws Exception {
162205
163206 MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor ();
164207 Assert .assertTrue (extractor .extractTimestamp (cdcRecord ) > 0 );
208+
209+ Map <String , String > data = records .get (0 ).toRichCdcRecord ().toCdcRecord ().data ();
210+ String createTime = data .get ("etl_create_time" );
211+ String updateTime = data .get ("etl_update_time" );
212+ Assert .assertNotNull (createTime );
213+
214+ updateTime =
215+ BinaryStringUtils .toTimestamp (BinaryString .fromString (updateTime ), 6 )
216+ .toString ();
217+
218+ Assert .assertTrue (updateTime .matches (dateTimeRegex ));
165219 }
166220 }
167221}
0 commit comments