Skip to content

Commit 99fca76

Browse files
committed
Merge branch 'v1.5.0_dev_feature_redis' into 'v1.5.0_dev'
修改redis结果表和维表bug redis结果表和维表bug修改 See merge request !120
2 parents a82f900 + eb6e7e2 commit 99fca76

File tree

7 files changed

+110
-29
lines changed

7 files changed

+110
-29
lines changed

docs/redisSide.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
password = 'redisPwd',
1212
database = 'dbName',
1313
tableName ='sideTableName',
14+
redisType = '1',
1415
cache ='LRU',
1516
cacheSize ='10000',
1617
cacheTTLMs ='60000'
@@ -35,6 +36,8 @@
3536
| type | 表明维表的类型[hbase\|mysql\|redis]|||
3637
| url | redis 的地址;格式ip:port[,ip:port]|||
3738
| password | redis 的密码 |||
39+
| redisType | redis模式(1 单机,2 哨兵, 3 集群)||
40+
| masterName | 主节点名称(哨兵模式下为必填项) ||
3841
| database | reids 的数据库地址|||
3942
| tableName | redis 的表名称|||
4043
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
@@ -60,6 +63,7 @@ create table sideTable(
6063
url='172.16.10.79:6379',
6164
password='abc123',
6265
database='0',
66+
redisType = '1',
6367
tableName='sidetest',
6468
cache = 'LRU',
6569
cacheTTLMs='10000'

docs/redisSink.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CREATE TABLE tableName(
99
url = 'ip:port',
1010
database ='dbName',
1111
password ='pwd',
12+
redisType='1',
1213
tableName ='tableName',
1314
parallelism ='parllNum'
1415
);
@@ -32,12 +33,14 @@ redis5.0
3233

3334
|参数名称|含义|是否必填|默认值|
3435
|----|---|---|-----|
35-
|type | 表明 输出表类型[mysql\|hbase\|elasticsearch\|redis\]|||
36+
| type | 表明 输出表类型[mysql\|hbase\|elasticsearch\|redis\]|||
3637
| url | redis 的地址;格式ip:port[,ip:port]|||
3738
| password | redis 的密码 |||
39+
| redisType | redis模式(1 单机,2 哨兵, 3 集群)||
40+
| masterName | 主节点名称(哨兵模式下为必填项) ||
3841
| database | reids 的数据库地址|||
3942
| tableName | redis 的表名称|||
40-
|parallelism | 并行度设置||1|
43+
| parallelism | 并行度设置||1|
4144
4245

4346
## 5.样例:
@@ -51,7 +54,8 @@ redis5.0
5154
url='172.16.10.79:6379',
5255
password='abc123',
5356
database='0',
54-
tableName='sinktoredis',
57+
redisType='1',
58+
tableName='sinktoredis'
5559
);
5660
5761
```

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
9898
out.collect(null);
9999
}
100100
String columnName = sideInfo.getEqualFieldList().get(conValIndex);
101-
inputParams.put(columnName, (String) equalObj);
101+
inputParams.put(columnName, equalObj.toString());
102102
}
103103
String key = buildKey(inputParams);
104104

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4040
import org.apache.flink.types.Row;
4141

42-
import java.sql.Timestamp;
42+
import java.util.ArrayList;
4343
import java.util.Collections;
4444
import java.util.List;
4545
import java.util.Map;
@@ -123,12 +123,18 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
123123
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
124124
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
125125
Object equalObj = input.getField(conValIndex);
126+
127+
String value = "";
128+
126129
if(equalObj == null){
127130
resultFuture.complete(null);
131+
value = "null";
132+
} else {
133+
value = equalObj.toString();
128134
}
129135

130136
keyData.add(sideInfo.getEqualFieldList().get(i));
131-
keyData.add((String) equalObj);
137+
keyData.add(value);
132138
}
133139

134140
String key = buildCacheKey(keyData);
@@ -158,29 +164,34 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
158164
Map<String, String> keyValue = Maps.newHashMap();
159165
List<String> value = async.keys(key + ":*").get();
160166
String[] values = value.toArray(new String[value.size()]);
161-
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
162-
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
163-
@Override
164-
public void accept(List<KeyValue<String, String>> keyValues) {
165-
if (keyValues.size() != 0){
166-
for (int i=0; i<keyValues.size(); i++){
167-
String[] splitKeys = keyValues.get(i).getKey().split(":");
168-
keyValue.put(splitKeys[1], splitKeys[2]);
169-
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
170-
}
171-
Row row = fillData(input, keyValue);
172-
resultFuture.complete(Collections.singleton(row));
173-
if(openCache()){
174-
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
175-
}
176-
} else {
177-
dealMissKey(input, resultFuture);
178-
if(openCache()){
179-
putCache(key, CacheMissVal.getMissKeyObj());
167+
if (values.length == 0){
168+
Row row = fillData(input, null);
169+
resultFuture.complete(Collections.singleton(row));
170+
} else {
171+
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
172+
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
173+
@Override
174+
public void accept(List<KeyValue<String, String>> keyValues) {
175+
if (keyValues.size() != 0) {
176+
for (int i = 0; i < keyValues.size(); i++) {
177+
String[] splitKeys = keyValues.get(i).getKey().split(":");
178+
keyValue.put(splitKeys[1], splitKeys[2]);
179+
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
180+
}
181+
Row row = fillData(input, keyValue);
182+
resultFuture.complete(Collections.singleton(row));
183+
if (openCache()) {
184+
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
185+
}
186+
} else {
187+
dealMissKey(input, resultFuture);
188+
if (openCache()) {
189+
putCache(key, CacheMissVal.getMissKeyObj());
190+
}
180191
}
181192
}
182-
}
183-
});
193+
});
194+
}
184195
}
185196

186197
private String buildCacheKey(List<String> keyData) {

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.types.Row;
2525

2626
import java.io.Serializable;
27+
import java.math.BigDecimal;
28+
import java.sql.Date;
2729
import java.sql.Timestamp;
2830
import java.util.Map;
2931

@@ -63,10 +65,50 @@ public Row fillData(Row input, Object sideInput) {
6365
row.setField(entry.getKey(), null);
6466
}else{
6567
String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
66-
row.setField(entry.getKey(), sideInputMap.get(key));
68+
setRowField(row, entry.getKey(), sideInfo, sideInputMap.get(key));
6769
}
6870
}
6971

7072
return row;
7173
}
74+
75+
public void setRowField(Row row, Integer index, SideInfo sideInfo, String value) {
76+
Integer keyIndex = sideInfo.getSideFieldIndex().get(index);
77+
String classType = sideInfo.getSideTableInfo().getFieldClassList().get(keyIndex).getName();
78+
switch (classType){
79+
case "java.lang.Integer":
80+
row.setField(index, Integer.valueOf(value));
81+
break;
82+
case "java.lang.String":
83+
row.setField(index, value);
84+
break;
85+
case "java.lang.Double":
86+
row.setField(index, Double.valueOf(value));
87+
break;
88+
case "java.lang.Long":
89+
row.setField(index, Long.valueOf(value));
90+
break;
91+
case "java.lang.Byte":
92+
row.setField(index, Byte.valueOf(value));
93+
break;
94+
case "java.lang.Short":
95+
row.setField(index, Short.valueOf(value));
96+
break;
97+
case "java.lang.Float":
98+
row.setField(index, Float.valueOf(value));
99+
break;
100+
case "java.math.BigDecimal":
101+
row.setField(index, BigDecimal.valueOf(Long.valueOf(value)));
102+
break;
103+
case "java.sql.Timestamp":
104+
row.setField(index, Timestamp.valueOf(value));
105+
break;
106+
case "java.sql.Date":
107+
row.setField(index, Date.valueOf(value));
108+
break;
109+
default:
110+
throw new RuntimeException("no support field type. the type: " + classType);
111+
}
112+
}
113+
72114
}

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,13 @@ public void writeRecord(Tuple2 record) throws IOException {
171171
for (int i = 0; i < fieldNames.length; i++) {
172172
StringBuilder key = new StringBuilder();
173173
key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
174-
jedis.set(key.toString(), row.getField(i).toString());
174+
175+
String value = "null";
176+
Object field = row.getField(i);
177+
if (field != null) {
178+
value = field.toString();
179+
}
180+
jedis.set(key.toString(), value);
175181
}
176182

177183
if (outRecords.getCount()%rowLenth == 0){

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import com.dtstack.flink.sql.table.AbsTableParser;
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
24+
import org.apache.commons.lang3.StringUtils;
2425

26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2528
import java.util.Map;
2629

2730
public class RedisSinkParser extends AbsTableParser {
@@ -42,6 +45,17 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4245
redisTableInfo.setMinIdle(MathUtil.getString(props.get(RedisTableInfo.MINIDLE.toLowerCase())));
4346
redisTableInfo.setRedisType(MathUtil.getString(props.get(RedisTableInfo.REDIS_TYPE.toLowerCase())));
4447
redisTableInfo.setMasterName(MathUtil.getString(props.get(RedisTableInfo.MASTER_NAME.toLowerCase())));
48+
49+
String primaryKeysStr = MathUtil.getString(props.get("primarykeys"));
50+
ArrayList<String> primaryKeysList = null;
51+
if (!StringUtils.isEmpty(primaryKeysStr)) {
52+
String[] primaryKeysArray = primaryKeysStr.split(",");
53+
primaryKeysList = new ArrayList<String>(Arrays.asList(primaryKeysArray));
54+
} else {
55+
primaryKeysList = new ArrayList<>();
56+
}
57+
redisTableInfo.setPrimaryKeys(primaryKeysList);
58+
4559
return redisTableInfo;
4660
}
4761
}

0 commit comments

Comments
 (0)