Skip to content

Commit 9d80379

Browse files
author
dapeng
committed
fix 补充参数
1 parent 37e7e62 commit 9d80379

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.hbase;
20+
21+
import org.apache.commons.collections.MapUtils;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.hbase.HBaseConfiguration;
25+
import org.apache.hadoop.security.UserGroupInformation;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.File;
30+
import java.io.IOException;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
/**
36+
*
37+
* The utility class of HBase connection
38+
*
39+
* Date: 2019/12/24
40+
* Company: www.dtstack.com
41+
* @author maqi
42+
*/
43+
public class HbaseConfigUtils {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigUtils.class);
46+
// sync side kerberos
47+
public final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication";
48+
public final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization";
49+
public final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file";
50+
public final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal";
51+
public final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file";
52+
public final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal";
53+
54+
public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
55+
public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent";
56+
57+
58+
public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
59+
public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
60+
61+
public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
62+
if (conf == null) {
63+
throw new IllegalArgumentException("kerberos conf can not be null");
64+
}
65+
66+
if (org.apache.commons.lang.StringUtils.isEmpty(principal)) {
67+
throw new IllegalArgumentException("principal can not be null");
68+
}
69+
70+
if (org.apache.commons.lang.StringUtils.isEmpty(keytab)) {
71+
throw new IllegalArgumentException("keytab can not be null");
72+
}
73+
74+
conf.set("hadoop.security.authentication", "Kerberos");
75+
UserGroupInformation.setConfiguration(conf);
76+
77+
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
78+
}
79+
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,13 @@
3737
import org.apache.hadoop.hbase.client.Put;
3838
import org.apache.hadoop.hbase.client.Table;
3939
import org.apache.hadoop.hbase.util.Bytes;
40+
import org.apache.hadoop.security.UserGroupInformation;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
4243

44+
import java.io.File;
4345
import java.io.IOException;
46+
import java.security.PrivilegedAction;
4447
import java.util.List;
4548
import java.util.Map;
4649
import java.util.Set;
@@ -91,6 +94,29 @@ public void open(int taskNumber, int numTasks) throws IOException {
9194
LOG.warn("---open---");
9295
conn = ConnectionFactory.createConnection(conf);
9396
table = conn.getTable(TableName.valueOf(tableName));
97+
if (kerberosAuthEnable) {
98+
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host);
99+
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
100+
fillSyncKerberosConfig(conf, regionserverKeytabFile, regionserverPrincipal, zookeeperSaslClient, securityKrb5Conf);
101+
102+
UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, regionserverPrincipal, regionserverKeytabFile);
103+
org.apache.hadoop.conf.Configuration finalConf = conf;
104+
conn = userGroupInformation.doAs(new PrivilegedAction<Connection>() {
105+
@Override
106+
public Connection run() {
107+
try {
108+
return ConnectionFactory.createConnection(finalConf);
109+
} catch (IOException e) {
110+
LOG.error("Get connection fail with config:{}", finalConf);
111+
throw new RuntimeException(e);
112+
}
113+
}
114+
});
115+
} else {
116+
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host);
117+
conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
118+
conn = ConnectionFactory.createConnection(conf);
119+
}
94120
LOG.warn("---open end(get table from hbase) ---");
95121
initMetric();
96122
}
@@ -311,5 +337,35 @@ public HbaseOutputFormat finish() {
311337

312338
}
313339

340+
private void fillSyncKerberosConfig( org.apache.hadoop.conf.Configuration config, String regionserverKeytabFile, String regionserverPrincipal,
341+
String zookeeperSaslClient, String securityKrb5Conf) throws IOException {
342+
if (StringUtils.isEmpty(regionserverKeytabFile)) {
343+
throw new IllegalArgumentException("Must provide regionserverKeytabFile when authentication is Kerberos");
344+
}
345+
String regionserverKeytabFilePath = System.getProperty("user.dir") + File.separator + regionserverKeytabFile;
346+
LOG.info("regionserverKeytabFilePath:{}",regionserverKeytabFilePath);
347+
config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KEYTAB_FILE, regionserverKeytabFilePath);
348+
config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KEYTAB_FILE, regionserverKeytabFilePath);
349+
350+
if (StringUtils.isEmpty(regionserverPrincipal)) {
351+
throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos");
352+
}
353+
config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal);
354+
config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal);
355+
config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true");
356+
config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos");
357+
358+
359+
if (!StringUtils.isEmpty(zookeeperSaslClient)) {
360+
System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, zookeeperSaslClient);
361+
}
362+
363+
if (!StringUtils.isEmpty(securityKrb5Conf)) {
364+
String krb5ConfPath = System.getProperty("user.dir") + File.separator + securityKrb5Conf;
365+
LOG.info("krb5ConfPath:{}", krb5ConfPath);
366+
System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath);
367+
}
368+
}
369+
314370

315371
}

0 commit comments

Comments
 (0)