Skip to content

Commit 9c62b3f

Browse files
committed
opt launcher code
1 parent 2d86ee2 commit 9c62b3f

File tree

7 files changed

+53
-753
lines changed

7 files changed

+53
-753
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 44 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,22 @@
2020

2121
import com.dtstack.flink.sql.option.Options;
2222
import com.dtstack.flink.sql.util.PluginUtil;
23-
import com.dtstack.flink.sql.yarn.JobParameter;
24-
import com.dtstack.flink.sql.yarn.YarnClusterConfiguration;
2523
import org.apache.commons.io.Charsets;
2624
import org.apache.commons.lang.StringUtils;
2725
import org.apache.flink.client.program.ClusterClient;
2826
import org.apache.flink.client.program.MiniClusterClient;
2927
import org.apache.flink.configuration.ConfigConstants;
3028
import org.apache.flink.configuration.Configuration;
3129
import org.apache.flink.configuration.GlobalConfiguration;
30+
import org.apache.flink.configuration.HighAvailabilityOptions;
3231
import org.apache.flink.configuration.JobManagerOptions;
3332
import org.apache.flink.core.fs.FileSystem;
3433
import org.apache.flink.runtime.akka.AkkaUtils;
34+
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
3535
import org.apache.flink.runtime.minicluster.MiniCluster;
3636
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
3737
import org.apache.flink.runtime.util.LeaderConnectionInfo;
38+
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
3839
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3940
import org.apache.flink.yarn.YarnClusterDescriptor;
4041
import org.apache.hadoop.fs.Path;
@@ -44,6 +45,7 @@
4445
import org.apache.hadoop.yarn.client.api.YarnClient;
4546
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
4647
import org.apache.hadoop.yarn.conf.YarnConfiguration;
48+
4749
import java.io.File;
4850
import java.net.InetSocketAddress;
4951
import java.net.URLDecoder;
@@ -56,6 +58,7 @@
5658
import java.io.IOException;
5759
import java.util.stream.Collectors;
5860
import java.util.stream.Stream;
61+
5962
import static java.util.Objects.requireNonNull;
6063

6164
/**
@@ -65,10 +68,10 @@ public class ClusterClientFactory {
6568

6669
public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
6770
String mode = launcherOptions.getMode();
68-
if(mode.equals(ClusterMode.standalone.name())) {
71+
if (mode.equals(ClusterMode.standalone.name())) {
6972
return createStandaloneClient(launcherOptions);
70-
} else if(mode.equals(ClusterMode.yarn.name())) {
71-
return createYarnClient(launcherOptions,mode);
73+
} else if (mode.equals(ClusterMode.yarn.name())) {
74+
return createYarnClient(launcherOptions, mode);
7275
}
7376
throw new IllegalArgumentException("Unsupported cluster client type: ");
7477
}
@@ -90,120 +93,48 @@ public static ClusterClient createStandaloneClient(Options launcherOptions) thro
9093

9194
public static ClusterClient createYarnClient(Options launcherOptions, String mode) {
9295
String flinkConfDir = launcherOptions.getFlinkconf();
93-
Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);
96+
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
9497
String yarnConfDir = launcherOptions.getYarnconf();
95-
YarnConfiguration yarnConf;
96-
if(StringUtils.isNotBlank(yarnConfDir)) {
98+
if (StringUtils.isNotBlank(yarnConfDir)) {
9799
try {
98-
flinkConf.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
99-
FileSystem.initialize(flinkConf);
100-
101-
File dir = new File(yarnConfDir);
102-
if(dir.exists() && dir.isDirectory()) {
103-
yarnConf = loadYarnConfiguration(yarnConfDir);
104-
105-
YarnClient yarnClient = YarnClient.createYarnClient();
106-
haYarnConf(yarnConf);
107-
yarnClient.init(yarnConf);
108-
yarnClient.start();
109-
110-
String confProp = launcherOptions.getConfProp();
111-
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
112-
System.out.println("confProp="+confProp);
113-
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
114-
115-
ApplicationId applicationId = null;
116-
ClusterClient clusterClient = null;
117-
if(mode.equals(ClusterMode.yarn.name())) {//on yarn cluster mode
118-
String yarnSessionConf = launcherOptions.getYarnSessionConf();
119-
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
120-
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
121-
String yid = yarnSessionConfProperties.get("yid").toString();
122-
if(StringUtils.isNotBlank(yid)){
123-
applicationId = toApplicationId(yid);
124-
}else{
125-
applicationId = getYarnClusterApplicationId(yarnClient);
126-
}
127-
System.out.println("applicationId="+applicationId.toString());
128-
129-
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
130-
flinkConf, yarnConf, ".", yarnClient, false);
131-
clusterClient = clusterDescriptor.retrieve(applicationId);
132-
133-
System.out.println("applicationId="+applicationId.toString()+" has retrieve!");
134-
} else {//on yarn per-job mode
135-
applicationId = createApplication(yarnClient);
136-
System.out.println("applicationId="+applicationId.toString());
137-
138-
YarnClusterConfiguration clusterConf = getYarnClusterConfiguration(flinkConf,yarnConf,flinkConfDir);
139-
//jobmanager+taskmanager param
140-
JobParameter appConf = new JobParameter(confProperties);
141-
142-
com.dtstack.flink.sql.yarn.YarnClusterDescriptor clusterDescriptor = new com.dtstack.flink.sql.yarn.YarnClusterDescriptor(
143-
clusterConf, yarnClient, appConf,applicationId, launcherOptions.getName(),null );
144-
clusterClient = clusterDescriptor.deploy();
100+
config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
101+
FileSystem.initialize(config);
102+
103+
YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
104+
YarnClient yarnClient = YarnClient.createYarnClient();
105+
yarnClient.init(yarnConf);
106+
yarnClient.start();
107+
ApplicationId applicationId = null;
108+
109+
String yarnSessionConf = launcherOptions.getYarnSessionConf();
110+
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
111+
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
112+
Object yid = yarnSessionConfProperties.get("yid");
113+
114+
if (null != yid) {
115+
applicationId = toApplicationId(yid.toString());
116+
} else {
117+
applicationId = getYarnClusterApplicationId(yarnClient);
118+
}
119+
System.out.println("applicationId=" + applicationId.toString());
145120

146-
System.out.println("applicationId="+applicationId.toString()+" has deploy!");
147-
}
148-
clusterClient.setDetached(true);
149-
yarnClient.stop();
150-
return clusterClient;
121+
if (StringUtils.isEmpty(applicationId.toString())) {
122+
throw new RuntimeException("No flink session found on yarn cluster.");
151123
}
152-
} catch(Exception e) {
124+
125+
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, flinkConfDir, yarnClient, false);
126+
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
127+
clusterClient.setDetached(true);
128+
return clusterClient;
129+
} catch (Exception e) {
153130
throw new RuntimeException(e);
154131
}
155132
}
156133
throw new UnsupportedOperationException("Haven't been developed yet!");
157134
}
158135

159-
private static YarnConfiguration loadYarnConfiguration(String yarnConfDir)
160-
{
161-
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
162-
hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
163136

164-
Stream.of("yarn-site.xml", "core-site.xml", "hdfs-site.xml").forEach(file -> {
165-
File site = new File(requireNonNull(yarnConfDir, "ENV HADOOP_CONF_DIR is not setting"), file);
166-
if (site.exists() && site.isFile()) {
167-
hadoopConf.addResource(new org.apache.hadoop.fs.Path(site.toURI()));
168-
}
169-
else {
170-
throw new RuntimeException(site + " not exists");
171-
}
172-
});
173-
174-
YarnConfiguration yarnConf = new YarnConfiguration(hadoopConf);
175-
// try (PrintWriter pw = new PrintWriter(new FileWriter(yarnSite))) { //write local file
176-
// yarnConf.writeXml(pw);
177-
// }
178-
return yarnConf;
179-
}
180-
181-
public static YarnClusterConfiguration getYarnClusterConfiguration(Configuration flinkConf,YarnConfiguration yarnConf,String flinkConfDir)
182-
{
183-
Path flinkJar = new Path(getFlinkJarFile(flinkConfDir).toURI());
184-
@SuppressWarnings("ConstantConditions") final Set<Path> resourcesToLocalize = Stream
185-
.of("flink-conf.yaml", "log4j.properties")
186-
.map(x -> new Path(new File(flinkConfDir, x).toURI()))
187-
.collect(Collectors.toSet());
188-
189-
return new YarnClusterConfiguration(flinkConf, yarnConf, "", flinkJar, resourcesToLocalize);
190-
}
191-
192-
public static final String FLINK_DIST = "flink-dist";
193-
private static File getFlinkJarFile(String flinkConfDir)
194-
{
195-
String errorMessage = "error not search " + FLINK_DIST + "*.jar";
196-
File[] files = requireNonNull(new File(flinkConfDir, "/../lib").listFiles(), errorMessage);
197-
Optional<File> file = Arrays.stream(files)
198-
.filter(f -> f.getName().startsWith(FLINK_DIST)).findFirst();
199-
return file.orElseThrow(() -> new IllegalArgumentException(errorMessage));
200-
}
201-
202-
private static ApplicationId createApplication(YarnClient yarnClient)throws IOException, YarnException {
203-
YarnClientApplication app = yarnClient.createApplication();
204-
return app.getApplicationSubmissionContext().getApplicationId();
205-
}
206-
private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient) throws Exception{
137+
private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient) throws Exception {
207138
ApplicationId applicationId = null;
208139

209140
Set<String> set = new HashSet<>();
@@ -214,51 +145,31 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)
214145

215146
int maxMemory = -1;
216147
int maxCores = -1;
217-
for(ApplicationReport report : reportList) {
218-
if(!report.getName().startsWith("Flink session")){
148+
for (ApplicationReport report : reportList) {
149+
if (!report.getName().startsWith("Flink session")) {
219150
continue;
220151
}
221152

222-
if(!report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
153+
if (!report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
223154
continue;
224155
}
225156

226157
int thisMemory = report.getApplicationResourceUsageReport().getNeededResources().getMemory();
227158
int thisCores = report.getApplicationResourceUsageReport().getNeededResources().getVirtualCores();
228-
if(thisMemory > maxMemory || thisMemory == maxMemory && thisCores > maxCores) {
159+
if (thisMemory > maxMemory || thisMemory == maxMemory && thisCores > maxCores) {
229160
maxMemory = thisMemory;
230161
maxCores = thisCores;
231162
applicationId = report.getApplicationId();
232163
}
233164

234165
}
235166

236-
if(StringUtils.isEmpty(applicationId.toString())) {
167+
if (StringUtils.isEmpty(applicationId.toString())) {
237168
throw new RuntimeException("No flink session found on yarn cluster.");
238169
}
239170
return applicationId;
240171
}
241172

242-
/**
243-
* 处理yarn HA的配置项
244-
*/
245-
private static org.apache.hadoop.conf.Configuration haYarnConf(org.apache.hadoop.conf.Configuration yarnConf) {
246-
Iterator<Map.Entry<String, String>> iterator = yarnConf.iterator();
247-
while(iterator.hasNext()) {
248-
Map.Entry<String,String> entry = iterator.next();
249-
String key = entry.getKey();
250-
String value = entry.getValue();
251-
if(key.startsWith("yarn.resourcemanager.hostname.")) {
252-
String rm = key.substring("yarn.resourcemanager.hostname.".length());
253-
String addressKey = "yarn.resourcemanager.address." + rm;
254-
if(yarnConf.get(addressKey) == null) {
255-
yarnConf.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
256-
}
257-
}
258-
}
259-
return yarnConf;
260-
}
261-
262173
private static ApplicationId toApplicationId(String appIdStr) {
263174
Iterator<String> it = StringHelper._split(appIdStr).iterator();
264175
if (!(it.next()).equals("application")) {

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static void main(String[] args) throws Exception {
9494
String flinkConfDir = launcherOptions.getFlinkconf();
9595
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
9696
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1);
97-
PerJobSubmitter.submit(launcherOptions, jobGraph);
97+
PerJobSubmitter.submit(launcherOptions, jobGraph, config);
9898
} else {
9999
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
100100
clusterClient.run(program, 1);

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ public void init(String yarnConfDir){
6565
System.out.println("----init yarn success ----");
6666
}
6767

68-
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph) throws MalformedURLException {
69-
Configuration newConf = new Configuration();
70-
confProp.forEach((key, val) -> newConf.setString(key.toString(), val.toString()));
68+
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig)
69+
throws MalformedURLException {
7170

72-
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(newConf, yarnConf, ".");
71+
confProp.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
72+
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, launcherOptions.getFlinkconf());
7373

7474
if (StringUtils.isNotBlank(flinkJarPath)) {
7575
if (!new File(flinkJarPath).exists()) {
@@ -103,6 +103,7 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties co
103103
}
104104

105105
clusterDescriptor.addShipFiles(shipFiles);
106+
clusterDescriptor.setName(launcherOptions.getName());
106107
String queue = launcherOptions.getQueue();
107108
if (!Strings.isNullOrEmpty(queue)) {
108109
clusterDescriptor.setQueue(queue);

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.lang3.StringUtils;
2525
import org.apache.flink.client.deployment.ClusterSpecification;
2626
import org.apache.flink.client.program.ClusterClient;
27+
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.core.fs.Path;
2829
import org.apache.flink.runtime.jobgraph.JobGraph;
2930
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
@@ -46,7 +47,7 @@ public class PerJobSubmitter {
4647

4748
private static final Logger LOG = LoggerFactory.getLogger(PerJobSubmitter.class);
4849

49-
public static String submit(Options launcherOptions, JobGraph jobGraph) throws Exception {
50+
public static String submit(Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig) throws Exception {
5051
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
5152
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
5253
List<String> paths = getJarPaths(addjarPath);
@@ -65,7 +66,7 @@ public static String submit(Options launcherOptions, JobGraph jobGraph) throws E
6566

6667
String flinkJarPath = launcherOptions.getFlinkJarPath();
6768

68-
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions, jobGraph);
69+
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions, jobGraph, flinkConfig);
6970
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
7071

7172
String applicationId = clusterClient.getClusterId().toString();

0 commit comments

Comments
 (0)