Skip to content

Commit c956d25

Browse files
yanghuaiGitdujie
andauthored
[feature-#1955]jdbcConf adds default optimization parameters (#1959)
Co-authored-by: dujie <[email protected]>
1 parent e77873f commit c956d25

File tree

15 files changed

+72
-34
lines changed

15 files changed

+72
-34
lines changed

.github/workflows/build.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ jobs:
7171
distribution: 'temurin'
7272
cache: 'maven'
7373
- name: add dependencies
74-
run: |
75-
wget http://nexus.dev.dtstack.cn/nexus/content/repositories/dtstack-release/com/esen/jdbc/gbase/8.3.81.53/gbase-8.3.81.53.jar
76-
wget https://cdn.gbase.cn/products/27/czrl6z38BvTfEQS4uyQcS/gbasedbtjdbc_3.5.1.jar
77-
./mvnw install:install-file -DgroupId=com.esen.jdbc -DartifactId=gbase -Dversion=8.3.81.53 -Dpackaging=jar -Dfile=./gbase-8.3.81.53.jar
78-
./mvnw install:install-file -DgroupId=com.gbasedbt.jdbc.Driver -DartifactId=gbasedbt -Dversion=3.5.1_1_d0c87a -Dpackaging=jar -Dfile=./gbasedbtjdbc_3.5.1.jar
7974
- name: build project
8075
run: |
8176
./mvnw clean package -Dmaven.test.skip --no-snapshot-updates

chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/converter/ClickhouseRawTypeConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class ClickhouseRawTypeConverter {
4040
public static DataType apply(TypeConfig type) {
4141
switch (type.getType().toUpperCase(Locale.ENGLISH)) {
4242
case "BOOLEAN":
43+
case "BOOL":
4344
return DataTypes.BOOLEAN();
4445
case "TINYINT":
4546
case "INT8":

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,4 +525,18 @@ default Function<Tuple3<String, Integer, Integer>, TypeConfig> typeBuilder() {
525525
default TableIdentify getTableIdentify(String confSchema, String confTable) {
526526
return new TableIdentify(null, confSchema, confTable, this::quoteIdentifier, false);
527527
}
528+
529+
/**
530+
* Add additional parameters to jdbc properties,for reader only.
531+
*
532+
* @param jdbcConf jdbc datasource configuration
533+
*/
534+
default void putReaderExtParam(JdbcConfig jdbcConf) {}
535+
536+
/**
537+
* Add additional parameters to jdbc properties,for writer only.
538+
*
539+
* @param jdbcConf jdbc datasource configuration
540+
*/
541+
default void putWriterExtParam(JdbcConfig jdbcConf) {}
528542
}

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcSinkFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public JdbcSinkFactory(SyncConfig syncConfig, JdbcDialect jdbcDialect) {
9696

9797
@Override
9898
public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
99+
jdbcDialect.putWriterExtParam(jdbcConfig);
99100
JdbcOutputFormatBuilder builder = getBuilder();
100101
initColumnInfo();
101102
builder.setJdbcConf(jdbcConfig);

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ protected Class<? extends JdbcConfig> getConfClass() {
105105

106106
@Override
107107
public DataStream<RowData> createSource() {
108+
jdbcDialect.putReaderExtParam(jdbcConfig);
108109
initColumnInfo();
109110
initRestoreConfig();
110111
initPollingConfig();

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ protected JdbcConfig getSinkConnectionConfig(
185185

186186
jdbcConfig.setUniqueKey(keyFields);
187187
resetTableInfo(jdbcConfig);
188+
getDialect().putWriterExtParam(jdbcConfig);
188189
return jdbcConfig;
189190
}
190191

@@ -257,6 +258,7 @@ protected JdbcConfig getSourceConnectionConfig(ReadableConfig readableConfig) {
257258
if (StringUtils.isBlank(jdbcConfig.getCustomSql())) {
258259
resetTableInfo(jdbcConfig);
259260
}
261+
getDialect().putReaderExtParam(jdbcConfig);
260262
return jdbcConfig;
261263
}
262264

chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/dialect/MysqlDialect.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.dtstack.chunjun.config.TypeConfig;
2222
import com.dtstack.chunjun.connector.jdbc.conf.TableIdentify;
23+
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
2324
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
2425
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
2526
import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter;
@@ -37,6 +38,7 @@
3738
import java.util.Arrays;
3839
import java.util.Locale;
3940
import java.util.Optional;
41+
import java.util.Properties;
4042
import java.util.function.Function;
4143
import java.util.stream.Collectors;
4244

@@ -166,4 +168,28 @@ public Function<Tuple3<String, Integer, Integer>, TypeConfig> typeBuilder() {
166168
return typeConfig;
167169
});
168170
}
171+
172+
@Override
173+
public void putWriterExtParam(JdbcConfig jdbcConf) {
174+
Properties properties = jdbcConf.getProperties();
175+
if (properties == null) {
176+
properties = new Properties();
177+
}
178+
properties.putIfAbsent("useCursorFetch", "true");
179+
properties.putIfAbsent("rewriteBatchedStatements", "true");
180+
properties.put("tinyInt1isBit", "false");
181+
jdbcConf.setProperties(properties);
182+
}
183+
184+
@Override
185+
public void putReaderExtParam(JdbcConfig jdbcConf) {
186+
Properties properties = jdbcConf.getProperties();
187+
if (properties == null) {
188+
properties = new Properties();
189+
}
190+
properties.putIfAbsent("useCursorFetch", "true");
191+
properties.putIfAbsent("rewriteBatchedStatements", "true");
192+
properties.put("tinyInt1isBit", "false");
193+
jdbcConf.setProperties(properties);
194+
}
169195
}

chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/sink/MysqlSinkFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
2323
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
2424
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
25-
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
2625
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
2726

2827
public class MysqlSinkFactory extends JdbcSinkFactory {
2928

3029
public MysqlSinkFactory(SyncConfig syncConfig) {
3130
super(syncConfig, new MysqlDialect());
32-
JdbcUtil.putExtParam(jdbcConfig);
3331
}
3432

3533
@Override

chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/source/MysqlSourceFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat;
2323
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder;
2424
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
25-
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
2625
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
2726

2827
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -42,7 +41,6 @@ public MysqlSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env)
4241
&& jdbcConfig.getFetchSize() == 0) {
4342
jdbcConfig.setFetchSize(1000);
4443
}
45-
JdbcUtil.putExtParam(jdbcConfig);
4644
}
4745

4846
@Override

chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/table/MysqlDynamicTableFactory.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,14 @@
1818

1919
package com.dtstack.chunjun.connector.mysql.table;
2020

21-
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
2221
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
2322
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
2423
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
2524
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat;
2625
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder;
2726
import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory;
28-
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
2927
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
3028

31-
import org.apache.flink.configuration.ReadableConfig;
32-
import org.apache.flink.table.catalog.ResolvedSchema;
33-
3429
public class MysqlDynamicTableFactory extends JdbcDynamicTableFactory {
3530

3631
// 默认是Mysql流式拉取
@@ -54,21 +49,6 @@ protected int getDefaultFetchSize() {
5449
return DEFAULT_FETCH_SIZE;
5550
}
5651

57-
@Override
58-
protected JdbcConfig getSourceConnectionConfig(ReadableConfig readableConfig) {
59-
JdbcConfig jdbcConfig = super.getSourceConnectionConfig(readableConfig);
60-
JdbcUtil.putExtParam(jdbcConfig);
61-
return jdbcConfig;
62-
}
63-
64-
@Override
65-
protected JdbcConfig getSinkConnectionConfig(
66-
ReadableConfig readableConfig, ResolvedSchema schema) {
67-
JdbcConfig jdbcConfig = super.getSinkConnectionConfig(readableConfig, schema);
68-
JdbcUtil.putExtParam(jdbcConfig);
69-
return jdbcConfig;
70-
}
71-
7252
@Override
7353
protected JdbcInputFormatBuilder getInputFormatBuilder() {
7454
return new JdbcInputFormatBuilder(new JdbcInputFormat());

0 commit comments

Comments
 (0)