Skip to content

Commit 6ec486b

Browse files
jefftlintiezhu
authored andcommitted
[fix][connectors redis]redis sink values保留字段类型方便反序列化
1 parent 788bc76 commit 6ec486b

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.dtstack.flinkx.element.column.SqlDateColumn;
2929
import com.dtstack.flinkx.element.column.StringColumn;
3030
import com.dtstack.flinkx.element.column.TimestampColumn;
31+
import com.dtstack.flinkx.util.JsonUtil;
3132

3233
import org.apache.flink.table.data.RowData;
3334
import org.apache.flink.table.types.logical.LogicalType;
@@ -38,7 +39,10 @@
3839

3940
import java.text.SimpleDateFormat;
4041
import java.util.ArrayList;
42+
import java.util.HashMap;
4143
import java.util.List;
44+
import java.util.Map;
45+
import java.util.Objects;
4246

4347
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_CRITICAL_TIME;
4448
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_KEY_VALUE_SIZE;
@@ -168,7 +172,17 @@ private String[] getValues(ColumnRowData row) {
168172
}
169173

170174
private String concatValues(ColumnRowData row) {
171-
return StringUtils.join(getValues(row), redisConf.getValueFieldDelimiter());
175+
List<FieldConf> columns = redisConf.getColumn();
176+
Map<String, Object> fieldMap = new HashMap<>();
177+
int index = 0;
178+
179+
for (FieldConf fieldConf : columns) {
180+
if (Objects.nonNull(row.getField(index))) {
181+
fieldMap.put(fieldConf.getName(), row.getField(index).getData());
182+
}
183+
index++;
184+
}
185+
return JsonUtil.toJson(fieldMap);
172186
}
173187

174188
private String concatKey(ColumnRowData row) {

0 commit comments

Comments
 (0)