Skip to content

Commit b5e6b08

Browse files
shitouFlechazoW
authored andcommitted
[feat-875][doris] postgresql sync support array type.
1 parent a02eef9 commit b5e6b08

File tree

9 files changed

+382
-1
lines changed

9 files changed

+382
-1
lines changed

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.chunjun.conf.ChunJunCommonConf;
2222
import com.dtstack.chunjun.conf.FieldConf;
2323
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
24+
import com.dtstack.chunjun.constants.ConstantValue;
2425
import com.dtstack.chunjun.converter.AbstractRowConverter;
2526
import com.dtstack.chunjun.converter.IDeserializationConverter;
2627
import com.dtstack.chunjun.converter.ISerializationConverter;
@@ -88,12 +89,25 @@ public JdbcColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
8889
@SuppressWarnings("unchecked")
8990
public RowData toInternal(ResultSet resultSet) throws Exception {
9091
List<FieldConf> fieldConfList = commonConf.getColumn();
91-
ColumnRowData result = new ColumnRowData(fieldConfList.size());
92+
ColumnRowData result;
93+
if (fieldConfList.size() == 1
94+
&& ConstantValue.STAR_SYMBOL.equals(fieldConfList.get(0).getName())) {
95+
result = new ColumnRowData(fieldTypes.length);
96+
for (int index = 0; index < fieldTypes.length; index++) {
97+
Object field = resultSet.getObject(index + 1);
98+
AbstractBaseColumn baseColumn =
99+
(AbstractBaseColumn) toInternalConverters.get(index).deserialize(field);
100+
result.addField(baseColumn);
101+
}
102+
return result;
103+
}
92104
int converterIndex = 0;
105+
result = new ColumnRowData(fieldConfList.size());
93106
for (FieldConf fieldConf : fieldConfList) {
94107
AbstractBaseColumn baseColumn = null;
95108
if (StringUtils.isBlank(fieldConf.getValue())) {
96109
Object field = resultSet.getObject(converterIndex + 1);
110+
97111
baseColumn =
98112
(AbstractBaseColumn)
99113
toInternalConverters.get(converterIndex).deserialize(field);

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.io.InputStream;
3939
import java.io.Reader;
4040
import java.math.BigDecimal;
41+
import java.sql.Array;
4142
import java.sql.Connection;
4243
import java.sql.Date;
4344
import java.sql.ResultSet;
@@ -354,6 +355,11 @@ public void setClob(int fieldIndex, Reader reader) throws SQLException {
354355
currentFieldNamedPstmt.setClob(fieldIndex, reader);
355356
}
356357

358+
@Override
359+
public void setArray(int fieldIndex, Array array) throws SQLException {
360+
currentFieldNamedPstmt.setArray(fieldIndex, array);
361+
}
362+
357363
@Override
358364
public void close() throws SQLException {
359365
currentFieldNamedPstmt.close();

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.InputStream;
2121
import java.io.Reader;
2222
import java.math.BigDecimal;
23+
import java.sql.Array;
2324
import java.sql.Connection;
2425
import java.sql.Date;
2526
import java.sql.PreparedStatement;
@@ -250,6 +251,9 @@ static FieldNamedPreparedStatement prepareStatement(
250251
void setBlob(int fieldIndex, InputStream is) throws SQLException;
251252

252253
void setClob(int fieldIndex, Reader reader) throws SQLException;
254+
255+
void setArray(int fieldIndex, Array array) throws SQLException;
256+
253257
/**
254258
* Releases this <code>Statement</code> object's database and JDBC resources immediately instead
255259
* of waiting for this to happen when it is automatically closed. It is generally good practice

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.InputStream;
2121
import java.io.Reader;
2222
import java.math.BigDecimal;
23+
import java.sql.Array;
2324
import java.sql.Connection;
2425
import java.sql.Date;
2526
import java.sql.PreparedStatement;
@@ -258,6 +259,13 @@ public void setClob(int fieldIndex, Reader reader) throws SQLException {
258259
}
259260
}
260261

262+
@Override
263+
public void setArray(int fieldIndex, Array array) throws SQLException {
264+
for (int index : indexMapping[fieldIndex]) {
265+
statement.setArray(index, array);
266+
}
267+
}
268+
261269
@Override
262270
public void close() throws SQLException {
263271
statement.close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
*
3+
* *
4+
* * * Licensed to the Apache Software Foundation (ASF) under one
5+
* * * or more contributor license agreements. See the NOTICE file
6+
* * * distributed with this work for additional information
7+
* * * regarding copyright ownership. The ASF licenses this file
8+
* * * to you under the Apache License, Version 2.0 (the
9+
* * * "License"); you may not use this file except in compliance
10+
* * * with the License. You may obtain a copy of the License at
11+
* * *
12+
* * * http://www.apache.org/licenses/LICENSE-2.0
13+
* * *
14+
* * * Unless required by applicable law or agreed to in writing, software
15+
* * * distributed under the License is distributed on an "AS IS" BASIS,
16+
* * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* * * See the License for the specific language governing permissions and
18+
* * * limitations under the License.
19+
* *
20+
*
21+
*/
22+
23+
package com.dtstack.chunjun.connector.postgresql.converter;
24+
25+
import com.dtstack.chunjun.conf.ChunJunCommonConf;
26+
import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter;
27+
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
28+
import com.dtstack.chunjun.converter.IDeserializationConverter;
29+
import com.dtstack.chunjun.converter.ISerializationConverter;
30+
import com.dtstack.chunjun.element.AbstractBaseColumn;
31+
import com.dtstack.chunjun.element.ColumnRowData;
32+
import com.dtstack.chunjun.element.column.ArrayColumn;
33+
import com.dtstack.chunjun.element.column.BigDecimalColumn;
34+
import com.dtstack.chunjun.element.column.BooleanColumn;
35+
import com.dtstack.chunjun.element.column.BytesColumn;
36+
import com.dtstack.chunjun.element.column.SqlDateColumn;
37+
import com.dtstack.chunjun.element.column.StringColumn;
38+
import com.dtstack.chunjun.element.column.TimeColumn;
39+
import com.dtstack.chunjun.element.column.TimestampColumn;
40+
41+
import org.apache.flink.table.data.RowData;
42+
import org.apache.flink.table.types.logical.LogicalType;
43+
import org.apache.flink.table.types.logical.RowType;
44+
import org.apache.flink.table.types.logical.TimestampType;
45+
import org.apache.flink.table.types.logical.YearMonthIntervalType;
46+
47+
import org.postgresql.core.BaseConnection;
48+
import org.postgresql.core.Oid;
49+
import org.postgresql.jdbc.PgArray;
50+
51+
import java.math.BigDecimal;
52+
import java.sql.Array;
53+
import java.sql.Date;
54+
import java.sql.Time;
55+
import java.sql.Timestamp;
56+
import java.util.HashMap;
57+
import java.util.List;
58+
import java.util.Map;
59+
60+
/**
61+
* Company:www.dtstack.com.
62+
*
63+
* @author shitou
64+
* @date 2022/4/14
65+
*/
66+
public class PostgresqlColumnConverter extends JdbcColumnConverter {
67+
68+
private List<String> fieldTypeList;
69+
private transient BaseConnection connection;
70+
private static final Map<String, Integer> arrayType = new HashMap<>();
71+
72+
public PostgresqlColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
73+
super(rowType, commonConf);
74+
}
75+
76+
static {
77+
arrayType.put("_int4", Oid.INT4_ARRAY);
78+
arrayType.put("_int8", Oid.INT8_ARRAY);
79+
arrayType.put("_float4", Oid.FLOAT4_ARRAY);
80+
arrayType.put("_text", Oid.TEXT_ARRAY);
81+
}
82+
83+
@Override
84+
public FieldNamedPreparedStatement toExternal(
85+
RowData rowData, FieldNamedPreparedStatement statement) throws Exception {
86+
for (int index = 0; index < rowData.getArity(); index++) {
87+
if (arrayType.containsKey(fieldTypeList.get(index))) {
88+
// eg: {1000,1000,10001}、{{1000,1000,10001},{1,2,3}}
89+
String field = ((ColumnRowData) rowData).getField(index).asString();
90+
Array array =
91+
new PgArray(connection, arrayType.get(fieldTypeList.get(index)), field);
92+
AbstractBaseColumn arrayColumn = new ArrayColumn(array);
93+
((ColumnRowData) rowData).setField(index, arrayColumn);
94+
}
95+
toExternalConverters.get(index).serialize(rowData, index, statement);
96+
}
97+
return statement;
98+
}
99+
100+
@Override
101+
protected IDeserializationConverter createInternalConverter(LogicalType type) {
102+
switch (type.getTypeRoot()) {
103+
case BOOLEAN:
104+
return val -> new BooleanColumn(Boolean.parseBoolean(val.toString()));
105+
case TINYINT:
106+
return val -> new BigDecimalColumn(((Integer) val).byteValue());
107+
case SMALLINT:
108+
case INTEGER:
109+
return val -> new BigDecimalColumn((Integer) val);
110+
case INTERVAL_YEAR_MONTH:
111+
return (IDeserializationConverter<Object, AbstractBaseColumn>)
112+
val -> {
113+
YearMonthIntervalType yearMonthIntervalType =
114+
(YearMonthIntervalType) type;
115+
switch (yearMonthIntervalType.getResolution()) {
116+
case YEAR:
117+
return new BigDecimalColumn(
118+
Integer.parseInt(String.valueOf(val).substring(0, 4)));
119+
case MONTH:
120+
case YEAR_TO_MONTH:
121+
default:
122+
throw new UnsupportedOperationException(
123+
"jdbc converter only support YEAR");
124+
}
125+
};
126+
case FLOAT:
127+
return val -> new BigDecimalColumn((Float) val);
128+
case DOUBLE:
129+
return val -> new BigDecimalColumn((Double) val);
130+
case BIGINT:
131+
return val -> new BigDecimalColumn((Long) val);
132+
case DECIMAL:
133+
return val -> new BigDecimalColumn((BigDecimal) val);
134+
case CHAR:
135+
case VARCHAR:
136+
return val -> new StringColumn((String) val);
137+
case DATE:
138+
return val -> new SqlDateColumn((Date) val);
139+
case TIME_WITHOUT_TIME_ZONE:
140+
return val -> new TimeColumn((Time) val);
141+
case TIMESTAMP_WITH_TIME_ZONE:
142+
case TIMESTAMP_WITHOUT_TIME_ZONE:
143+
return (IDeserializationConverter<Object, AbstractBaseColumn>)
144+
val ->
145+
new TimestampColumn(
146+
(Timestamp) val, ((TimestampType) (type)).getPrecision());
147+
148+
case BINARY:
149+
case VARBINARY:
150+
return val -> new BytesColumn((byte[]) val);
151+
case ARRAY:
152+
// integer[] -> {1000,1000,10001}
153+
// integer[][]-> {{1000,1000,10001},{1,2,3}}
154+
return val -> new StringColumn(((Array) val).toString());
155+
default:
156+
throw new UnsupportedOperationException("Unsupported type:" + type);
157+
}
158+
}
159+
160+
@Override
161+
protected ISerializationConverter<FieldNamedPreparedStatement> createExternalConverter(
162+
LogicalType type) {
163+
switch (type.getTypeRoot()) {
164+
case BOOLEAN:
165+
return (val, index, statement) ->
166+
statement.setBoolean(
167+
index, ((ColumnRowData) val).getField(index).asBoolean());
168+
case TINYINT:
169+
return (val, index, statement) -> statement.setByte(index, val.getByte(index));
170+
case SMALLINT:
171+
case INTEGER:
172+
case INTERVAL_YEAR_MONTH:
173+
return (val, index, statement) ->
174+
statement.setInt(index, ((ColumnRowData) val).getField(index).asYearInt());
175+
case FLOAT:
176+
return (val, index, statement) ->
177+
statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat());
178+
case DOUBLE:
179+
return (val, index, statement) ->
180+
statement.setDouble(
181+
index, ((ColumnRowData) val).getField(index).asDouble());
182+
183+
case BIGINT:
184+
return (val, index, statement) ->
185+
statement.setLong(index, ((ColumnRowData) val).getField(index).asLong());
186+
case DECIMAL:
187+
return (val, index, statement) ->
188+
statement.setBigDecimal(
189+
index, ((ColumnRowData) val).getField(index).asBigDecimal());
190+
case CHAR:
191+
case VARCHAR:
192+
return (val, index, statement) ->
193+
statement.setString(
194+
index, ((ColumnRowData) val).getField(index).asString());
195+
case DATE:
196+
return (val, index, statement) ->
197+
statement.setDate(index, ((ColumnRowData) val).getField(index).asSqlDate());
198+
case TIME_WITHOUT_TIME_ZONE:
199+
return (val, index, statement) ->
200+
statement.setTime(index, ((ColumnRowData) val).getField(index).asTime());
201+
case TIMESTAMP_WITH_TIME_ZONE:
202+
case TIMESTAMP_WITHOUT_TIME_ZONE:
203+
return (val, index, statement) ->
204+
statement.setTimestamp(
205+
index, ((ColumnRowData) val).getField(index).asTimestamp());
206+
207+
case BINARY:
208+
case VARBINARY:
209+
return (val, index, statement) ->
210+
statement.setBytes(index, ((ColumnRowData) val).getField(index).asBytes());
211+
case ARRAY:
212+
return (val, index, statement) ->
213+
statement.setArray(
214+
index, (Array) ((ColumnRowData) val).getField(index).getData());
215+
default:
216+
throw new UnsupportedOperationException("Unsupported type:" + type);
217+
}
218+
}
219+
220+
public void setFieldTypeList(List<String> fieldTypeList) {
221+
this.fieldTypeList = fieldTypeList;
222+
}
223+
224+
public void setConnection(BaseConnection connection) {
225+
this.connection = connection;
226+
}
227+
}

chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlRawTypeConverter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
2222

2323
import org.apache.flink.table.api.DataTypes;
24+
import org.apache.flink.table.types.AtomicDataType;
2425
import org.apache.flink.table.types.DataType;
26+
import org.apache.flink.table.types.logical.FloatType;
27+
import org.apache.flink.table.types.logical.IntType;
28+
import org.apache.flink.table.types.logical.VarCharType;
2529

2630
import java.util.Locale;
2731

@@ -95,6 +99,13 @@ public static DataType apply(String type) {
9599
case "BOOLEAN":
96100
case "BOOL":
97101
return DataTypes.BOOLEAN();
102+
case "_INT4":
103+
case "_INT8":
104+
return DataTypes.ARRAY(new AtomicDataType(new IntType()));
105+
case "_TEXT":
106+
return DataTypes.ARRAY(new AtomicDataType(new VarCharType()));
107+
case "_FLOAT4":
108+
return DataTypes.ARRAY(new AtomicDataType(new FloatType()));
98109

99110
// 以下类型无法支持
100111
// Enumerated Types

chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,22 @@
1818

1919
package com.dtstack.chunjun.connector.postgresql.dialect;
2020

21+
import com.dtstack.chunjun.conf.ChunJunCommonConf;
2122
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
23+
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
2224
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
25+
import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter;
2326
import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlRawTypeConverter;
27+
import com.dtstack.chunjun.converter.AbstractRowConverter;
2428
import com.dtstack.chunjun.converter.RawTypeConverter;
2529

30+
import org.apache.flink.table.types.logical.LogicalType;
31+
import org.apache.flink.table.types.logical.RowType;
32+
33+
import io.vertx.core.json.JsonArray;
2634
import org.apache.commons.lang3.StringUtils;
2735

36+
import java.sql.ResultSet;
2837
import java.util.Arrays;
2938
import java.util.Optional;
3039
import java.util.stream.Collectors;
@@ -58,6 +67,12 @@ public RawTypeConverter getRawTypeConverter() {
5867
return PostgresqlRawTypeConverter::apply;
5968
}
6069

70+
@Override
71+
public AbstractRowConverter<ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType>
72+
getColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
73+
return new PostgresqlColumnConverter(rowType, commonConf);
74+
}
75+
6176
@Override
6277
public Optional<String> defaultDriverName() {
6378
return Optional.of(DRIVER);

0 commit comments

Comments
 (0)