Skip to content

Commit d3db08b

Browse files
authored
[Feature-#1451] add column replace for hbase column (#1452)
1 parent bc190ce commit d3db08b

File tree

2 files changed

+83
-5
lines changed

2 files changed

+83
-5
lines changed

chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/FunctionParser.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,15 @@ public static String replaceColToStringFunc(String express) {
172172

173173
return express;
174174
}
175+
176+
public static List<String> getRegexColumnName(String qualifier) {
177+
Matcher matcher = COL_PATTERN.matcher(qualifier);
178+
ArrayList<String> columnQualifier = new ArrayList<>();
179+
while (matcher.find()) {
180+
String columnGroup = matcher.group();
181+
String column = columnGroup.substring(2, columnGroup.length() - 1);
182+
columnQualifier.add(column);
183+
}
184+
return columnQualifier;
185+
}
175186
}

chunjun-connectors/chunjun-connector-hbase-base/src/main/java/com/dtstack/chunjun/connector/hbase/converter/HBaseColumnConverter.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@
6060
import java.text.SimpleDateFormat;
6161
import java.time.LocalTime;
6262
import java.util.ArrayList;
63+
import java.util.Arrays;
6364
import java.util.Date;
6465
import java.util.HashMap;
66+
import java.util.HashSet;
6567
import java.util.List;
6668
import java.util.Map;
6769

@@ -96,6 +98,8 @@ public class HBaseColumnConverter
9698

9799
private final List<String> columnNames = new ArrayList<>();
98100

101+
private final List<String> columnNamesWithoutcf = new ArrayList<>();
102+
99103
private final String encoding;
100104

101105
private final HBaseConf hBaseConf;
@@ -106,7 +110,13 @@ public class HBaseColumnConverter
106110

107111
private final List<FieldConf> fieldList;
108112

109-
private final byte[][][] familyAndQualifier;
113+
private byte[][][] familyAndQualifier;
114+
115+
private final byte[][][] familyAndQualifierBack;
116+
117+
private final ArrayList<HashMap<String, Integer>> columnConfig;
118+
119+
private final HashSet<Integer> columnConfigIndex;
110120

111121
public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) {
112122
super(rowType, hBaseConf);
@@ -123,6 +133,9 @@ public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) {
123133
createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i)));
124134
}
125135
this.familyAndQualifier = new byte[rowType.getFieldCount()][][];
136+
this.familyAndQualifierBack = new byte[rowType.getFieldCount()][][];
137+
this.columnConfig = new ArrayList<>(rowType.getFieldCount());
138+
this.columnConfigIndex = new HashSet<>(rowType.getFieldCount());
126139
for (int i = 0; i < hBaseConf.getColumn().size(); i++) {
127140
FieldConf fieldConf = hBaseConf.getColumn().get(i);
128141
String name = fieldConf.getName();
@@ -131,20 +144,26 @@ public HBaseColumnConverter(HBaseConf hBaseConf, RowType rowType) {
131144
if (cfAndQualifier.length == 2
132145
&& StringUtils.isNotBlank(cfAndQualifier[0])
133146
&& StringUtils.isNotBlank(cfAndQualifier[1])) {
134-
147+
columnNamesWithoutcf.add(cfAndQualifier[1]);
135148
byte[][] qualifierKeys = new byte[2][];
136-
qualifierKeys[0] = Bytes.toBytes(cfAndQualifier[0]);
137-
qualifierKeys[1] = Bytes.toBytes(cfAndQualifier[1]);
149+
qualifierKeys[0] = Bytes.toBytes(cfAndQualifier[0]); // 列族
150+
qualifierKeys[1] = Bytes.toBytes(cfAndQualifier[1]); // 列名
151+
columnConfig.add(i, handleColumnConfig(cfAndQualifier[1]));
138152
familyAndQualifier[i] = qualifierKeys;
153+
familyAndQualifierBack[i] = Arrays.copyOf(qualifierKeys, qualifierKeys.length);
139154
} else if (KEY_ROW_KEY.equals(name)) {
140155
rowKeyIndex = i;
156+
columnNamesWithoutcf.add(KEY_ROW_KEY);
157+
columnConfig.add(i, null);
141158
} else if (!StringUtils.isBlank(fieldConf.getValue())) {
142159
familyAndQualifier[i] = new byte[2][];
160+
familyAndQualifierBack[i] = new byte[2][];
143161
} else {
144162
throw new IllegalArgumentException(
145163
"hbase 中,column 的列配置格式应该是:列族:列名. 您配置的列错误:" + name);
146164
}
147165
}
166+
148167
fieldList = hBaseConf.getColumnMetaInfos();
149168

150169
this.hBaseConf = hBaseConf;
@@ -193,10 +212,28 @@ public Mutation toExternal(RowData rowData, Mutation output) throws Exception {
193212
}
194213

195214
for (int i = 0; i < rowData.getArity(); i++) {
196-
if (rowKeyIndex == i) {
215+
if (rowKeyIndex == i || columnConfigIndex.contains(i)) {
197216
continue;
198217
}
218+
if (columnConfig.get(i) != null) {
219+
byte[][] qualifier = familyAndQualifier[i];
220+
qualifier[1] =
221+
fillColumnConfig(new String(qualifier[1]), columnConfig.get(i), rowData);
222+
familyAndQualifier[i] = qualifier;
223+
}
199224
toExternalConverters.get(i).serialize(rowData, i, put);
225+
if (i == rowData.getArity() - 1) {
226+
for (int x = 0; x < familyAndQualifierBack.length; x++) {
227+
familyAndQualifier[x] =
228+
Arrays.copyOf(
229+
familyAndQualifierBack[x], familyAndQualifierBack[x].length);
230+
if (x + 1 < familyAndQualifierBack.length
231+
&& familyAndQualifierBack[x + 1] == null) {
232+
familyAndQualifier[x + 1] = null;
233+
x = x + 1;
234+
}
235+
}
236+
}
200237
}
201238
return put;
202239
}
@@ -594,4 +631,34 @@ private static SimpleDateFormat getSimpleDateFormat(String sign) {
594631
}
595632
return format;
596633
}
634+
635+
private HashMap<String, Integer> handleColumnConfig(String qualifier) {
636+
HashMap<String, Integer> columnConfigMap = new HashMap<>(columnNames.size());
637+
List<String> regexColumnNameList = FunctionParser.getRegexColumnName(qualifier);
638+
if (!regexColumnNameList.isEmpty()) {
639+
for (int i = 0; i < regexColumnNameList.size(); i++) {
640+
columnConfigMap.put(
641+
regexColumnNameList.get(i),
642+
columnNamesWithoutcf.indexOf(regexColumnNameList.get(i)));
643+
columnConfigIndex.add(columnNamesWithoutcf.indexOf(regexColumnNameList.get(i)));
644+
}
645+
} else {
646+
columnConfigMap = null;
647+
}
648+
return columnConfigMap;
649+
}
650+
651+
private byte[] fillColumnConfig(
652+
String columnValue, HashMap<String, Integer> columnConfigMap, RowData rowData) {
653+
List<String> regexColumnNameList = FunctionParser.getRegexColumnName(columnValue);
654+
for (String regrexColumn : regexColumnNameList) {
655+
Integer columnIndex = columnConfigMap.get(regrexColumn);
656+
columnValue =
657+
StringUtils.replace(
658+
columnValue,
659+
"$(" + regrexColumn + ")",
660+
rowData.getString(columnIndex).toString());
661+
}
662+
return Bytes.toBytes(columnValue);
663+
}
597664
}

0 commit comments

Comments
 (0)