Skip to content

Commit 907aacb

Browse files
author
dapeng
committed
hbase kerberos sink
1 parent 6bb8949 commit 907aacb

File tree

2 files changed

+63
-9
lines changed

2 files changed

+63
-9
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
6262
private String[] columnTypes;
6363
private Map<String, String> columnNameFamily;
6464

65+
private boolean kerberosAuthEnable;
66+
private String regionserverKeytabFile;
67+
private String regionserverPrincipal;
68+
private String securityKrb5Conf;
69+
private String zookeeperSaslClient;
70+
6571
private String[] families;
6672
private String[] qualifiers;
6773

@@ -252,6 +258,31 @@ public HbaseOutputFormatBuilder setColumnNameFamily(Map<String, String> columnNa
252258
format.columnNameFamily = columnNameFamily;
253259
return this;
254260
}
261+
public HbaseOutputFormatBuilder setKerberosAuthEnable(boolean kerberosAuthEnable) {
262+
format.kerberosAuthEnable = kerberosAuthEnable;
263+
return this;
264+
}
265+
266+
public HbaseOutputFormatBuilder setRegionserverKeytabFile(String regionserverKeytabFile) {
267+
format.regionserverKeytabFile = regionserverKeytabFile;
268+
return this;
269+
}
270+
271+
public HbaseOutputFormatBuilder setRegionserverPrincipal(String regionserverPrincipal) {
272+
format.regionserverPrincipal = regionserverPrincipal;
273+
return this;
274+
}
275+
276+
public HbaseOutputFormatBuilder setSecurityKrb5Conf(String securityKrb5Conf) {
277+
format.securityKrb5Conf = securityKrb5Conf;
278+
return this;
279+
}
280+
281+
public HbaseOutputFormatBuilder setZookeeperSaslClient(String zookeeperSaslClient) {
282+
format.zookeeperSaslClient = zookeeperSaslClient;
283+
return this;
284+
}
285+
255286

256287
public HbaseOutputFormat finish() {
257288
Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified");
@@ -267,7 +298,7 @@ public HbaseOutputFormat finish() {
267298
String[] columns = keySet.toArray(new String[keySet.size()]);
268299
for (int i = 0; i < columns.length; ++i) {
269300
String col = columns[i];
270-
String[] part = StringUtils.split(col, ":");
301+
String[] part = col.split(":");
271302
families[i] = part[0];
272303
qualifiers[i] = part[1];
273304
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
5252
protected String tableName;
5353
protected String updateMode;
5454
protected String rowkey;
55+
protected String registerTabName;
56+
57+
protected boolean kerberosAuthEnable;
58+
protected String regionserverKeytabFile;
59+
protected String regionserverPrincipal;
60+
protected String securityKrb5Conf;
61+
protected String zookeeperSaslClient;
62+
private int parallelism = -1;
63+
5564

5665
public HbaseSink() {
5766
// TO DO NOTHING
@@ -66,20 +75,34 @@ public HbaseSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
6675
this.tableName = hbaseTableInfo.getTableName();
6776
this.rowkey = hbaseTableInfo.getRowkey();
6877
this.columnNameFamily = hbaseTableInfo.getColumnNameFamily();
69-
this.updateMode = hbaseTableInfo.getUpdateMode();
78+
this.registerTabName = hbaseTableInfo.getName();
79+
80+
this.kerberosAuthEnable = hbaseTableInfo.isKerberosAuthEnable();
81+
this.regionserverKeytabFile = hbaseTableInfo.getRegionserverKeytabFile();
82+
this.regionserverPrincipal = hbaseTableInfo.getRegionserverPrincipal();
83+
this.securityKrb5Conf = hbaseTableInfo.getSecurityKrb5Conf();
84+
this.zookeeperSaslClient = hbaseTableInfo.getZookeeperSaslClient();
85+
86+
Integer tmpSinkParallelism = hbaseTableInfo.getParallelism();
87+
if (tmpSinkParallelism != null) {
88+
this.parallelism = tmpSinkParallelism;
89+
}
7090
return this;
7191
}
7292

7393
@Override
7494
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
7595
HbaseOutputFormat.HbaseOutputFormatBuilder builder = HbaseOutputFormat.buildHbaseOutputFormat();
76-
builder.setHost(this.zookeeperQuorum)
77-
.setZkParent(this.parent)
78-
.setTable(this.tableName)
79-
.setRowkey(rowkey)
80-
.setUpdateMode(updateMode)
81-
.setColumnNames(fieldNames)
82-
.setColumnNameFamily(columnNameFamily);
96+
builder.setHost(this.zookeeperQuorum).setZkParent(this.parent).setTable(this.tableName);
97+
98+
builder.setRowkey(rowkey);
99+
builder.setColumnNames(fieldNames);
100+
builder.setColumnNameFamily(columnNameFamily);
101+
builder.setKerberosAuthEnable(kerberosAuthEnable);
102+
builder.setRegionserverKeytabFile(regionserverKeytabFile);
103+
builder.setRegionserverPrincipal(regionserverPrincipal);
104+
builder.setSecurityKrb5Conf(securityKrb5Conf);
105+
builder.setZookeeperSaslClient(zookeeperSaslClient);
83106

84107
HbaseOutputFormat outputFormat = builder.finish();
85108
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);

0 commit comments

Comments
 (0)