Skip to content

Commit 0d07c94

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_4.0.x_30855' into 1.8_release_4.0.x
2 parents ab80aab + db353c5 commit 0d07c94

File tree

7 files changed

+234
-6
lines changed

7 files changed

+234
-6
lines changed

flinkx-core/src/main/java/com/dtstack/flinkx/options/Options.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ public class Options {
7676
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
7777
private String pluginLoadMode = "shipfile";
7878

79+
@OptionRequired(description = "kerberos krb5conf")
80+
private String krb5conf ;
81+
82+
@OptionRequired(description = "kerberos keytabPath")
83+
private String keytab ;
84+
85+
@OptionRequired(description = "kerberos principal")
86+
private String principal ;
87+
7988
public String getS() {
8089
return s;
8190
}
@@ -195,4 +204,51 @@ public String getPluginLoadMode() {
195204
public void setPluginLoadMode(String pluginLoadMode) {
196205
this.pluginLoadMode = pluginLoadMode;
197206
}
207+
208+
public String getKrb5conf() {
209+
return krb5conf;
210+
}
211+
212+
public void setKrb5conf(String krb5conf) {
213+
this.krb5conf = krb5conf;
214+
}
215+
216+
public String getKeytab() {
217+
return keytab;
218+
}
219+
220+
public void setKeytab(String keytab) {
221+
this.keytab = keytab;
222+
}
223+
224+
public String getPrincipal() {
225+
return principal;
226+
}
227+
228+
public void setPrincipal(String principal) {
229+
this.principal = principal;
230+
}
231+
232+
@Override
233+
public String toString() {
234+
return "Options{" +
235+
"mode='" + mode + '\'' +
236+
", job='" + job + '\'' +
237+
", monitor='" + monitor + '\'' +
238+
", jobid='" + jobid + '\'' +
239+
", flinkconf='" + flinkconf + '\'' +
240+
", pluginRoot='" + pluginRoot + '\'' +
241+
", yarnconf='" + yarnconf + '\'' +
242+
", parallelism='" + parallelism + '\'' +
243+
", priority='" + priority + '\'' +
244+
", queue='" + queue + '\'' +
245+
", flinkLibJar='" + flinkLibJar + '\'' +
246+
", confProp='" + confProp + '\'' +
247+
", s='" + s + '\'' +
248+
", pluginLoadMode='" + pluginLoadMode + '\'' +
249+
", krb5conf='" + krb5conf + '\'' +
250+
", keytab='" + keytab + '\'' +
251+
", principal='" + principal + '\'' +
252+
'}';
253+
}
198254
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ public static void setHadoopUserName(Configuration conf){
6969
}
7070

7171
try {
72-
String ticketCachePath = conf.get("hadoop.security.kerberos.ticket.cache.path");
73-
UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, hadoopUserName);
72+
String previousUserName = UserGroupInformation.getLoginUser().getUserName();
73+
LOG.info("Hadoop user from '{}' switch to '{}' with SIMPLE auth", previousUserName, hadoopUserName);
74+
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
7475
UserGroupInformation.setLoginUser(ugi);
7576
} catch (Exception e) {
7677
LOG.warn("Set hadoop user name error:", e);
@@ -87,7 +88,6 @@ public static boolean isOpenKerberos(Map<String, Object> hadoopConfig){
8788

8889
private static FileSystem getFsWithKerberos(Map<String, Object> hadoopConfig, String defaultFs) throws Exception{
8990
UserGroupInformation ugi = getUGI(hadoopConfig, defaultFs);
90-
UserGroupInformation.setLoginUser(ugi);
9191

9292
return ugi.doAs(new PrivilegedAction<FileSystem>() {
9393
@Override
@@ -110,7 +110,6 @@ public static UserGroupInformation getUGI(Map<String, Object> hadoopConfig, Stri
110110
KerberosUtil.refreshConfig();
111111

112112
UserGroupInformation ugi = KerberosUtil.loginAndReturnUgi(getConfiguration(hadoopConfig, defaultFs), principal, keytabFileName);
113-
UserGroupInformation.setLoginUser(ugi);
114113

115114
return ugi;
116115
}

flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/HiveDbUtil.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public final class HiveDbUtil {
6363
public static final String PARAM_DELIMITER = "&";
6464
public static final String KEY_PRINCIPAL = "principal";
6565

66-
public static Pattern HIVE_JDBC_PATTERN = Pattern.compile("(?i)jdbc:hive2://(?<host>[0-9a-zA-Z\\.]+):(?<port>\\d+)/(?<db>[0-9a-z_%]+)(?<param>[\\?;#].*)*");
66+
public static Pattern HIVE_JDBC_PATTERN = Pattern.compile("(?i)jdbc:hive2://(?<host>[^:]+):(?<port>\\d+)/(?<db>[^;]+)(?<param>[\\?;#].*)*");
6767
public static final String HOST_KEY = "host";
6868
public static final String PORT_KEY = "port";
6969
public static final String DB_KEY = "db";
@@ -264,7 +264,8 @@ public static String parseIpAndPort(String url) {
264264
if (matcher.find()) {
265265
addr = matcher.group(HOST_KEY) + ":" + matcher.group(PORT_KEY);
266266
} else {
267-
addr = url.substring(url.indexOf("//") + 2, url.lastIndexOf("/"));
267+
addr = url.substring(url.indexOf("//") + 2);
268+
addr= addr.substring(0,addr.indexOf("/"));
268269
}
269270
return addr;
270271
}

flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public static ClusterClient createYarnClient(Options launcherOptions) throws Exc
9191
config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
9292
FileSystem.initialize(config);
9393

94+
//进行kerberos验证 如果需要的话
95+
KerberosInfo kerberosInfo = new KerberosInfo(launcherOptions.getKrb5conf(), launcherOptions.getKeytab(), launcherOptions.getPrincipal(), config);
96+
kerberosInfo.verify();
97+
9498
YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
9599

96100
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flinkx.launcher;
19+
20+
import com.dtstack.flinkx.util.ExceptionUtil;
21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.SecurityOptions;
24+
import org.apache.flink.runtime.util.HadoopUtils;
25+
import org.apache.hadoop.security.SecurityUtil;
26+
import org.apache.hadoop.security.UserGroupInformation;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.io.File;
31+
import java.io.IOException;
32+
33+
/**
34+
* KerberosInfo
35+
*
36+
* @author by [email protected]
37+
* @Date 2020/8/21
38+
*/
39+
public class KerberosInfo {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(KerberosInfo.class);
42+
43+
44+
private final String krb5confPath;
45+
private final String keytab;
46+
private final String principal;
47+
private final Configuration config;
48+
private final org.apache.hadoop.conf.Configuration hadoopConfiguration;
49+
50+
public KerberosInfo(String krb5confPath, String keytab, String principal, Configuration config) {
51+
this.krb5confPath = krb5confPath;
52+
this.config = config;
53+
this.hadoopConfiguration = HadoopUtils.getHadoopConfiguration(this.config);
54+
55+
//keytab, launcherOptions.getKeytab() 比flinkConfiguration里配置的优先级高
56+
if (StringUtils.isBlank(keytab)) {
57+
this.keytab = this.config.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
58+
} else {
59+
this.keytab = keytab;
60+
}
61+
//principal信息, launcherOptions.getPrincipal() 比flinkConfiguration里配置的优先级高
62+
if (StringUtils.isBlank(principal)) {
63+
this.principal = this.config.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
64+
} else {
65+
this.principal = principal;
66+
}
67+
}
68+
69+
public void verify() {
70+
if (!isVerify()) {
71+
return;
72+
}
73+
74+
check();
75+
76+
//如果指定了Krb5conf位置
77+
if (StringUtils.isNotBlank(this.getKrb5confPath())) {
78+
System.setProperty("java.security.krb5.conf", this.getKrb5confPath());
79+
}
80+
81+
String keyTabpath;
82+
try {
83+
keyTabpath = (new File(keytab)).getAbsolutePath();
84+
} catch (Exception e) {
85+
String message = String.format("can not get the file 【%s】,error info-> %s ",
86+
keytab,
87+
ExceptionUtil.getErrorMessage(e));
88+
LOG.error("{}", message);
89+
throw new RuntimeException(message, e);
90+
}
91+
92+
93+
LOG.info("kerberos info:Krb5confPath ->{}, Principal ->{}, keytab->{}", System.getProperty("java.security.krb5.conf"), principal, keyTabpath);
94+
95+
//开始kerberos验证
96+
UserGroupInformation.setConfiguration(hadoopConfiguration);
97+
try {
98+
UserGroupInformation.getCurrentUser().setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
99+
} catch (IOException e) {
100+
String message = "UserGroupInformation getCurrentUser has error," + ExceptionUtil.getErrorMessage(e);
101+
LOG.error("{}", message);
102+
throw new RuntimeException(message, e);
103+
}
104+
105+
106+
try {
107+
UserGroupInformation.loginUserFromKeytab(principal, keyTabpath);
108+
} catch (IOException e) {
109+
String message = String.format("Unable to set the Hadoop login principal【%s】,keytab 【%s】error info-> %s ",
110+
principal,
111+
keyTabpath,
112+
ExceptionUtil.getErrorMessage(e));
113+
LOG.error("{}", message);
114+
throw new RuntimeException(message, e);
115+
}
116+
}
117+
118+
//是否需要kerberos验证
119+
public boolean isVerify() {
120+
UserGroupInformation.AuthenticationMethod authenticationMethod = SecurityUtil.getAuthenticationMethod(hadoopConfiguration);
121+
return UserGroupInformation.AuthenticationMethod.SIMPLE != authenticationMethod;
122+
}
123+
124+
protected void check() {
125+
if (StringUtils.isBlank(getKeytab())) {
126+
throw new RuntimeException("keytabPath can not be null");
127+
}
128+
129+
if (StringUtils.isBlank(getPrincipal())) {
130+
throw new RuntimeException("principal can not be null");
131+
}
132+
}
133+
134+
135+
public String getKrb5confPath() {
136+
return krb5confPath;
137+
}
138+
139+
public String getKeytab() {
140+
return keytab;
141+
}
142+
143+
144+
public String getPrincipal() {
145+
return principal;
146+
}
147+
148+
149+
}

flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package com.dtstack.flinkx.launcher.perjob;
1919

20+
import com.dtstack.flinkx.launcher.KerberosInfo;
2021
import com.dtstack.flinkx.launcher.YarnConfLoader;
2122
import com.dtstack.flinkx.options.Options;
2223
import com.google.common.base.Strings;
@@ -58,6 +59,9 @@ public class PerJobClusterClientBuilder {
5859

5960
private Configuration flinkConfig;
6061

62+
//kerberos验证信息
63+
private KerberosInfo kerberosInfo;
64+
6165
/**
6266
* init yarnClient
6367
* @param yarnConfDir the path of yarnconf
@@ -69,6 +73,10 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC
6973
}
7074
userConf.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
7175
this.flinkConfig = flinkConfig;
76+
if(userConf.size()>0){
77+
this.kerberosInfo = new KerberosInfo(kerberosInfo.getKrb5confPath(),kerberosInfo.getKeytab(),kerberosInfo.getPrincipal(),this.flinkConfig);
78+
}
79+
kerberosInfo.verify();
7280
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
7381

7482
yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
@@ -135,4 +143,13 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties co
135143
descriptor.addShipFiles(shipFiles);
136144
return descriptor;
137145
}
146+
147+
148+
public KerberosInfo getKerberosInfo() {
149+
return kerberosInfo;
150+
}
151+
152+
public void setKerberosInfo(KerberosInfo kerberosInfo) {
153+
this.kerberosInfo = kerberosInfo;
154+
}
138155
}

flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/perjob/PerJobSubmitter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flinkx.launcher.perjob;
2020

21+
import com.dtstack.flinkx.launcher.KerberosInfo;
2122
import com.dtstack.flinkx.options.Options;
2223
import com.dtstack.flinkx.util.MapUtil;
2324
import org.apache.commons.lang.StringUtils;
@@ -54,6 +55,7 @@ public static String submit(Options options, JobGraph jobGraph) throws Exception
5455
ClusterSpecification clusterSpecification = FlinkPerJobResourceUtil.createClusterSpecification(conProp);
5556
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
5657
Configuration config = StringUtils.isEmpty(options.getFlinkconf()) ? new Configuration() : GlobalConfiguration.loadConfiguration(options.getFlinkconf());
58+
perJobClusterClientBuilder.setKerberosInfo(new KerberosInfo(options.getKrb5conf(),options.getKeytab(),options.getPrincipal(),config));
5759
perJobClusterClientBuilder.init(options.getYarnconf(), config, conProp);
5860

5961
AbstractYarnClusterDescriptor descriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(conProp, options, jobGraph);

0 commit comments

Comments
 (0)