Skip to content

Commit 97977ce

Browse files
committed
readme format
2 parents 9ede309 + d2c2ef3 commit 97977ce

File tree

7 files changed

+107
-30
lines changed

7 files changed

+107
-30
lines changed

docs/redisSide.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
password = 'redisPwd',
1212
database = 'dbName',
1313
tableName ='sideTableName',
14+
redisType = '1',
1415
cache ='LRU',
1516
cacheSize ='10000',
1617
cacheTTLMs ='60000'
@@ -35,6 +36,8 @@
3536
| type | 表明维表的类型[hbase|mysql|redis]|||
3637
| url | redis 的地址;格式ip:port[,ip:port]|||
3738
| password | redis 的密码 |||
39+
| redisType | redis模式(1 单机,2 哨兵, 3 集群)||
40+
| masterName | 主节点名称(哨兵模式下为必填项) ||
3841
| database | reids 的数据库地址|||
3942
| tableName | redis 的表名称|||
4043
| cache | 维表缓存策略(NONE/LRU/ALL)||NONE|
@@ -60,6 +63,7 @@ create table sideTable(
6063
url='172.16.10.79:6379',
6164
password='abc123',
6265
database='0',
66+
redisType = '1',
6367
tableName='sidetest',
6468
cache = 'LRU',
6569
cacheTTLMs='10000'

docs/redisSink.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CREATE TABLE tableName(
99
url = 'ip:port',
1010
database ='dbName',
1111
password ='pwd',
12+
redisType='1',
1213
tableName ='tableName',
1314
parallelism ='parllNum'
1415
);
@@ -35,9 +36,11 @@ redis5.0
3536
|type | 表明 输出表类型[mysql|hbase|elasticsearch|redis]|||
3637
| url | redis 的地址;格式ip:port[,ip:port]|||
3738
| password | redis 的密码 |||
39+
| redisType | redis模式(1 单机,2 哨兵, 3 集群)||
40+
| masterName | 主节点名称(哨兵模式下为必填项) ||
3841
| database | reids 的数据库地址|||
3942
| tableName | redis 的表名称|||
40-
|parallelism | 并行度设置||1|
43+
| parallelism | 并行度设置||1|
4144
4245

4346
## 5.样例:
@@ -51,7 +54,8 @@ redis5.0
5154
url='172.16.10.79:6379',
5255
password='abc123',
5356
database='0',
54-
tableName='sinktoredis',
57+
redisType='1',
58+
tableName='sinktoredis'
5559
);
5660
5761
```

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void flatMap(Row row, Collector<Row> out) throws Exception {
9898
out.collect(null);
9999
}
100100
String columnName = sideInfo.getEqualFieldList().get(conValIndex);
101-
inputParams.put(columnName, (String) equalObj);
101+
inputParams.put(columnName, equalObj.toString());
102102
}
103103
String key = buildKey(inputParams);
104104

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.lettuce.core.api.async.RedisStringAsyncCommands;
3232
import io.lettuce.core.cluster.RedisClusterClient;
3333
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
34+
import org.apache.calcite.sql.JoinType;
3435
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3536
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
3637
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
@@ -39,7 +40,7 @@
3940
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4041
import org.apache.flink.types.Row;
4142

42-
import java.sql.Timestamp;
43+
import java.util.ArrayList;
4344
import java.util.Collections;
4445
import java.util.List;
4546
import java.util.Map;
@@ -123,12 +124,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
123124
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
124125
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
125126
Object equalObj = input.getField(conValIndex);
127+
126128
if(equalObj == null){
127-
resultFuture.complete(null);
129+
dealMissKey(input, resultFuture);
130+
return;
128131
}
129-
132+
String value = equalObj.toString();
130133
keyData.add(sideInfo.getEqualFieldList().get(i));
131-
keyData.add((String) equalObj);
134+
keyData.add(value);
132135
}
133136

134137
String key = buildCacheKey(keyData);
@@ -158,29 +161,33 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
158161
Map<String, String> keyValue = Maps.newHashMap();
159162
List<String> value = async.keys(key + ":*").get();
160163
String[] values = value.toArray(new String[value.size()]);
161-
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
162-
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
163-
@Override
164-
public void accept(List<KeyValue<String, String>> keyValues) {
165-
if (keyValues.size() != 0){
166-
for (int i=0; i<keyValues.size(); i++){
167-
String[] splitKeys = keyValues.get(i).getKey().split(":");
168-
keyValue.put(splitKeys[1], splitKeys[2]);
169-
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
170-
}
171-
Row row = fillData(input, keyValue);
172-
resultFuture.complete(Collections.singleton(row));
173-
if(openCache()){
174-
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
175-
}
176-
} else {
177-
dealMissKey(input, resultFuture);
178-
if(openCache()){
179-
putCache(key, CacheMissVal.getMissKeyObj());
164+
if (values.length == 0){
165+
dealMissKey(input, resultFuture);
166+
} else {
167+
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
168+
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
169+
@Override
170+
public void accept(List<KeyValue<String, String>> keyValues) {
171+
if (keyValues.size() != 0) {
172+
for (int i = 0; i < keyValues.size(); i++) {
173+
String[] splitKeys = keyValues.get(i).getKey().split(":");
174+
keyValue.put(splitKeys[1], splitKeys[2]);
175+
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
176+
}
177+
Row row = fillData(input, keyValue);
178+
resultFuture.complete(Collections.singleton(row));
179+
if (openCache()) {
180+
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
181+
}
182+
} else {
183+
dealMissKey(input, resultFuture);
184+
if (openCache()) {
185+
putCache(key, CacheMissVal.getMissKeyObj());
186+
}
180187
}
181188
}
182-
}
183-
});
189+
});
190+
}
184191
}
185192

186193
private String buildCacheKey(List<String> keyData) {

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.types.Row;
2525

2626
import java.io.Serializable;
27+
import java.math.BigDecimal;
28+
import java.sql.Date;
2729
import java.sql.Timestamp;
2830
import java.util.Map;
2931

@@ -63,10 +65,50 @@ public Row fillData(Row input, Object sideInput) {
6365
row.setField(entry.getKey(), null);
6466
}else{
6567
String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
66-
row.setField(entry.getKey(), sideInputMap.get(key));
68+
setRowField(row, entry.getKey(), sideInfo, sideInputMap.get(key));
6769
}
6870
}
6971

7072
return row;
7173
}
74+
75+
public void setRowField(Row row, Integer index, SideInfo sideInfo, String value) {
76+
Integer keyIndex = sideInfo.getSideFieldIndex().get(index);
77+
String classType = sideInfo.getSideTableInfo().getFieldClassList().get(keyIndex).getName();
78+
switch (classType){
79+
case "java.lang.Integer":
80+
row.setField(index, Integer.valueOf(value));
81+
break;
82+
case "java.lang.String":
83+
row.setField(index, value);
84+
break;
85+
case "java.lang.Double":
86+
row.setField(index, Double.valueOf(value));
87+
break;
88+
case "java.lang.Long":
89+
row.setField(index, Long.valueOf(value));
90+
break;
91+
case "java.lang.Byte":
92+
row.setField(index, Byte.valueOf(value));
93+
break;
94+
case "java.lang.Short":
95+
row.setField(index, Short.valueOf(value));
96+
break;
97+
case "java.lang.Float":
98+
row.setField(index, Float.valueOf(value));
99+
break;
100+
case "java.math.BigDecimal":
101+
row.setField(index, BigDecimal.valueOf(Long.valueOf(value)));
102+
break;
103+
case "java.sql.Timestamp":
104+
row.setField(index, Timestamp.valueOf(value));
105+
break;
106+
case "java.sql.Date":
107+
row.setField(index, Date.valueOf(value));
108+
break;
109+
default:
110+
throw new RuntimeException("no support field type. the type: " + classType);
111+
}
112+
}
113+
72114
}

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,13 @@ public void writeRecord(Tuple2 record) throws IOException {
171171
for (int i = 0; i < fieldNames.length; i++) {
172172
StringBuilder key = new StringBuilder();
173173
key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
174-
jedis.set(key.toString(), row.getField(i).toString());
174+
175+
String value = "null";
176+
Object field = row.getField(i);
177+
if (field != null) {
178+
value = field.toString();
179+
}
180+
jedis.set(key.toString(), value);
175181
}
176182

177183
if (outRecords.getCount()%rowLenth == 0){

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import com.dtstack.flink.sql.table.AbsTableParser;
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
24+
import org.apache.commons.lang3.StringUtils;
2425

26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2528
import java.util.Map;
2629

2730
public class RedisSinkParser extends AbsTableParser {
@@ -42,6 +45,17 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4245
redisTableInfo.setMinIdle(MathUtil.getString(props.get(RedisTableInfo.MINIDLE.toLowerCase())));
4346
redisTableInfo.setRedisType(MathUtil.getString(props.get(RedisTableInfo.REDIS_TYPE.toLowerCase())));
4447
redisTableInfo.setMasterName(MathUtil.getString(props.get(RedisTableInfo.MASTER_NAME.toLowerCase())));
48+
49+
String primaryKeysStr = MathUtil.getString(props.get("primarykeys"));
50+
ArrayList<String> primaryKeysList = null;
51+
if (!StringUtils.isEmpty(primaryKeysStr)) {
52+
String[] primaryKeysArray = primaryKeysStr.split(",");
53+
primaryKeysList = new ArrayList<String>(Arrays.asList(primaryKeysArray));
54+
} else {
55+
primaryKeysList = new ArrayList<>();
56+
}
57+
redisTableInfo.setPrimaryKeys(primaryKeysList);
58+
4559
return redisTableInfo;
4660
}
4761
}

0 commit comments

Comments
 (0)