Skip to content

Commit 1f76c9f

Browse files
committed
add as operate
1 parent b8e4db4 commit 1f76c9f

File tree

2 files changed

+37
-38
lines changed

2 files changed

+37
-38
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
104104
List<String> fieldNames = null;
105105
for(FieldReplaceInfo replaceInfo : replaceInfoList){
106106
fieldNames = Lists.newArrayList();
107-
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias(), fieldNames);
108-
dealMidConvertField(pollSqlNode, fieldNames);
107+
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias());
108+
addAliasForFiledNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
109109
}
110110
}
111111

@@ -130,16 +130,16 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
130130
}
131131

132132

133-
private void dealMidConvertField(SqlNode pollSqlNode, List<String> field) {
133+
private void addAliasForFiledNode(SqlNode pollSqlNode, List<String> fieldList, HashBasedTable<String, String, String> mappingTable) {
134134
SqlKind sqlKind = pollSqlNode.getKind();
135135
switch (sqlKind) {
136136
case INSERT:
137137
SqlNode source = ((SqlInsert) pollSqlNode).getSource();
138-
dealMidConvertField(source, field);
138+
addAliasForFiledNode(source, fieldList, mappingTable);
139139
break;
140140

141141
case AS:
142-
dealMidConvertField(((SqlBasicCall) pollSqlNode).getOperands()[0], field);
142+
addAliasForFiledNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable);
143143
break;
144144

145145
case SELECT:
@@ -152,14 +152,15 @@ private void dealMidConvertField(SqlNode pollSqlNode, List<String> field) {
152152
if (sqlIdentifier.names.size() == 1) {
153153
return;
154154
}
155-
String name = sqlIdentifier.names.get(1);
156-
if (!name.endsWith("0")) {
157-
field.add(name);
155+
156+
String filedName = sqlIdentifier.names.get(1);
157+
if (!filedName.endsWith("0") ) {
158+
fieldList.add(filedName);
158159
}
159160

160161
}
161162
});
162-
// convert
163+
163164
for (int i = 0; i < selectList.getList().size(); i++) {
164165
SqlNode node = selectList.get(i);
165166
if (node.getKind() == IDENTIFIER) {
@@ -169,7 +170,7 @@ private void dealMidConvertField(SqlNode pollSqlNode, List<String> field) {
169170
}
170171

171172
String name = sqlIdentifier.names.get(1);
172-
if (name.endsWith("0") && !field.contains(name)) {
173+
if (name.endsWith("0") && !fieldList.contains(name.substring(0, name.length() - 1)) && !mappingTable.columnMap().containsKey(name)) {
173174
SqlOperator operator = new SqlAsOperator();
174175
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
175176

@@ -227,16 +228,16 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBa
227228
}
228229

229230
//需要考虑更多的情况
230-
private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias, List<String> fieldNames) {
231+
private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
231232
SqlKind sqlKind = sqlNode.getKind();
232233
switch (sqlKind) {
233234
case INSERT:
234235
SqlNode sqlSource = ((SqlInsert) sqlNode).getSource();
235-
replaceFieldName(sqlSource, mappingTable, targetTableName, tableAlias, fieldNames);
236+
replaceFieldName(sqlSource, mappingTable, targetTableName, tableAlias);
236237
break;
237238
case AS:
238239
SqlNode asNode = ((SqlBasicCall) sqlNode).getOperands()[0];
239-
replaceFieldName(asNode, mappingTable, targetTableName, tableAlias, fieldNames);
240+
replaceFieldName(asNode, mappingTable, targetTableName, tableAlias);
240241
break;
241242
case SELECT:
242243
SqlSelect sqlSelect = (SqlSelect) filterNodeWithTargetName(sqlNode, targetTableName);
@@ -265,7 +266,7 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
265266
continue;
266267
}
267268

268-
SqlNode replaceNode = replaceSelectFieldName(selectNode, mappingTable, tableAlias, fieldNames);
269+
SqlNode replaceNode = replaceSelectFieldName(selectNode, mappingTable, tableAlias);
269270
if(replaceNode == null){
270271
continue;
271272
}
@@ -282,15 +283,15 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
282283
SqlNode[] sqlNodeList = ((SqlBasicCall)whereNode).getOperands();
283284
for(int i =0; i<sqlNodeList.length; i++) {
284285
SqlNode whereSqlNode = sqlNodeList[i];
285-
SqlNode replaceNode = replaceNodeInfo(whereSqlNode, mappingTable, tableAlias, fieldNames);
286+
SqlNode replaceNode = replaceNodeInfo(whereSqlNode, mappingTable, tableAlias);
286287
sqlNodeList[i] = replaceNode;
287288
}
288289
}
289290

290291
if(sqlGroup != null && CollectionUtils.isNotEmpty(sqlGroup.getList())){
291292
for( int i=0; i<sqlGroup.getList().size(); i++){
292293
SqlNode selectNode = sqlGroup.getList().get(i);
293-
SqlNode replaceNode = replaceNodeInfo(selectNode, mappingTable, tableAlias, fieldNames);
294+
SqlNode replaceNode = replaceNodeInfo(selectNode, mappingTable, tableAlias);
294295
sqlGroup.set(i, replaceNode);
295296
}
296297
}
@@ -310,7 +311,7 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
310311
}
311312
}
312313

313-
private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String, String> mappingTable, String tableAlias, List<String> fieldNames){
314+
private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
314315
if(groupNode.getKind() == IDENTIFIER){
315316
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;
316317
String mappingFieldName = mappingTable.get(sqlIdentifier.getComponent(0).getSimple(), sqlIdentifier.getComponent(1).getSimple());
@@ -320,7 +321,7 @@ private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String
320321
SqlBasicCall sqlBasicCall = (SqlBasicCall) groupNode;
321322
for(int i=0; i<sqlBasicCall.getOperandList().size(); i++){
322323
SqlNode sqlNode = sqlBasicCall.getOperandList().get(i);
323-
SqlNode replaceNode = replaceSelectFieldName(sqlNode, mappingTable, tableAlias, fieldNames);
324+
SqlNode replaceNode = replaceSelectFieldName(sqlNode, mappingTable, tableAlias);
324325
sqlBasicCall.getOperands()[i] = replaceNode;
325326
}
326327

@@ -411,12 +412,10 @@ private List<SqlNode> replaceSelectStarFieldName(SqlNode selectNode, HashBasedTa
411412
}
412413
}
413414

414-
private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String, String, String> mappingTable, String tableAlias, List<String> fieldNames) {
415+
private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String, String, String> mappingTable, String tableAlias) {
415416
if (selectNode.getKind() == AS) {
416417
SqlNode leftNode = ((SqlBasicCall) selectNode).getOperands()[0];
417-
SqlNode rightNode = ((SqlBasicCall) selectNode).getOperands()[1];
418-
fieldNames.add(rightNode.toString());
419-
SqlNode replaceNode = replaceSelectFieldName(leftNode, mappingTable, tableAlias, fieldNames);
418+
SqlNode replaceNode = replaceSelectFieldName(leftNode, mappingTable, tableAlias);
420419
if (replaceNode != null) {
421420
((SqlBasicCall) selectNode).getOperands()[0] = replaceNode;
422421
}
@@ -484,7 +483,7 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
484483
continue;
485484
}
486485

487-
SqlNode replaceNode = replaceSelectFieldName(sqlNode, mappingTable, tableAlias, fieldNames);
486+
SqlNode replaceNode = replaceSelectFieldName(sqlNode, mappingTable, tableAlias);
488487
if(replaceNode == null){
489488
continue;
490489
}
@@ -502,21 +501,21 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
502501

503502
for(int i=0; i<whenOperands.size(); i++){
504503
SqlNode oneOperand = whenOperands.get(i);
505-
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias, fieldNames);
504+
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias);
506505
if (replaceNode != null) {
507506
whenOperands.set(i, replaceNode);
508507
}
509508
}
510509

511510
for(int i=0; i<thenOperands.size(); i++){
512511
SqlNode oneOperand = thenOperands.get(i);
513-
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias, fieldNames);
512+
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias);
514513
if (replaceNode != null) {
515514
thenOperands.set(i, replaceNode);
516515
}
517516
}
518517

519-
((SqlCase) selectNode).setOperand(3, replaceSelectFieldName(elseNode, mappingTable, tableAlias, fieldNames));
518+
((SqlCase) selectNode).setOperand(3, replaceSelectFieldName(elseNode, mappingTable, tableAlias));
520519
return selectNode;
521520
}else if(selectNode.getKind() == OTHER){
522521
//不处理
@@ -604,8 +603,8 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
604603
List<String> fieldNames = null;
605604
for (FieldReplaceInfo replaceInfo : replaceInfoList) {
606605
fieldNames = Lists.newArrayList();
607-
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias(), fieldNames);
608-
dealMidConvertField(pollSqlNode, fieldNames);
606+
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias());
607+
addAliasForFiledNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
609608
}
610609
}
611610

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6161
// trigger preparedStatement execute batch interval
6262
private long batchWaitInterval = 10000l;
6363
// PreparedStatement execute batch num
64-
private int batchNum = 1;
64+
private int batchNum = 100;
6565
private String insertQuery;
6666
public int[] typesArray;
6767

@@ -97,6 +97,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
9797
establishConnection();
9898
initMetric();
9999

100+
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
101+
if (isReplaceInsertQuery()) {
102+
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
103+
}
104+
upload = dbConn.prepareStatement(insertQuery);
105+
} else {
106+
throw new SQLException("Table " + tableName + " doesn't exist");
107+
}
108+
100109
if (batchWaitInterval > 0) {
101110
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
102111

@@ -107,15 +116,6 @@ public void open(int taskNumber, int numTasks) throws IOException {
107116

108117
}
109118

110-
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
111-
if (isReplaceInsertQuery()) {
112-
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
113-
}
114-
upload = dbConn.prepareStatement(insertQuery);
115-
} else {
116-
throw new SQLException("Table " + tableName + " doesn't exist");
117-
}
118-
119119
} catch (SQLException sqe) {
120120
throw new IllegalArgumentException("open() failed.", sqe);
121121
} catch (ClassNotFoundException cnfe) {

0 commit comments

Comments
 (0)