Skip to content

Commit faea88d

Browse files
committed
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev
# Conflicts: # redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
2 parents faffa35 + d2c2ef3 commit faea88d

File tree

7 files changed

+104
-32
lines changed

7 files changed

+104
-32
lines changed

docs/redisSide.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
password = 'redisPwd',
1212
database = 'dbName',
1313
tableName ='sideTableName',
14+
redisType = '1',
1415
cache ='LRU',
1516
cacheSize ='10000',
1617
cacheTTLMs ='60000'
@@ -35,6 +36,8 @@
3536
| type | 表明维表的类型[hbase\|mysql\|redis]|||
3637
| url | redis 的地址;格式ip:port[,ip:port]|||
3738
| password | redis 的密码 |||
39+
| redisType | redis模式(1 单机,2 哨兵, 3 集群)||
40+
| masterName | 主节点名称(哨兵模式下为必填项) ||
3841
| database | reids 的数据库地址|||
3942
| tableName | redis 的表名称|||
4043
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
@@ -60,6 +63,7 @@ create table sideTable(
6063
url='172.16.10.79:6379',
6164
password='abc123',
6265
database='0',
66+
redisType = '1',
6367
tableName='sidetest',
6468
cache = 'LRU',
6569
cacheTTLMs='10000'

docs/redisSink.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CREATE TABLE tableName(
99
url = 'ip:port',
1010
database ='dbName',
1111
password ='pwd',
12+
redisType='1',
1213
tableName ='tableName',
1314
parallelism ='parllNum'
1415
);
@@ -32,12 +33,14 @@ redis5.0
3233

3334
|参数名称|含义|是否必填|默认值|
3435
|----|---|---|-----|
35-
|type | 表明 输出表类型[mysql\|hbase\|elasticsearch\|redis\]|||
36+
| type | 表明 输出表类型[mysql\|hbase\|elasticsearch\|redis\]|||
3637
| url | redis 的地址;格式ip:port[,ip:port]|||
3738
| password | redis 的密码 |||
39+
| redisType | redis模式(1 单机,2 哨兵, 3 集群)||
40+
| masterName | 主节点名称(哨兵模式下为必填项) ||
3841
| database | reids 的数据库地址|||
3942
| tableName | redis 的表名称|||
40-
|parallelism | 并行度设置||1|
43+
| parallelism | 并行度设置||1|
4144
4245

4346
## 5.样例:
@@ -51,7 +54,8 @@ redis5.0
5154
url='172.16.10.79:6379',
5255
password='abc123',
5356
database='0',
54-
tableName='sinktoredis',
57+
redisType='1',
58+
tableName='sinktoredis'
5559
);
5660
5761
```

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
9898
out.collect(null);
9999
}
100100
String columnName = sideInfo.getEqualFieldList().get(conValIndex);
101-
inputParams.put(columnName, (String) equalObj);
101+
inputParams.put(columnName, equalObj.toString());
102102
}
103103
String key = buildKey(inputParams);
104104

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,8 @@
3636
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
3737
import org.apache.flink.configuration.Configuration;
3838
import org.apache.flink.streaming.api.functions.async.ResultFuture;
39-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4039
import org.apache.flink.types.Row;
4140

42-
import java.sql.Timestamp;
4341
import java.util.Collections;
4442
import java.util.List;
4543
import java.util.Map;
@@ -124,12 +122,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
124122
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
125123
Object equalObj = input.getField(conValIndex);
126124
if(equalObj == null){
127-
resultFuture.complete(null);
125+
dealMissKey(input, resultFuture);
128126
return;
129127
}
130-
128+
String value = equalObj.toString();
131129
keyData.add(sideInfo.getEqualFieldList().get(i));
132-
keyData.add((String) equalObj);
130+
keyData.add(value);
133131
}
134132

135133
String key = buildCacheKey(keyData);
@@ -159,29 +157,33 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
159157
Map<String, String> keyValue = Maps.newHashMap();
160158
List<String> value = async.keys(key + ":*").get();
161159
String[] values = value.toArray(new String[value.size()]);
162-
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
163-
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
164-
@Override
165-
public void accept(List<KeyValue<String, String>> keyValues) {
166-
if (keyValues.size() != 0){
167-
for (int i=0; i<keyValues.size(); i++){
168-
String[] splitKeys = keyValues.get(i).getKey().split(":");
169-
keyValue.put(splitKeys[1], splitKeys[2]);
170-
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
171-
}
172-
Row row = fillData(input, keyValue);
173-
resultFuture.complete(Collections.singleton(row));
174-
if(openCache()){
175-
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
176-
}
177-
} else {
178-
dealMissKey(input, resultFuture);
179-
if(openCache()){
180-
putCache(key, CacheMissVal.getMissKeyObj());
160+
if (values.length == 0){
161+
dealMissKey(input, resultFuture);
162+
} else {
163+
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
164+
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
165+
@Override
166+
public void accept(List<KeyValue<String, String>> keyValues) {
167+
if (keyValues.size() != 0) {
168+
for (int i = 0; i < keyValues.size(); i++) {
169+
String[] splitKeys = keyValues.get(i).getKey().split(":");
170+
keyValue.put(splitKeys[1], splitKeys[2]);
171+
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
172+
}
173+
Row row = fillData(input, keyValue);
174+
resultFuture.complete(Collections.singleton(row));
175+
if (openCache()) {
176+
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
177+
}
178+
} else {
179+
dealMissKey(input, resultFuture);
180+
if (openCache()) {
181+
putCache(key, CacheMissVal.getMissKeyObj());
182+
}
181183
}
182184
}
183-
}
184-
});
185+
});
186+
}
185187
}
186188

187189
private String buildCacheKey(List<String> keyData) {

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.types.Row;
2525

2626
import java.io.Serializable;
27+
import java.math.BigDecimal;
28+
import java.sql.Date;
2729
import java.sql.Timestamp;
2830
import java.util.Map;
2931

@@ -63,10 +65,50 @@ public Row fillData(Row input, Object sideInput) {
6365
row.setField(entry.getKey(), null);
6466
}else{
6567
String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
66-
row.setField(entry.getKey(), sideInputMap.get(key));
68+
setRowField(row, entry.getKey(), sideInfo, sideInputMap.get(key));
6769
}
6870
}
6971

7072
return row;
7173
}
74+
75+
public void setRowField(Row row, Integer index, SideInfo sideInfo, String value) {
76+
Integer keyIndex = sideInfo.getSideFieldIndex().get(index);
77+
String classType = sideInfo.getSideTableInfo().getFieldClassList().get(keyIndex).getName();
78+
switch (classType){
79+
case "java.lang.Integer":
80+
row.setField(index, Integer.valueOf(value));
81+
break;
82+
case "java.lang.String":
83+
row.setField(index, value);
84+
break;
85+
case "java.lang.Double":
86+
row.setField(index, Double.valueOf(value));
87+
break;
88+
case "java.lang.Long":
89+
row.setField(index, Long.valueOf(value));
90+
break;
91+
case "java.lang.Byte":
92+
row.setField(index, Byte.valueOf(value));
93+
break;
94+
case "java.lang.Short":
95+
row.setField(index, Short.valueOf(value));
96+
break;
97+
case "java.lang.Float":
98+
row.setField(index, Float.valueOf(value));
99+
break;
100+
case "java.math.BigDecimal":
101+
row.setField(index, BigDecimal.valueOf(Long.valueOf(value)));
102+
break;
103+
case "java.sql.Timestamp":
104+
row.setField(index, Timestamp.valueOf(value));
105+
break;
106+
case "java.sql.Date":
107+
row.setField(index, Date.valueOf(value));
108+
break;
109+
default:
110+
throw new RuntimeException("no support field type. the type: " + classType);
111+
}
112+
}
113+
72114
}

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,13 @@ public void writeRecord(Tuple2 record) throws IOException {
171171
for (int i = 0; i < fieldNames.length; i++) {
172172
StringBuilder key = new StringBuilder();
173173
key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
174-
jedis.set(key.toString(), row.getField(i).toString());
174+
175+
String value = "null";
176+
Object field = row.getField(i);
177+
if (field != null) {
178+
value = field.toString();
179+
}
180+
jedis.set(key.toString(), value);
175181
}
176182

177183
if (outRecords.getCount()%rowLenth == 0){

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import com.dtstack.flink.sql.table.AbsTableParser;
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
24+
import org.apache.commons.lang3.StringUtils;
2425

26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2528
import java.util.Map;
2629

2730
public class RedisSinkParser extends AbsTableParser {
@@ -42,6 +45,17 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4245
redisTableInfo.setMinIdle(MathUtil.getString(props.get(RedisTableInfo.MINIDLE.toLowerCase())));
4346
redisTableInfo.setRedisType(MathUtil.getString(props.get(RedisTableInfo.REDIS_TYPE.toLowerCase())));
4447
redisTableInfo.setMasterName(MathUtil.getString(props.get(RedisTableInfo.MASTER_NAME.toLowerCase())));
48+
49+
String primaryKeysStr = MathUtil.getString(props.get("primarykeys"));
50+
ArrayList<String> primaryKeysList = null;
51+
if (!StringUtils.isEmpty(primaryKeysStr)) {
52+
String[] primaryKeysArray = primaryKeysStr.split(",");
53+
primaryKeysList = new ArrayList<String>(Arrays.asList(primaryKeysArray));
54+
} else {
55+
primaryKeysList = new ArrayList<>();
56+
}
57+
redisTableInfo.setPrimaryKeys(primaryKeysList);
58+
4559
return redisTableInfo;
4660
}
4761
}

0 commit comments

Comments
 (0)