Skip to content

Commit ae06a43

Browse files
committed
Merge branch 'v1.9.0_dev_kerberos' into 'v1.9.0_dev'
[kerberos][在启动yarnclient之前添加认证信息] See merge request !166
2 parents 03f1f67 + 4d3552f commit ae06a43

File tree

5 files changed

+80
-5
lines changed

5 files changed

+80
-5
lines changed

core/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@
6767
</dependency>
6868

6969
<dependency>
70-
<groupId>org.apache.flink</groupId>
71-
<artifactId>flink-yarn_2.11</artifactId>
72-
<version>${flink.version}</version>
70+
<groupId>commons-codec</groupId>
71+
<artifactId>commons-codec</artifactId>
72+
<version>1.10</version>
7373
</dependency>
7474

7575
<dependency>

launcher/pom.xml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,42 @@
2222
<groupId>org.apache.flink</groupId>
2323
<artifactId>flink-yarn_2.11</artifactId>
2424
<version>${flink.version}</version>
25+
<exclusions>
26+
<exclusion>
27+
<groupId>org.apache.flink</groupId>
28+
<artifactId>flink-shaded-hadoop-2</artifactId>
29+
</exclusion>
30+
</exclusions>
31+
</dependency>
32+
33+
<dependency>
34+
<groupId>org.apache.hadoop</groupId>
35+
<artifactId>hadoop-common</artifactId>
36+
<version>${hadoop.version}</version>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.apache.hadoop</groupId>
41+
<artifactId>hadoop-hdfs</artifactId>
42+
<version>${hadoop.version}</version>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.apache.hadoop</groupId>
47+
<artifactId>hadoop-yarn-common</artifactId>
48+
<version>${hadoop.version}</version>
49+
</dependency>
50+
51+
<dependency>
52+
<groupId>org.apache.hadoop</groupId>
53+
<artifactId>hadoop-yarn-client</artifactId>
54+
<version>${hadoop.version}</version>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>org.apache.hadoop</groupId>
59+
<artifactId>hadoop-mapreduce-client-core</artifactId>
60+
<version>${hadoop.version}</version>
2561
</dependency>
2662

2763
<dependency>

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@
2929
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3030
import org.apache.flink.yarn.YarnClusterDescriptor;
3131
import org.apache.hadoop.fs.Path;
32+
import org.apache.hadoop.security.UserGroupInformation;
3233
import org.apache.hadoop.yarn.client.api.YarnClient;
3334
import org.apache.hadoop.yarn.conf.YarnConfiguration;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3437

3538
import java.io.File;
39+
import java.io.IOException;
3640
import java.net.MalformedURLException;
3741
import java.net.URL;
3842
import java.util.ArrayList;
@@ -48,16 +52,31 @@
4852
*/
4953

5054
public class PerJobClusterClientBuilder {
55+
56+
private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class);
57+
58+
private static String KEYTAB = "security.kerberos.login.keytab";
59+
60+
private static String PRINCIPAL = "security.kerberos.login.principal";
61+
5162
private YarnClient yarnClient;
5263

5364
private YarnConfiguration yarnConf;
5465

55-
public void init(String yarnConfDir){
66+
public void init(String yarnConfDir, Properties conf) throws IOException {
67+
5668
if(Strings.isNullOrEmpty(yarnConfDir)) {
5769
throw new RuntimeException("parameters of yarn is required");
5870
}
5971

6072
yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
73+
74+
if (isKerberos(conf)){
75+
String keytab = (String) conf.get(KEYTAB);
76+
String principal = (String) conf.get(PRINCIPAL);
77+
login(yarnConf, keytab, principal);
78+
}
79+
6180
yarnClient = YarnClient.createYarnClient();
6281
yarnClient.init(yarnConf);
6382
yarnClient.start();
@@ -141,4 +160,23 @@ private AbstractYarnClusterDescriptor getClusterDescriptor(
141160
yarnClient,
142161
false);
143162
}
163+
164+
private boolean isKerberos(Properties conf){
165+
String keytab = (String) conf.get(KEYTAB);
166+
if (StringUtils.isNotBlank(keytab)){
167+
return true;
168+
} else {
169+
return false;
170+
}
171+
}
172+
173+
private void login(org.apache.hadoop.conf.Configuration conf, String keytab, String principal) throws IOException {
174+
if (StringUtils.isEmpty(principal)){
175+
throw new RuntimeException(PRINCIPAL + " must not be null!");
176+
}
177+
UserGroupInformation.setConfiguration(conf);
178+
UserGroupInformation.loginUserFromKeytab(principal, keytab);
179+
LOG.info("login successfully! keytab: " + keytab + "principal: " + principal);
180+
LOG.info("UGI: " + UserGroupInformation.getCurrentUser());
181+
}
144182
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static String submit(Options launcherOptions, JobGraph jobGraph) throws E
6161
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
6262

6363
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
64-
perJobClusterClientBuilder.init(launcherOptions.getYarnconf());
64+
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), confProperties);
6565

6666
String flinkJarPath = launcherOptions.getFlinkJarPath();
6767

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<properties>
3737
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3838
<flink.version>1.9.1</flink.version>
39+
<hadoop.version>2.7.3</hadoop.version>
3940
</properties>
4041

4142
<build>

0 commit comments

Comments
 (0)