Skip to content

Commit afc7d51

Browse files
authored
[feat][jdbc]improvement jdbc read/write performance (#1488)
1 parent 8ff95f2 commit afc7d51

File tree

13 files changed

+595
-49
lines changed

13 files changed

+595
-49
lines changed

chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public class GreenplumDialect extends PostgresqlDialect {
3333

3434
private static final String DIALECT_NAME = "Greenplum";
3535
private static final String DRIVER = "com.pivotal.jdbc.GreenplumDriver";
36-
private static final String URL_START = "jdbc:pivotal:greenplum:";
36+
public static final String URL_START = "jdbc:pivotal:greenplum:";
37+
public static final String DATABASE_NAME = ";DatabaseName=";
3738

3839
@Override
3940
public String dialectName() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.greenplum.sink;
20+
21+
import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter;
22+
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
23+
import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter;
24+
import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect;
25+
import com.dtstack.chunjun.constants.ConstantValue;
26+
import com.dtstack.chunjun.element.ColumnRowData;
27+
import com.dtstack.chunjun.enums.EWriteMode;
28+
import com.dtstack.chunjun.throwable.NoRestartException;
29+
import com.dtstack.chunjun.throwable.WriteRecordException;
30+
31+
import org.apache.flink.streaming.api.CheckpointingMode;
32+
import org.apache.flink.table.data.RowData;
33+
34+
import org.apache.commons.lang3.math.NumberUtils;
35+
import org.postgresql.copy.CopyManager;
36+
import org.postgresql.core.BaseConnection;
37+
38+
import java.io.ByteArrayInputStream;
39+
import java.math.BigDecimal;
40+
import java.nio.charset.StandardCharsets;
41+
import java.sql.Connection;
42+
import java.sql.SQLException;
43+
44+
/**
45+
* @program: flinkx
46+
* @author: jier
47+
*/
48+
public class GreenplumOutputFormat extends JdbcOutputFormat {
49+
50+
// pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00
51+
public static final String SPACE = "\u0000";
52+
53+
private static final String LINE_DELIMITER = "\n";
54+
private CopyManager copyManager;
55+
private boolean disableCopyMode = false;
56+
private String copySql = "";
57+
public static final String INSERT_SQL_MODE_TYPE = "copy";
58+
private static final String DEFAULT_FIELD_DELIMITER = "\001";
59+
60+
private static final String DEFAULT_NULL_VALUE = "\002";
61+
62+
/** 数据源类型信息 * */
63+
private final String dbType = DbType.POSTGRESQL.name();
64+
65+
@Override
66+
protected void openInternal(int taskNumber, int numTasks) {
67+
super.openInternal(taskNumber, numTasks);
68+
try {
69+
// check is use copy mode for insert
70+
disableCopyMode =
71+
jdbcConf.getInsertSqlMode() != null
72+
&& !INSERT_SQL_MODE_TYPE.equalsIgnoreCase(jdbcConf.getInsertSqlMode());
73+
if (EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode()) && !disableCopyMode) {
74+
LOG.info("will use copy mode");
75+
copyManager = new CopyManager((BaseConnection) dbConn);
76+
77+
PostgresqlDialect pgDialect = (PostgresqlDialect) jdbcDialect;
78+
copySql =
79+
pgDialect.getCopyStatement(
80+
jdbcConf.getSchema(),
81+
jdbcConf.getTable(),
82+
columnNameList.toArray(new String[0]),
83+
DEFAULT_FIELD_DELIMITER,
84+
DEFAULT_NULL_VALUE);
85+
86+
LOG.info("write sql:{}", copySql);
87+
}
88+
checkUpsert();
89+
if (rowConverter instanceof PostgresqlColumnConverter
90+
&& dbConn instanceof BaseConnection) {
91+
((PostgresqlColumnConverter) rowConverter).setConnection((BaseConnection) dbConn);
92+
}
93+
} catch (SQLException sqe) {
94+
throw new IllegalArgumentException("checkUpsert() failed.", sqe);
95+
}
96+
}
97+
98+
@Override
99+
protected void writeSingleRecordInternal(RowData row) throws WriteRecordException {
100+
if (disableCopyMode) {
101+
super.writeSingleRecordInternal(row);
102+
} else {
103+
if (rowConverter instanceof JdbcColumnConverter) {
104+
ColumnRowData colRowData = (ColumnRowData) row;
105+
// write with copy
106+
int index = 0;
107+
try {
108+
StringBuilder rowStr = new StringBuilder();
109+
int lastIndex = row.getArity() - 1;
110+
for (; index < row.getArity(); index++) {
111+
appendColumn(colRowData, index, rowStr, index == lastIndex);
112+
}
113+
String rowVal = copyModeReplace(rowStr.toString());
114+
try (ByteArrayInputStream bi =
115+
new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8))) {
116+
copyManager.copyIn(copySql, bi);
117+
}
118+
} catch (Exception e) {
119+
processWriteException(e, index, row);
120+
}
121+
} else {
122+
throw new NoRestartException("copy mode only support data sync with out table");
123+
}
124+
}
125+
}
126+
127+
@Override
128+
protected void writeMultipleRecordsInternal() throws Exception {
129+
if (disableCopyMode) {
130+
super.writeMultipleRecordsInternal();
131+
} else {
132+
if (rowConverter instanceof JdbcColumnConverter) {
133+
StringBuilder rowsStrBuilder = new StringBuilder(128);
134+
for (RowData row : rows) {
135+
ColumnRowData colRowData = (ColumnRowData) row;
136+
int lastIndex = row.getArity() - 1;
137+
StringBuilder rowStr = new StringBuilder(128);
138+
for (int index = 0; index < row.getArity(); index++) {
139+
appendColumn(colRowData, index, rowStr, index == lastIndex);
140+
}
141+
String tempData = rowStr.toString();
142+
rowsStrBuilder.append(copyModeReplace(tempData)).append(LINE_DELIMITER);
143+
}
144+
String rowVal = rowsStrBuilder.toString();
145+
try (ByteArrayInputStream bi =
146+
new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8))) {
147+
copyManager.copyIn(copySql, bi);
148+
if (checkpointEnabled && CheckpointingMode.EXACTLY_ONCE == checkpointMode) {
149+
rowsOfCurrentTransaction += rows.size();
150+
}
151+
}
152+
} else {
153+
throw new NoRestartException("copy mode only support data sync with out table");
154+
}
155+
}
156+
}
157+
158+
private void appendColumn(
159+
ColumnRowData colRowData, int pos, StringBuilder rowStr, boolean isLast) {
160+
Object col = colRowData.getField(pos);
161+
if (col == null) {
162+
rowStr.append(DEFAULT_NULL_VALUE);
163+
} else {
164+
rowStr.append(col);
165+
}
166+
if (!isLast) {
167+
rowStr.append(DEFAULT_FIELD_DELIMITER);
168+
}
169+
}
170+
171+
/**
172+
* \r \n \ 等特殊字符串需要转义
173+
*
174+
* @return
175+
*/
176+
private String copyModeReplace(String rowStr) {
177+
if (rowStr.contains("\\")) {
178+
rowStr = rowStr.replaceAll("\\\\", "\\\\\\\\");
179+
}
180+
if (rowStr.contains("\r")) {
181+
rowStr = rowStr.replaceAll("\r", "\\\\r");
182+
}
183+
184+
if (rowStr.contains("\n")) {
185+
rowStr = rowStr.replaceAll("\n", "\\\\n");
186+
}
187+
188+
// pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00
189+
if (rowStr.contains(SPACE)) {
190+
rowStr = rowStr.replaceAll(SPACE, "");
191+
}
192+
return rowStr;
193+
}
194+
195+
/** 数据源类型 * */
196+
public enum DbType {
197+
POSTGRESQL,
198+
ADB
199+
}
200+
201+
/**
202+
* 当mode为update时进行校验
203+
*
204+
* @return
205+
* @throws SQLException
206+
*/
207+
public void checkUpsert() throws SQLException {
208+
if (EWriteMode.UPDATE.name().equalsIgnoreCase(jdbcConf.getMode())) {
209+
try (Connection connection = getConnection()) {
210+
211+
// 效验版本
212+
String databaseProductVersion =
213+
connection.getMetaData().getDatabaseProductVersion();
214+
LOG.info("source version is {}", databaseProductVersion);
215+
String[] split = databaseProductVersion.split("\\.");
216+
// 10.1.12
217+
if (split.length > 2) {
218+
databaseProductVersion = split[0] + ConstantValue.POINT_SYMBOL + split[1];
219+
}
220+
221+
if (NumberUtils.isNumber(databaseProductVersion)) {
222+
BigDecimal sourceVersion = new BigDecimal(databaseProductVersion);
223+
if (dbType.equalsIgnoreCase(DbType.POSTGRESQL.name())) {
224+
// pg大于等于9.5
225+
if (sourceVersion.compareTo(new BigDecimal("9.5")) < 0) {
226+
throw new RuntimeException(
227+
"the postgreSql version is ["
228+
+ databaseProductVersion
229+
+ "] and must greater than or equal to 9.5 when you use update mode and source is "
230+
+ DbType.POSTGRESQL.name());
231+
}
232+
} else if (dbType.equalsIgnoreCase(DbType.ADB.name())) {
233+
// adb大于等于9.4
234+
if (sourceVersion.compareTo(new BigDecimal("9.4")) < 0) {
235+
throw new RuntimeException(
236+
"the postgreSql version is ["
237+
+ databaseProductVersion
238+
+ "] and must greater than or equal to 9.4 when you use update mode and source is "
239+
+ DbType.ADB.name());
240+
}
241+
}
242+
}
243+
}
244+
}
245+
}
246+
}

chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,44 @@
2020

2121
import com.dtstack.chunjun.conf.SyncConf;
2222
import com.dtstack.chunjun.connector.greenplum.dialect.GreenplumDialect;
23-
import com.dtstack.chunjun.connector.postgresql.sink.PostgresqlSinkFactory;
23+
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
24+
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
25+
import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect;
26+
27+
import org.apache.commons.lang.StringUtils;
28+
29+
import static com.dtstack.chunjun.connector.greenplum.sink.GreenplumOutputFormat.INSERT_SQL_MODE_TYPE;
2430

2531
/**
2632
* company www.dtstack.com
2733
*
2834
* @author jier
2935
*/
30-
public class GreenplumSinkFactory extends PostgresqlSinkFactory {
36+
public class GreenplumSinkFactory extends JdbcSinkFactory {
3137

3238
public GreenplumSinkFactory(SyncConf syncConf) {
33-
super(syncConf, new GreenplumDialect());
39+
super(syncConf, null);
40+
if (syncConf.getWriter().getParameter().get("insertSqlMode") != null
41+
&& INSERT_SQL_MODE_TYPE.equalsIgnoreCase(
42+
syncConf.getWriter().getParameter().get("insertSqlMode").toString())) {
43+
this.jdbcDialect = new PostgresqlDialect();
44+
String pgUrl = changeToPostgresqlUrl(this.jdbcConf.getJdbcUrl());
45+
this.jdbcConf.setJdbcUrl(pgUrl);
46+
} else {
47+
this.jdbcDialect = new GreenplumDialect();
48+
}
49+
}
50+
51+
@Override
52+
protected JdbcOutputFormatBuilder getBuilder() {
53+
return new JdbcOutputFormatBuilder(new GreenplumOutputFormat());
54+
}
55+
56+
private String changeToPostgresqlUrl(String gpUrl) {
57+
String pgUrl =
58+
StringUtils.replaceOnce(
59+
gpUrl, GreenplumDialect.URL_START, PostgresqlDialect.URL_START);
60+
pgUrl = StringUtils.replaceOnce(pgUrl, GreenplumDialect.DATABASE_NAME, "/");
61+
return pgUrl;
3462
}
3563
}

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable {
106106
/** upsert 写数据库时,是否null覆盖原来的值 */
107107
protected boolean allReplace = false;
108108

109+
protected boolean isAutoCommit = false;
110+
111+
private boolean defineColumnTypeForStatement = false;
112+
109113
public Boolean getInitReporter() {
110114
return initReporter;
111115
}
@@ -419,6 +423,14 @@ public void setAllReplace(boolean allReplace) {
419423
this.allReplace = allReplace;
420424
}
421425

426+
/**
 * Returns the value of the {@code isAutoCommit} field, which is initialized to {@code false}.
 *
 * @return the current {@code isAutoCommit} flag
 */
public boolean isAutoCommit() {
427+
return isAutoCommit;
428+
}
429+
430+
/**
 * Returns the value of the {@code defineColumnTypeForStatement} field, which is initialized to
 * {@code false}.
 *
 * @return the current {@code defineColumnTypeForStatement} flag
 */
public boolean isDefineColumnTypeForStatement() {
431+
return defineColumnTypeForStatement;
432+
}
433+
422434
public String getSplitStrategy() {
423435
return splitStrategy;
424436
}
@@ -485,9 +497,13 @@ public String toString() {
485497
+ increment
486498
+ ", polling="
487499
+ polling
500+
+ ", pollingFromMax="
501+
+ pollingFromMax
488502
+ ", increColumn='"
489503
+ increColumn
490504
+ '\''
505+
+ ", isOrderBy="
506+
+ isOrderBy
491507
+ ", increColumnIndex="
492508
+ increColumnIndex
493509
+ ", increColumnType='"
@@ -508,6 +524,8 @@ public String toString() {
508524
+ restoreColumnIndex
509525
+ ", useMaxFunc="
510526
+ useMaxFunc
527+
+ ", initReporter="
528+
+ initReporter
511529
+ ", mode='"
512530
+ mode
513531
+ '\''
@@ -521,6 +539,10 @@ public String toString() {
521539
+ updateKey
522540
+ ", allReplace="
523541
+ allReplace
542+
+ ", isAutoCommit="
543+
+ isAutoCommit
544+
+ ", defineColumnTypeForStatement="
545+
+ defineColumnTypeForStatement
524546
+ '}';
525547
}
526548
}

0 commit comments

Comments
 (0)