Skip to content

Commit ec341ed

Browse files
committed
hbase upsert mode
1 parent 4a17212 commit ec341ed

File tree

4 files changed

+101
-36
lines changed

4 files changed

+101
-36
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.sink.hbase;
2221

22+
import com.dtstack.flink.sql.enums.EUpdateMode;
2323
import com.dtstack.flink.sql.sink.MetricOutputFormat;
2424
import com.google.common.collect.Lists;
2525
import org.apache.commons.lang3.StringUtils;
@@ -31,10 +31,13 @@
3131
import org.apache.hadoop.hbase.TableName;
3232
import org.apache.hadoop.hbase.client.Connection;
3333
import org.apache.hadoop.hbase.client.ConnectionFactory;
34+
import org.apache.hadoop.hbase.client.Delete;
3435
import org.apache.hadoop.hbase.client.Put;
3536
import org.apache.hadoop.hbase.client.Table;
37+
import org.apache.hadoop.hbase.util.Bytes;
3638
import org.slf4j.Logger;
3739
import org.slf4j.LoggerFactory;
40+
3841
import java.io.IOException;
3942
import java.text.SimpleDateFormat;
4043
import java.util.List;
@@ -54,8 +57,9 @@ public class HbaseOutputFormat extends MetricOutputFormat<Tuple2> {
5457
private String[] rowkey;
5558
private String tableName;
5659
private String[] columnNames;
60+
private String updateMode;
5761
private String[] columnTypes;
58-
private Map<String,String> columnNameFamily;
62+
private Map<String, String> columnNameFamily;
5963

6064
private String[] families;
6165
private String[] qualifiers;
@@ -75,7 +79,7 @@ public void configure(Configuration parameters) {
7579
LOG.warn("---configure---");
7680
conf = HBaseConfiguration.create();
7781
conf.set("hbase.zookeeper.quorum", host);
78-
if(zkParent != null && !"".equals(zkParent)){
82+
if (zkParent != null && !"".equals(zkParent)) {
7983
conf.set("zookeeper.znode.parent", zkParent);
8084
}
8185
LOG.warn("---configure end ---");
@@ -91,38 +95,23 @@ public void open(int taskNumber, int numTasks) throws IOException {
9195
}
9296

9397
@Override
94-
public void writeRecord(Tuple2 tuple2) {
95-
98+
public void writeRecord(Tuple2 tuple2) {
9699
Tuple2<Boolean, Row> tupleTrans = tuple2;
97100
Boolean retract = tupleTrans.getField(0);
98-
if(!retract){
99-
//FIXME 暂时不处理hbase删除操作--->hbase要求有key,所有认为都是可以执行update查找
100-
return;
101+
if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
102+
dealDelete(tupleTrans);
103+
} else {
104+
dealInsert(tupleTrans);
101105
}
106+
}
102107

108+
protected void dealInsert(Tuple2<Boolean, Row> tupleTrans) {
103109
Row record = tupleTrans.getField(1);
104-
List<String> rowKeyValues = getRowKeyValues(record);
105-
// all rowkey not null
106-
if (rowKeyValues.size() != rowkey.length ) {
107-
LOG.error("row key value must not null,record is ..", record);
108-
outDirtyRecords.inc();
110+
Put put = getPutByRow(record);
111+
if (put == null) {
109112
return;
110113
}
111114

112-
String key = StringUtils.join(rowKeyValues, "-");
113-
Put put = new Put(key.getBytes());
114-
for(int i = 0; i < record.getArity(); ++i) {
115-
Object fieldVal = record.getField(i);
116-
if (fieldVal == null) {
117-
continue;
118-
}
119-
byte[] val = fieldVal.toString().getBytes();
120-
byte[] cf = families[i].getBytes();
121-
byte[] qualifier = qualifiers[i].getBytes();
122-
123-
put.addColumn(cf, qualifier, val);
124-
}
125-
126115
try {
127116
table.put(put);
128117
} catch (IOException e) {
@@ -137,14 +126,65 @@ public void writeRecord(Tuple2 tuple2) {
137126
LOG.info(record.toString());
138127
}
139128
outRecords.inc();
129+
}
140130

131+
protected void dealDelete(Tuple2<Boolean, Row> tupleTrans) {
132+
Row record = tupleTrans.getField(1);
133+
String rowKey = buildRowKey(record);
134+
if (!StringUtils.isEmpty(rowKey)) {
135+
Delete delete = new Delete(Bytes.toBytes(rowKey));
136+
try {
137+
table.delete(delete);
138+
} catch (IOException e) {
139+
outDirtyRecords.inc();
140+
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0 || LOG.isDebugEnabled()) {
141+
LOG.error("record insert failed ..", record.toString());
142+
LOG.error("", e);
143+
}
144+
}
145+
if (outRecords.getCount() % rowLenth == 0) {
146+
LOG.info(record.toString());
147+
}
148+
outRecords.inc();
149+
}
150+
}
151+
152+
private Put getPutByRow(Row record) {
153+
String rowKey = buildRowKey(record);
154+
if (StringUtils.isEmpty(rowKey)) {
155+
return null;
156+
}
157+
Put put = new Put(rowKey.getBytes());
158+
for (int i = 0; i < record.getArity(); ++i) {
159+
Object fieldVal = record.getField(i);
160+
if (fieldVal == null) {
161+
continue;
162+
}
163+
byte[] val = fieldVal.toString().getBytes();
164+
byte[] cf = families[i].getBytes();
165+
byte[] qualifier = qualifiers[i].getBytes();
166+
167+
put.addColumn(cf, qualifier, val);
168+
}
169+
return put;
170+
}
171+
172+
private String buildRowKey(Row record) {
173+
List<String> rowKeyValues = getRowKeyValues(record);
174+
// all rowkey not null
175+
if (rowKeyValues.size() != rowkey.length) {
176+
LOG.error("row key value must not null,record is ..", record);
177+
outDirtyRecords.inc();
178+
return "";
179+
}
180+
return StringUtils.join(rowKeyValues, "-");
141181
}
142182

143183
private List<String> getRowKeyValues(Row record) {
144184
List<String> rowKeyValues = Lists.newArrayList();
145185
for (int i = 0; i < rowkey.length; ++i) {
146186
String colName = rowkey[i];
147-
int rowKeyIndex = 0; //rowkey index
187+
int rowKeyIndex = 0;
148188
for (; rowKeyIndex < columnNames.length; ++rowKeyIndex) {
149189
if (columnNames[rowKeyIndex].equals(colName)) {
150190
break;
@@ -168,13 +208,14 @@ private List<String> getRowKeyValues(Row record) {
168208

169209
@Override
170210
public void close() throws IOException {
171-
if(conn != null) {
211+
if (conn != null) {
172212
conn.close();
173213
conn = null;
174214
}
175215
}
176216

177-
private HbaseOutputFormat() {}
217+
private HbaseOutputFormat() {
218+
}
178219

179220
public static HbaseOutputFormatBuilder buildHbaseOutputFormat() {
180221
return new HbaseOutputFormatBuilder();
@@ -193,7 +234,7 @@ public HbaseOutputFormatBuilder setHost(String host) {
193234
return this;
194235
}
195236

196-
public HbaseOutputFormatBuilder setZkParent(String parent){
237+
public HbaseOutputFormatBuilder setZkParent(String parent) {
197238
format.zkParent = parent;
198239
return this;
199240
}
@@ -209,6 +250,11 @@ public HbaseOutputFormatBuilder setRowkey(String[] rowkey) {
209250
return this;
210251
}
211252

253+
public HbaseOutputFormatBuilder setUpdateMode(String updateMode) {
254+
format.updateMode = updateMode;
255+
return this;
256+
}
257+
212258
public HbaseOutputFormatBuilder setColumnNames(String[] columnNames) {
213259
format.columnNames = columnNames;
214260
return this;

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
5151
protected String port;
5252
protected String parent;
5353
protected String tableName;
54+
protected String updateMode;
5455
protected String[] rowkey;
5556

5657
public HbaseSink() {
@@ -66,17 +67,20 @@ public HbaseSink genStreamSink(TargetTableInfo targetTableInfo) {
6667
this.tableName = hbaseTableInfo.getTableName();
6768
this.rowkey = hbaseTableInfo.getRowkey();
6869
this.columnNameFamily = hbaseTableInfo.getColumnNameFamily();
70+
this.updateMode = hbaseTableInfo.getUpdateMode();
6971
return this;
7072
}
7173

7274
@Override
7375
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
7476
HbaseOutputFormat.HbaseOutputFormatBuilder builder = HbaseOutputFormat.buildHbaseOutputFormat();
75-
builder.setHost(this.zookeeperQuorum).setZkParent(this.parent).setTable(this.tableName);
76-
77-
builder.setRowkey(rowkey);
78-
builder.setColumnNames(fieldNames);
79-
builder.setColumnNameFamily(columnNameFamily);
77+
builder.setHost(this.zookeeperQuorum)
78+
.setZkParent(this.parent)
79+
.setTable(this.tableName)
80+
.setRowkey(rowkey)
81+
.setUpdateMode(updateMode)
82+
.setColumnNames(fieldNames)
83+
.setColumnNameFamily(columnNameFamily);
8084

8185
HbaseOutputFormat outputFormat = builder.finish();
8286
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.sink.hbase.table;
2222

2323

24+
import com.dtstack.flink.sql.enums.EUpdateMode;
2425
import com.dtstack.flink.sql.table.AbsTableParser;
2526
import com.dtstack.flink.sql.table.TableInfo;
2627
import com.dtstack.flink.sql.util.DtStringUtil;
@@ -50,6 +51,8 @@ public class HbaseSinkParser extends AbsTableParser {
5051

5152
public static final String TABLE_NAME_KEY = "tableName";
5253

54+
public static final String UPDATE_KEY = "updateMode";
55+
5356
@Override
5457
protected boolean fieldNameNeedsUpperCase() {
5558
return false;
@@ -66,6 +69,8 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
6669
hbaseTableInfo.setParent((String)props.get(ZOOKEEPER_PARENT.toLowerCase()));
6770
String rk = (String) props.get(HBASE_ROWKEY.toLowerCase());
6871
hbaseTableInfo.setRowkey(rk.split(","));
72+
String updateMode = (String) props.getOrDefault(UPDATE_KEY, EUpdateMode.APPEND.name());
73+
hbaseTableInfo.setUpdateMode(updateMode);
6974
return hbaseTableInfo;
7075
}
7176

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class HbaseTableInfo extends TargetTableInfo {
5252

5353
private String tableName;
5454

55+
private String updateMode;
56+
5557
public HbaseTableInfo(){
5658
setType(CURR_TYPE);
5759
}
@@ -128,6 +130,14 @@ public void setColumnTypes(String[] columnTypes) {
128130
this.columnTypes = columnTypes;
129131
}
130132

133+
public String getUpdateMode() {
134+
return updateMode;
135+
}
136+
137+
public void setUpdateMode(String updateMode) {
138+
this.updateMode = updateMode;
139+
}
140+
131141
@Override
132142
public boolean check() {
133143
Preconditions.checkNotNull(host, "hbase field of zookeeperQuorum is required");

0 commit comments

Comments
 (0)