 import com.dtstack.chunjun.util.MapUtil;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.commons.collections.CollectionUtils;
 
 import static com.dtstack.chunjun.connector.kafka.option.KafkaOptions.DEFAULT_CODEC;
 
+/**
+ * @author chuixue
+ * @create 2021-06-07 15:51
+ * @description
+ */
 public class KafkaSyncConverter
         extends AbstractRowConverter<ConsumerRecord<byte[], byte[]>, Object, byte[], String> {
 
     /** source kafka msg decode */
-    private final IDecode decode;
+    protected final IDecode decode;
+    /** sink json Decoder */
+    protected final JsonDecoder jsonDecoder;
     /** kafka Conf */
-    private final KafkaConfig kafkaConfig;
+    protected final KafkaConfig kafkaConfig;
     /** kafka sink out fields */
-    private List<String> outList;
+    protected List<String> outList;
 
-    public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig, List<String> keyTypeList) {
-        super(rowType, kafkaConfig);
+    public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
+        super(null, kafkaConfig);
         this.kafkaConfig = kafkaConfig;
         this.outList = keyTypeList;
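+        // the sink side always gets a JsonDecoder, independent of the codec chosen for decode below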
+        this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
         if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
             this.decode = new JsonDecoder(kafkaConfig.isAddMessage());
         } else {
             this.decode = new TextDecoder();
         }
     }
 
-    public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig) {
-        super(rowType, kafkaConfig);
+    public KafkaSyncConverter(KafkaConfig kafkaConfig) {
+        super(null, kafkaConfig);
         this.commonConfig = this.kafkaConfig = kafkaConfig;
+        this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
         if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
             this.decode = new JsonDecoder(kafkaConfig.isAddMessage());
         } else {
@@ -162,18 +170,23 @@ public RowData toInternal(ConsumerRecord<byte[], byte[]> input) throws Exception
                 result = new ColumnRowData(fieldConfList.size());
             }
             for (int i = 0; i < fieldConfList.size(); i++) {
-                FieldConfig fieldConfig = fieldConfList.get(i);
-                Object value = map.get(fieldConfig.getName());
+                FieldConfig fieldConf = fieldConfList.get(i);
+                Object value = map.get(fieldConf.getName());
                 AbstractBaseColumn baseColumn =
                         (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value);
-                result.addField(assembleFieldProps(fieldConfig, baseColumn));
+                result.addField(assembleFieldProps(fieldConf, baseColumn));
             }
         }
         return result;
     }
 
     @Override
     public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
+        Map<String, Object> map = getExternalMap(rowData);
+        return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
+    }
+
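+    /** Extracts a field-name -> value map from the row; toExternal serializes it as JSON. */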
+    protected Map<String, Object> getExternalMap(RowData rowData) {
         Map<String, Object> map;
         int arity = rowData.getArity();
         ColumnRowData row = (ColumnRowData) rowData;
@@ -187,22 +200,62 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
                 Object value;
                 if (object instanceof TimestampColumn) {
                     value = ((TimestampColumn) object).asTimestampStr();
+                } else if (row.getField(i).getData() == null) {
+                    value = null;
                 } else {
-                    value = org.apache.flink.util.StringUtils.arrayAwareToString(row.getField(i));
+                    value = row.getField(i).asString();
                 }
                 map.put(kafkaConfig.getTableFields().get(i), value);
             }
+
+            // get partition key value
+            if (!CollectionUtil.isNullOrEmpty(outList)) {
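+                // capacity = arity * 4 / 3, so the map stays below HashMap's default 0.75 load factor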
+                Map<String, Object> keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3);
+                for (Map.Entry<String, Object> entry : map.entrySet()) {
+                    if (outList.contains(entry.getKey())) {
+                        keyPartitionMap.put(entry.getKey(), entry.getValue());
+                    }
+                }
+                map = keyPartitionMap;
+            }
         } else {
             String[] headers = row.getHeaders();
             if (Objects.nonNull(headers) && headers.length >= 1) {
                 // cdc
                 map = new HashMap<>(headers.length >> 1);
                 for (String header : headers) {
                     AbstractBaseColumn val = row.getField(header);
-                    if (null == val) {
+                    if (null == val || val instanceof NullColumn) {
                         map.put(header, null);
                     } else {
-                        map.put(header, val.getData());
+                        // Timestamp must be converted to yyyy-MM-dd hh:mm:ss.SSSSSS format
+                        if (val instanceof TimestampColumn) {
+                            map.put(header, timeStampTostringBynacosPrecision(val.asTimestamp()));
+                        } else if (val instanceof MapColumn) {
+                            Object data = val.getData();
+                            if (data instanceof Map) {
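+                                // copy the nested map, converting Timestamp values the same way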
+                                Map<String, Object> maps = (Map<String, Object>) data;
+                                LinkedHashMap<String, Object> datas = new LinkedHashMap<>();
+                                maps.forEach(
+                                        (k, v) -> {
+                                            if (v instanceof Timestamp) {
+                                                datas.put(
+                                                        k,
+                                                        timeStampTostringBynacosPrecision(
+                                                                (Timestamp) v));
+                                            } else {
+                                                datas.put(k, v);
+                                            }
+                                        });
+                                map.put(header, datas);
+                            } else {
+                                throw new RuntimeException(
+                                        "MapColumn data is not Map, map column data type is "
+                                                + data.getClass());
+                            }
+                        } else {
+                            map.put(header, val.getData());
+                        }
                     }
                 }
                 if (Arrays.stream(headers)
@@ -227,19 +280,7 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
                 map = decode.decode(String.join(",", values));
             }
         }
-
-        // get partition key value
-        if (!CollectionUtil.isNullOrEmpty(outList)) {
-            Map<String, Object> keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3);
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                if (outList.contains(entry.getKey())) {
-                    keyPartitionMap.put(entry.getKey(), entry.getValue());
-                }
-            }
-            map = keyPartitionMap;
-        }
-
-        return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
+        return map;
     }
 
     @Override
@@ -300,4 +341,12 @@ protected IDeserializationConverter createInternalConverter(String type) {
                 throw new UnsupportedOperationException("Unsupported type:" + type);
         }
     }
+
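+    /**
+     * Formats a Timestamp: with zero nanos it goes through TimestampColumn at precision 0,
+     * otherwise Timestamp#toString is used, keeping the fractional seconds.
+     */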
+    public String timeStampTostringBynacosPrecision(Timestamp t) {
+        if (t.getNanos() == 0) {
+            return new TimestampColumn(t, 0).asTimestampStr();
+        } else {
+            return t.toString();
+        }
+    }
 }