|
26 | 26 | import org.apache.flink.configuration.Configuration; |
27 | 27 | import com.google.common.base.Strings; |
28 | 28 | import org.apache.flink.runtime.jobgraph.JobGraph; |
| 29 | +import org.apache.flink.runtime.security.SecurityConfiguration; |
| 30 | +import org.apache.flink.runtime.security.SecurityUtils; |
29 | 31 | import org.apache.flink.yarn.AbstractYarnClusterDescriptor; |
30 | 32 | import org.apache.flink.yarn.YarnClusterDescriptor; |
31 | 33 | import org.apache.hadoop.fs.Path; |
@@ -55,45 +57,42 @@ public class PerJobClusterClientBuilder { |
55 | 57 |
|
56 | 58 | private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class); |
57 | 59 |
|
58 | | - private static String KEYTAB = "security.kerberos.login.keytab"; |
59 | | - |
60 | | - private static String PRINCIPAL = "security.kerberos.login.principal"; |
| 60 | + private static final String DEFAULT_CONF_DIR = "./"; |
61 | 61 |
|
62 | 62 | private YarnClient yarnClient; |
63 | 63 |
|
64 | 64 | private YarnConfiguration yarnConf; |
65 | 65 |
|
66 | | - public void init(String yarnConfDir, Properties conf) throws IOException { |
| 66 | + private Configuration flinkConfig; |
| 67 | + |
| 68 | + public void init(String yarnConfDir, Configuration flinkConfig, Properties userConf) throws Exception { |
67 | 69 |
|
68 | 70 | if(Strings.isNullOrEmpty(yarnConfDir)) { |
69 | 71 | throw new RuntimeException("parameters of yarn is required"); |
70 | 72 | } |
71 | 73 |
|
72 | | - yarnConf = YarnConfLoader.getYarnConf(yarnConfDir); |
| 74 | + userConf.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString())); |
73 | 75 |
|
74 | | - if (isKerberos(conf)){ |
75 | | - String keytab = (String) conf.get(KEYTAB); |
76 | | - String principal = (String) conf.get(PRINCIPAL); |
77 | | - login(yarnConf, keytab, principal); |
78 | | - } |
| 76 | + this.flinkConfig = flinkConfig; |
| 77 | + SecurityUtils.install(new SecurityConfiguration(flinkConfig)); |
79 | 78 |
|
| 79 | + yarnConf = YarnConfLoader.getYarnConf(yarnConfDir); |
80 | 80 | yarnClient = YarnClient.createYarnClient(); |
81 | 81 | yarnClient.init(yarnConf); |
82 | 82 | yarnClient.start(); |
83 | 83 |
|
84 | 84 | System.out.println("----init yarn success ----"); |
85 | 85 | } |
86 | 86 |
|
87 | | - public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig) |
| 87 | + public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph) |
88 | 88 | throws MalformedURLException { |
89 | 89 |
|
90 | | - confProp.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString())); |
91 | | - String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? "" : launcherOptions.getFlinkconf(); |
| 90 | + String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf(); |
92 | 91 | AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf); |
93 | 92 |
|
94 | 93 | if (StringUtils.isNotBlank(flinkJarPath)) { |
95 | 94 | if (!new File(flinkJarPath).exists()) { |
96 | | - throw new RuntimeException("The Flink jar path is not exist"); |
| 95 | + throw new RuntimeException("The param '-flinkJarPath' ref dir is not exist"); |
97 | 96 | } |
98 | 97 | } |
99 | 98 |
|
@@ -163,22 +162,4 @@ private AbstractYarnClusterDescriptor getClusterDescriptor( |
163 | 162 | false); |
164 | 163 | } |
165 | 164 |
|
166 | | - private boolean isKerberos(Properties conf){ |
167 | | - String keytab = (String) conf.get(KEYTAB); |
168 | | - if (StringUtils.isNotBlank(keytab)){ |
169 | | - return true; |
170 | | - } else { |
171 | | - return false; |
172 | | - } |
173 | | - } |
174 | | - |
175 | | - private void login(org.apache.hadoop.conf.Configuration conf, String keytab, String principal) throws IOException { |
176 | | - if (StringUtils.isEmpty(principal)){ |
177 | | - throw new RuntimeException(PRINCIPAL + " must not be null!"); |
178 | | - } |
179 | | - UserGroupInformation.setConfiguration(conf); |
180 | | - UserGroupInformation.loginUserFromKeytab(principal, keytab); |
181 | | - LOG.info("login successfully! keytab: " + keytab + "principal: " + principal); |
182 | | - LOG.info("UGI: " + UserGroupInformation.getCurrentUser()); |
183 | | - } |
184 | 165 | } |
0 commit comments