|
18 | 18 |
|
19 | 19 | package com.dtstack.chunjun.connector.ftp.table; |
20 | 20 |
|
| 21 | +import com.dtstack.chunjun.conf.FieldConf; |
21 | 22 | import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; |
22 | 23 | import com.dtstack.chunjun.connector.ftp.options.FtpOptions; |
23 | 24 | import com.dtstack.chunjun.connector.ftp.sink.FtpDynamicTableSink; |
|
38 | 39 | import org.apache.flink.table.factories.DynamicTableSourceFactory; |
39 | 40 | import org.apache.flink.table.factories.FactoryUtil; |
40 | 41 | import org.apache.flink.table.factories.SerializationFormatFactory; |
| 42 | +import org.apache.flink.table.types.logical.RowType; |
41 | 43 | import org.apache.flink.table.utils.TableSchemaUtils; |
42 | 44 |
|
| 45 | +import java.util.ArrayList; |
43 | 46 | import java.util.HashSet; |
| 47 | +import java.util.List; |
44 | 48 | import java.util.Set; |
45 | 49 |
|
46 | 50 | /** |
@@ -94,7 +98,18 @@ public DynamicTableSource createDynamicTableSource(Context context) { |
94 | 98 | helper.discoverDecodingFormat( |
95 | 99 | DeserializationFormatFactory.class, FtpOptions.FORMAT); |
96 | 100 |
|
| 101 | + RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); |
97 | 102 | FtpConfig ftpConfig = getFtpConfByOptions(config); |
| 103 | + String[] fieldNames = physicalSchema.getFieldNames(); |
| 104 | + List<FieldConf> columnList = new ArrayList<>(fieldNames.length); |
| 105 | + for (int i = 0; i < fieldNames.length; i++) { |
| 106 | + FieldConf field = new FieldConf(); |
| 107 | + field.setName(fieldNames[i]); |
| 108 | + field.setType(rowType.getTypeAt(i).asSummaryString()); |
| 109 | + field.setIndex(i); |
| 110 | + columnList.add(field); |
| 111 | + } |
| 112 | + ftpConfig.setColumn(columnList); |
98 | 113 |
|
99 | 114 | return new FtpDynamicTableSource(physicalSchema, ftpConfig, decodingFormat); |
100 | 115 | } |
|
0 commit comments