Skip to content

Commit e3c19ad

Browse files
committed
Merge branch 'v1.5.0_dev' into v1.5.0_release
2 parents 5926e47 + 075064a commit e3c19ad

File tree

9 files changed

+61
-30
lines changed

9 files changed

+61
-30
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.commons.cli.DefaultParser;
4343
import org.apache.commons.cli.Options;
4444
import org.apache.commons.io.Charsets;
45+
import org.apache.flink.api.common.ExecutionConfig;
4546
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
4647
import org.apache.flink.api.common.time.Time;
4748
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -52,6 +53,7 @@
5253
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
5354
import org.apache.flink.calcite.shaded.com.google.common.collect.Sets;
5455
import org.apache.flink.client.program.ContextEnvironment;
56+
import org.apache.flink.configuration.Configuration;
5557
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
5658
import org.apache.flink.streaming.api.datastream.DataStream;
5759
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
@@ -67,6 +69,7 @@
6769
import java.io.IOException;
6870
import java.lang.reflect.Field;
6971
import java.lang.reflect.InvocationTargetException;
72+
import java.lang.reflect.Method;
7073
import java.net.URL;
7174
import java.net.URLClassLoader;
7275
import java.net.URLDecoder;
@@ -316,12 +319,33 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
316319
}
317320
}
318321

319-
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
322+
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException {
320323
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
321324
StreamExecutionEnvironment.getExecutionEnvironment() :
322325
new MyLocalStreamEnvironment();
323326

324327
env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
328+
Configuration globalJobParameters = new Configuration();
329+
Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
330+
method.setAccessible(true);
331+
332+
confProperties.forEach((key,val) -> {
333+
try {
334+
method.invoke(globalJobParameters, key, val);
335+
} catch (IllegalAccessException e) {
336+
e.printStackTrace();
337+
} catch (InvocationTargetException e) {
338+
e.printStackTrace();
339+
}
340+
});
341+
342+
ExecutionConfig exeConfig = env.getConfig();
343+
if(exeConfig.getGlobalJobParameters() == null){
344+
exeConfig.setGlobalJobParameters(globalJobParameters);
345+
}else if(exeConfig.getGlobalJobParameters() instanceof Configuration){
346+
((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
347+
}
348+
325349

326350
if(FlinkUtil.getMaxEnvParallelism(confProperties) > 0){
327351
env.setMaxParallelism(FlinkUtil.getMaxEnvParallelism(confProperties));

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
109109
}
110110

111111
public static void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
112-
String primaryFields = matcher.group(1);
112+
String primaryFields = matcher.group(1).trim();
113113
String[] splitArry = primaryFields.split(",");
114114
List<String> primaryKes = Lists.newArrayList(splitArry);
115115
tableInfo.setPrimaryKeys(primaryKes);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
170170
StringBuilder key = new StringBuilder();
171171
key.append(family).append(":").append(qualifier);
172172

173-
kv.put(aliasNameInversion.get(key.toString().toUpperCase()), value);
173+
kv.put(aliasNameInversion.get(key.toString()), value);
174174
}
175175
tmpCache.put(new String(r.getRow()), kv);
176176
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
9898
String mapKey = cf + ":" + col;
9999

100100
//The table format defined using different data type conversion byte
101-
String colType = colRefType.get(mapKey.toUpperCase());
101+
String colType = colRefType.get(mapKey);
102102
Object val = HbaseUtils.convertByte(keyValue.value(), colType);
103-
sideMap.put(mapKey.toUpperCase(), val);
103+
sideMap.put(mapKey, val);
104104
}
105105

106106
if (oneRow.size() > 0) {

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
7373
String col = new String(keyValue.qualifier());
7474
String mapKey = cf + ":" + col;
7575
//The table format defined using different data type conversion byte
76-
String colType = colRefType.get(mapKey.toUpperCase());
76+
String colType = colRefType.get(mapKey);
7777
Object val = HbaseUtils.convertByte(keyValue.value(), colType);
78-
sideMap.put(mapKey.toUpperCase(), val);
78+
sideMap.put(mapKey, val);
7979
}
8080

8181
if(arg.size() > 0){

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,27 @@
2323
import avro.shaded.com.google.common.collect.Lists;
2424
import com.alibaba.fastjson.JSON;
2525
import com.alibaba.fastjson.TypeReference;
26+
import com.dtstack.flink.sql.ClusterMode;
2627
import com.dtstack.flink.sql.Main;
2728
import com.dtstack.flink.sql.launcher.perjob.PerJobSubmitter;
2829
import org.apache.flink.client.program.ClusterClient;
2930
import org.apache.flink.client.program.PackagedProgram;
30-
31-
import java.io.*;
32-
import java.util.LinkedList;
33-
import java.util.List;
34-
import java.util.Map;
35-
36-
import com.dtstack.flink.sql.ClusterMode;
3731
import org.apache.flink.client.program.PackagedProgramUtils;
3832
import org.apache.flink.configuration.Configuration;
3933
import org.apache.flink.configuration.GlobalConfiguration;
4034
import org.apache.flink.runtime.jobgraph.JobGraph;
41-
import org.apache.flink.table.shaded.org.apache.commons.lang.StringUtils;
4235
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
4336
import org.apache.flink.table.shaded.org.apache.commons.lang.BooleanUtils;
37+
import org.apache.flink.table.shaded.org.apache.commons.lang.StringUtils;
38+
39+
import java.io.BufferedReader;
40+
import java.io.File;
41+
import java.io.FileInputStream;
42+
import java.io.IOException;
43+
import java.io.InputStreamReader;
44+
import java.util.LinkedList;
45+
import java.util.List;
46+
import java.util.Map;
4447

4548
/**
4649
* Date: 2017/2/20

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
130130
Object equalObj = input.getField(conValIndex);
131131
if (equalObj == null) {
132132
resultFuture.complete(null);
133+
return;
133134
}
134135
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);
135136
}
@@ -142,12 +143,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
142143
dealMissKey(input, resultFuture);
143144
return;
144145
} else if (ECacheContentType.MultiLine == val.getType()) {
145-
146+
List<Row> rowList = Lists.newArrayList();
146147
for (Object jsonArray : (List) val.getContent()) {
147148
Row row = fillData(input, jsonArray);
148-
resultFuture.complete(Collections.singleton(row));
149+
rowList.add(row);
149150
}
150-
151+
resultFuture.complete(rowList);
151152
} else {
152153
throw new RuntimeException("not support cache obj type " + val.getType());
153154
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,15 @@
2626
import io.vertx.core.json.JsonArray;
2727
import io.vertx.ext.sql.SQLClient;
2828
import io.vertx.ext.sql.SQLConnection;
29-
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
3029
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3130
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
32-
import org.apache.flink.configuration.Configuration;
3331
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3432
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3533
import org.apache.flink.types.Row;
3634
import org.slf4j.Logger;
3735
import org.slf4j.LoggerFactory;
3836

39-
import java.math.BigInteger;
4037
import java.sql.Timestamp;
41-
import java.util.Collections;
4238
import java.util.List;
4339
import java.util.Map;
4440

@@ -75,8 +71,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
7571
Object equalObj = input.getField(conValIndex);
7672
if (equalObj == null) {
7773
resultFuture.complete(null);
74+
return;
7875
}
79-
8076
inputParams.add(equalObj);
8177
}
8278

@@ -89,12 +85,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
8985
dealMissKey(input, resultFuture);
9086
return;
9187
} else if (ECacheContentType.MultiLine == val.getType()) {
92-
88+
List<Row> rowList = Lists.newArrayList();
9389
for (Object jsonArray : (List) val.getContent()) {
9490
Row row = fillData(input, jsonArray);
95-
resultFuture.complete(Collections.singleton(row));
91+
rowList.add(row);
9692
}
97-
93+
resultFuture.complete(rowList);
9894
} else {
9995
throw new RuntimeException("not support cache obj type " + val.getType());
10096
}
@@ -122,18 +118,21 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
122118

123119
int resultSize = rs.result().getResults().size();
124120
if (resultSize > 0) {
125-
for (JsonArray line : rs.result().getResults()) {
121+
List<Row> rowList = Lists.newArrayList();
126122

123+
for (JsonArray line : rs.result().getResults()) {
127124
Row row = fillData(input, line);
128125
if (openCache()) {
129126
cacheContent.add(line);
130127
}
131-
resultFuture.complete(Collections.singleton(row));
128+
rowList.add(row);
132129
}
133130

134131
if (openCache()) {
135132
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
136133
}
134+
135+
resultFuture.complete(rowList);
137136
} else {
138137
dealMissKey(input, resultFuture);
139138
if (openCache()) {

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
141141
dealMissKey(input, resultFuture);
142142
return;
143143
}else if(ECacheContentType.MultiLine == val.getType()){
144-
Row row = fillData(input, val.getContent());
145-
resultFuture.complete(Collections.singleton(row));
144+
List<Row> rowList = Lists.newArrayList();
145+
for (Object jsonArray : (List) val.getContent()) {
146+
Row row = fillData(input, val.getContent());
147+
rowList.add(row);
148+
}
149+
resultFuture.complete(rowList);
146150
}else{
147151
throw new RuntimeException("not support cache obj type " + val.getType());
148152
}

0 commit comments

Comments
 (0)