Skip to content

Commit e9c2dc4

Browse files
author
toutian
committed
Merge branch 'v1.9.0_dev_conf' into 'v1.9.0_dev'
统一参数基于flink-conf.yaml,覆盖参数基于confProp 统一参数基于flink-conf.yaml,覆盖参数基于confProp See merge request !171
2 parents 0fae6be + 1500ec2 commit e9c2dc4

File tree

2 files changed

+15
-35
lines changed

2 files changed

+15
-35
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.flink.configuration.Configuration;
2727
import com.google.common.base.Strings;
2828
import org.apache.flink.runtime.jobgraph.JobGraph;
29+
import org.apache.flink.runtime.security.SecurityConfiguration;
30+
import org.apache.flink.runtime.security.SecurityUtils;
2931
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3032
import org.apache.flink.yarn.YarnClusterDescriptor;
3133
import org.apache.hadoop.fs.Path;
@@ -55,45 +57,42 @@ public class PerJobClusterClientBuilder {
5557

5658
private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class);
5759

58-
private static String KEYTAB = "security.kerberos.login.keytab";
59-
60-
private static String PRINCIPAL = "security.kerberos.login.principal";
60+
private static final String DEFAULT_CONF_DIR = "./";
6161

6262
private YarnClient yarnClient;
6363

6464
private YarnConfiguration yarnConf;
6565

66-
public void init(String yarnConfDir, Properties conf) throws IOException {
66+
private Configuration flinkConfig;
67+
68+
public void init(String yarnConfDir, Configuration flinkConfig, Properties userConf) throws Exception {
6769

6870
if(Strings.isNullOrEmpty(yarnConfDir)) {
6971
throw new RuntimeException("parameters of yarn is required");
7072
}
7173

72-
yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
74+
userConf.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
7375

74-
if (isKerberos(conf)){
75-
String keytab = (String) conf.get(KEYTAB);
76-
String principal = (String) conf.get(PRINCIPAL);
77-
login(yarnConf, keytab, principal);
78-
}
76+
this.flinkConfig = flinkConfig;
77+
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
7978

79+
yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
8080
yarnClient = YarnClient.createYarnClient();
8181
yarnClient.init(yarnConf);
8282
yarnClient.start();
8383

8484
System.out.println("----init yarn success ----");
8585
}
8686

87-
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig)
87+
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph)
8888
throws MalformedURLException {
8989

90-
confProp.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
91-
String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? "" : launcherOptions.getFlinkconf();
90+
String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf();
9291
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf);
9392

9493
if (StringUtils.isNotBlank(flinkJarPath)) {
9594
if (!new File(flinkJarPath).exists()) {
96-
throw new RuntimeException("The Flink jar path is not exist");
95+
throw new RuntimeException("The param '-flinkJarPath' ref dir is not exist");
9796
}
9897
}
9998

@@ -163,22 +162,4 @@ private AbstractYarnClusterDescriptor getClusterDescriptor(
163162
false);
164163
}
165164

166-
private boolean isKerberos(Properties conf){
167-
String keytab = (String) conf.get(KEYTAB);
168-
if (StringUtils.isNotBlank(keytab)){
169-
return true;
170-
} else {
171-
return false;
172-
}
173-
}
174-
175-
private void login(org.apache.hadoop.conf.Configuration conf, String keytab, String principal) throws IOException {
176-
if (StringUtils.isEmpty(principal)){
177-
throw new RuntimeException(PRINCIPAL + " must not be null!");
178-
}
179-
UserGroupInformation.setConfiguration(conf);
180-
UserGroupInformation.loginUserFromKeytab(principal, keytab);
181-
LOG.info("login successfully! keytab: " + keytab + "principal: " + principal);
182-
LOG.info("UGI: " + UserGroupInformation.getCurrentUser());
183-
}
184165
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,10 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
6262
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
6363

6464
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
65-
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), confProperties);
65+
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), flinkConfig, confProperties);
6666

6767
String flinkJarPath = launcherOptions.getFlinkJarPath();
68-
69-
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions, jobGraph, flinkConfig);
68+
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(flinkJarPath, launcherOptions, jobGraph);
7069
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
7170

7271
String applicationId = clusterClient.getClusterId().toString();

0 commit comments

Comments
 (0)