Skip to content

Commit 7a00d0d

Browse files
committed
Merge remote-tracking branch 'origin/1.5_v3.6.1' into v1.5.0_dev
# Conflicts: # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java
2 parents b3377b4 + 267cd3f commit 7a00d0d

File tree

5 files changed

+120
-3
lines changed

5 files changed

+120
-3
lines changed

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,13 @@ public void open(Configuration parameters) throws Exception {
6363
.put("driver_class", MYSQL_DRIVER)
6464
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
6565
.put("user", rdbSideTableInfo.getUserName())
66-
.put("password", rdbSideTableInfo.getPassword());
66+
.put("password", rdbSideTableInfo.getPassword())
67+
.put("provider_class", DT_PROVIDER_CLASS)
68+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
69+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
70+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
71+
72+
6773

6874
VertxOptions vo = new VertxOptions();
6975
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ public void open(Configuration parameters) throws Exception {
5555
.put("driver_class", ORACLE_DRIVER)
5656
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
5757
.put("user", rdbSideTableInfo.getUserName())
58-
.put("password", rdbSideTableInfo.getPassword());
58+
.put("password", rdbSideTableInfo.getPassword())
59+
.put("provider_class", DT_PROVIDER_CLASS)
60+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
61+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
62+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);;
5963

6064
VertxOptions vo = new VertxOptions();
6165
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ public class RdbAsyncReqRow extends AsyncReqRow {
5757

5858
public final static int DEFAULT_MAX_DB_CONN_POOL_SIZE = DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE + DEFAULT_VERTX_WORKER_POOL_SIZE;
5959

60+
public final static int DEFAULT_IDLE_CONNECTION_TEST_PEROID = 60;
61+
62+
public final static boolean DEFAULT_TEST_CONNECTION_ON_CHECKIN = true;
63+
64+
public final static String DT_PROVIDER_CLASS = "com.dtstack.flink.sql.side.rdb.provider.DTC3P0DataSourceProvider";
65+
66+
public final static String PREFERRED_TEST_QUERY_SQL = "select 1 from dual";
67+
6068
private transient SQLClient rdbSQLClient;
6169

6270
public RdbAsyncReqRow(SideInfo sideInfo) {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.dtstack.flink.sql.side.rdb.provider;
2+
3+
import com.mchange.v2.c3p0.ComboPooledDataSource;
4+
import io.vertx.core.json.JsonObject;
5+
import io.vertx.ext.jdbc.spi.impl.C3P0DataSourceProvider;
6+
7+
import javax.sql.DataSource;
8+
import java.beans.PropertyVetoException;
9+
import java.sql.SQLException;
10+
11+
public class DTC3P0DataSourceProvider extends C3P0DataSourceProvider {
12+
13+
@Override
14+
public DataSource getDataSource(JsonObject config) throws SQLException {
15+
String url = config.getString("url");
16+
if (url == null) throw new NullPointerException("url cannot be null");
17+
String driverClass = config.getString("driver_class");
18+
String user = config.getString("user");
19+
String password = config.getString("password");
20+
Integer maxPoolSize = config.getInteger("max_pool_size");
21+
Integer initialPoolSize = config.getInteger("initial_pool_size");
22+
Integer minPoolSize = config.getInteger("min_pool_size");
23+
Integer maxStatements = config.getInteger("max_statements");
24+
Integer maxStatementsPerConnection = config.getInteger("max_statements_per_connection");
25+
Integer maxIdleTime = config.getInteger("max_idle_time");
26+
Integer acquireRetryAttempts = config.getInteger("acquire_retry_attempts");
27+
Integer acquireRetryDelay = config.getInteger("acquire_retry_delay");
28+
Boolean breakAfterAcquireFailure = config.getBoolean("break_after_acquire_failure");
29+
30+
//add c3p0 params
31+
String preferredTestQuery = config.getString("preferred_test_query");
32+
Integer idleConnectionTestPeriod = config.getInteger("idle_connection_test_period");
33+
Boolean testConnectionOnCheckin = config.getBoolean("test_connection_on_checkin");
34+
35+
36+
37+
// If you want to configure any other C3P0 properties you can add a file c3p0.properties to the classpath
38+
ComboPooledDataSource cpds = new ComboPooledDataSource();
39+
cpds.setJdbcUrl(url);
40+
if (driverClass != null) {
41+
try {
42+
cpds.setDriverClass(driverClass);
43+
} catch (PropertyVetoException e) {
44+
throw new IllegalArgumentException(e);
45+
}
46+
}
47+
if (user != null) {
48+
cpds.setUser(user);
49+
}
50+
if (password != null) {
51+
cpds.setPassword(password);
52+
}
53+
if (maxPoolSize != null) {
54+
cpds.setMaxPoolSize(maxPoolSize);
55+
}
56+
if (minPoolSize != null) {
57+
cpds.setMinPoolSize(minPoolSize);
58+
}
59+
if (initialPoolSize != null) {
60+
cpds.setInitialPoolSize(initialPoolSize);
61+
}
62+
if (maxStatements != null) {
63+
cpds.setMaxStatements(maxStatements);
64+
}
65+
if (maxStatementsPerConnection != null) {
66+
cpds.setMaxStatementsPerConnection(maxStatementsPerConnection);
67+
}
68+
if (maxIdleTime != null) {
69+
cpds.setMaxIdleTime(maxIdleTime);
70+
}
71+
if(acquireRetryAttempts != null){
72+
cpds.setAcquireRetryAttempts(acquireRetryAttempts);
73+
}
74+
if(acquireRetryDelay != null){
75+
cpds.setAcquireRetryDelay(acquireRetryDelay);
76+
}
77+
if(breakAfterAcquireFailure != null){
78+
cpds.setBreakAfterAcquireFailure(breakAfterAcquireFailure);
79+
}
80+
81+
if (preferredTestQuery != null) {
82+
cpds.setPreferredTestQuery(preferredTestQuery);
83+
}
84+
85+
if (idleConnectionTestPeriod != null) {
86+
cpds.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
87+
}
88+
89+
if (testConnectionOnCheckin != null) {
90+
cpds.setTestConnectionOnCheckin(testConnectionOnCheckin);
91+
}
92+
93+
return cpds;
94+
}
95+
}

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ public void open(Configuration parameters) throws Exception {
5353
.put("driver_class", SQLSERVER_DRIVER)
5454
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
5555
.put("user", rdbSideTableInfo.getUserName())
56-
.put("password", rdbSideTableInfo.getPassword());
56+
.put("password", rdbSideTableInfo.getPassword())
57+
.put("provider_class", DT_PROVIDER_CLASS)
58+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
59+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
60+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);;
5761

5862
VertxOptions vo = new VertxOptions();
5963
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

0 commit comments

Comments
 (0)