Skip to content

Commit c098bd9

Browse files
committed
Merge branch 'v1.5.0_oracle_field_case' into 'v1.5.0_dev'
oracle 小写字段,表名转义 See merge request !30
2 parents 48eca2a + bd42178 commit c098bd9

File tree

10 files changed

+101
-22
lines changed

10 files changed

+101
-22
lines changed

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class MysqlAllReqRow extends RdbAllReqRow {
5050
private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
5151

5252
public MysqlAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
53-
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
53+
super(new MysqlAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
5454
}
5555

5656
@Override

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class MysqlAsyncReqRow extends RdbAsyncReqRow {
5050
private final static String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
5151

5252
public MysqlAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
53-
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
53+
super(new MysqlAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
5454
}
5555

5656

oracle/oracle-side/oracle-all-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class OracleAllReqRow extends RdbAllReqRow {
4343
private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
4444

4545
public OracleAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
46-
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
46+
super(new OracleAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
4747
}
4848

4949
@Override

oracle/oracle-side/oracle-all-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAllSideInfo.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.dtstack.flink.sql.side.JoinInfo;
2222
import com.dtstack.flink.sql.side.SideTableInfo;
2323
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
24+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
25+
import org.apache.commons.lang3.StringUtils;
2426
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2527

2628
import java.util.List;
@@ -30,4 +32,32 @@ public class OracleAllSideInfo extends RdbAllSideInfo {
3032
public OracleAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
3133
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3234
}
35+
36+
@Override
37+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
38+
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
39+
40+
sqlCondition = "select ${selectField} from ${tableName} ";
41+
42+
43+
sqlCondition = sqlCondition.replace("${tableName}", dealFiled(rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
44+
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
45+
}
46+
47+
48+
private String dealFiled(String field) {
49+
return "\"" + field + "\"";
50+
}
51+
52+
private String dealLowerSelectFiled(String fieldsStr) {
53+
StringBuilder sb = new StringBuilder();
54+
String[] fields = fieldsStr.split(",");
55+
56+
for(String f : fields) {
57+
sb.append("\"").append(f).append("\"").append(",");
58+
}
59+
60+
sb.deleteCharAt(sb.lastIndexOf(","));
61+
return sb.toString();
62+
}
3363
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ public class OracleAsyncReqRow extends RdbAsyncReqRow {
4343
private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver";
4444

4545
public OracleAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
46-
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
46+
super(new OracleAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
4747
}
4848

4949
@Override
5050
public void open(Configuration parameters) throws Exception {
5151
super.open(parameters);
52-
JsonObject sqlserverClientConfig = new JsonObject();
52+
JsonObject oracleClientConfig = new JsonObject();
5353
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
54-
sqlserverClientConfig.put("url", rdbSideTableInfo.getUrl())
54+
oracleClientConfig.put("url", rdbSideTableInfo.getUrl())
5555
.put("driver_class", ORACLE_DRIVER)
5656
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
5757
.put("user", rdbSideTableInfo.getUserName())
@@ -61,6 +61,6 @@ public void open(Configuration parameters) throws Exception {
6161
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
6262
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
6363
Vertx vertx = Vertx.vertx(vo);
64-
setRdbSQLClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
64+
setRdbSQLClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
6565
}
6666
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.ParseUtils;
27+
import org.apache.calcite.sql.SqlNode;
28+
import org.apache.commons.lang3.StringUtils;
2529
import org.apache.flink.api.java.typeutils.RowTypeInfo;
30+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2631

32+
import java.util.Arrays;
2733
import java.util.List;
2834

2935

@@ -32,4 +38,50 @@ public class OracleAsyncSideInfo extends RdbAsyncSideInfo {
3238
public OracleAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
3339
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3440
}
41+
42+
@Override
43+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
44+
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
45+
46+
String sideTableName = joinInfo.getSideTableName();
47+
48+
SqlNode conditionNode = joinInfo.getCondition();
49+
50+
List<SqlNode> sqlNodeList = Lists.newArrayList();
51+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
52+
53+
for (SqlNode sqlNode : sqlNodeList) {
54+
dealOneEqualCon(sqlNode, sideTableName);
55+
}
56+
57+
sqlCondition = "select ${selectField} from ${tableName} where ";
58+
for (int i = 0; i < equalFieldList.size(); i++) {
59+
String equalField = equalFieldList.get(i);
60+
61+
sqlCondition += dealLowerFiled(equalField) + "=? ";
62+
if (i != equalFieldList.size() - 1) {
63+
sqlCondition += " and ";
64+
}
65+
}
66+
67+
sqlCondition = sqlCondition.replace("${tableName}", dealLowerFiled(rdbSideTableInfo.getTableName())).replace("${selectField}", dealLowerSelectFiled(sideSelectFields));
68+
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
69+
}
70+
71+
private String dealLowerFiled(String field) {
72+
return "\"" + field + "\"";
73+
}
74+
75+
private String dealLowerSelectFiled(String fieldsStr) {
76+
StringBuilder sb = new StringBuilder();
77+
String[] fields = fieldsStr.split(",");
78+
79+
for(String f : fields) {
80+
sb.append("\"").append(f).append("\"").append(",");
81+
}
82+
83+
sb.deleteCharAt(sb.lastIndexOf(","));
84+
return sb.toString();
85+
}
86+
3587
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,11 @@
1919
package com.dtstack.flink.sql.side.rdb.all;
2020

2121
import com.dtstack.flink.sql.side.AllReqRow;
22-
import com.dtstack.flink.sql.side.FieldInfo;
23-
import com.dtstack.flink.sql.side.JoinInfo;
24-
import com.dtstack.flink.sql.side.SideTableInfo;
22+
import com.dtstack.flink.sql.side.SideInfo;
2523
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26-
import com.dtstack.flink.sql.side.rdb.util.MathUtil;
2724
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
2825
import org.apache.calcite.sql.JoinType;
2926
import org.apache.commons.collections.CollectionUtils;
30-
import org.apache.flink.api.common.typeinfo.TypeInformation;
31-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3227
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
3328
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
3429
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -37,8 +32,11 @@
3732
import org.slf4j.Logger;
3833
import org.slf4j.LoggerFactory;
3934

40-
import java.math.BigDecimal;
41-
import java.sql.*;
35+
import java.sql.Connection;
36+
import java.sql.ResultSet;
37+
import java.sql.SQLException;
38+
import java.sql.Statement;
39+
import java.sql.Timestamp;
4240
import java.util.Calendar;
4341
import java.util.List;
4442
import java.util.Map;
@@ -62,9 +60,8 @@ public abstract class RdbAllReqRow extends AllReqRow {
6260

6361
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
6462

65-
66-
public RdbAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
67-
super(new RdbAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
63+
public RdbAllReqRow(SideInfo sideInfo) {
64+
super(sideInfo);
6865
}
6966

7067
@Override

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public class RdbAsyncReqRow extends AsyncReqRow {
5959

6060
private transient SQLClient rdbSQLClient;
6161

62-
public RdbAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
63-
super(new RdbAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
62+
public RdbAsyncReqRow(SideInfo sideInfo) {
63+
super(sideInfo);
6464
}
6565

6666
@Override

sqlserver/sqlserver-side/sqlserver-all-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class SqlserverAllReqRow extends RdbAllReqRow {
4343
private static final String SQLSERVER_DRIVER = "net.sourceforge.jtds.jdbc.Driver";
4444

4545
public SqlserverAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
46-
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
46+
super(new SqlserverAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
4747
}
4848

4949
@Override

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class SqlserverAsyncReqRow extends RdbAsyncReqRow {
4141
private final static String SQLSERVER_DRIVER = "net.sourceforge.jtds.jdbc.Driver";
4242

4343
public SqlserverAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
44-
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
44+
super(new SqlserverAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
4545
}
4646

4747
@Override

0 commit comments

Comments
 (0)