Skip to content

Commit 609bd41

Browse files
author
yanxi0227
committed
[kerberos][在启动yarnclient之前添加认证信息]
1 parent 2d86ee2 commit 609bd41

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3030
import org.apache.flink.yarn.YarnClusterDescriptor;
3131
import org.apache.hadoop.fs.Path;
32+
import org.apache.hadoop.security.UserGroupInformation;
3233
import org.apache.hadoop.yarn.client.api.YarnClient;
3334
import org.apache.hadoop.yarn.conf.YarnConfiguration;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
import sun.rmi.runtime.Log;
3438

3539
import java.io.File;
40+
import java.io.IOException;
3641
import java.net.MalformedURLException;
3742
import java.net.URL;
3843
import java.util.ArrayList;
@@ -48,16 +53,38 @@
4853
*/
4954

5055
public class PerJobClusterClientBuilder {
56+
57+
private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class);
58+
59+
private static String SECURITY = "hadoop.security.authorization";
60+
61+
private static String KEYTAB = "security.kerberos.login.keytab";
62+
63+
private static String PRINCIPAL = "security.kerberos.login.principal";
64+
5165
private YarnClient yarnClient;
5266

5367
private YarnConfiguration yarnConf;
5468

55-
public void init(String yarnConfDir){
69+
public void init(String yarnConfDir, Properties conf) throws IOException {
70+
5671
if(Strings.isNullOrEmpty(yarnConfDir)) {
5772
throw new RuntimeException("parameters of yarn is required");
5873
}
5974

6075
yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
76+
77+
Boolean security = yarnConf.getBoolean(SECURITY, false);
78+
79+
String keytab = (String) conf.get(KEYTAB);
80+
String principal = (String) conf.get(PRINCIPAL);
81+
if (security && !Strings.isNullOrEmpty(keytab)){
82+
UserGroupInformation.setConfiguration(yarnConf);
83+
UserGroupInformation.loginUserFromKeytab(keytab, principal);
84+
LOG.info("login successfully! keytab: " + keytab + "principal: " + principal);
85+
LOG.info("UGI: " + UserGroupInformation.getCurrentUser());
86+
}
87+
6188
yarnClient = YarnClient.createYarnClient();
6289
yarnClient.init(yarnConf);
6390
yarnClient.start();

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static String submit(Options launcherOptions, JobGraph jobGraph) throws E
6161
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
6262

6363
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
64-
perJobClusterClientBuilder.init(launcherOptions.getYarnconf());
64+
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), confProperties);
6565

6666
String flinkJarPath = launcherOptions.getFlinkJarPath();
6767

0 commit comments

Comments
 (0)