Skip to content

Commit 280d36f

Browse files
committed
Merge branch '1.8.0_dev_laucher' into 'v1.8.0_dev'
[LauncherMain启动依赖配置调整] See merge request !72
2 parents d1d204b + 2051007 commit 280d36f

File tree

4 files changed

+51
-35
lines changed

4 files changed

+51
-35
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.exec;
220

321
import org.apache.calcite.sql.SqlIdentifier;

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import java.io.File;
3232
import java.net.MalformedURLException;
33-
import java.net.URL;
3433
import java.util.ArrayList;
3534
import java.util.List;
3635
import java.util.Properties;
@@ -75,23 +74,22 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties co
7574

7675
}
7776

78-
List<URL> classpaths = new ArrayList<>();
77+
List<File> shipFiles = new ArrayList<>();
7978
if (flinkJarPath != null) {
8079
File[] jars = new File(flinkJarPath).listFiles();
8180

8281
for (File file : jars){
8382
if (file.toURI().toURL().toString().contains("flink-dist")){
8483
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
8584
} else {
86-
classpaths.add(file.toURI().toURL());
85+
shipFiles.add(file);
8786
}
8887
}
8988

9089
} else {
9190
throw new RuntimeException("The Flink jar path is null");
9291
}
93-
94-
//clusterDescriptor.setProvidedUserJarFiles(classpaths);
92+
clusterDescriptor.addShipFiles(shipFiles);
9593

9694
if(!Strings.isNullOrEmpty(queue)){
9795
clusterDescriptor.setQueue(queue);

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
import com.dtstack.flink.sql.util.PluginUtil;
2323
import org.apache.commons.io.Charsets;
2424
import org.apache.commons.lang3.StringUtils;
25+
import org.apache.flink.api.common.cache.DistributedCache;
2526
import org.apache.flink.client.deployment.ClusterSpecification;
2627
import org.apache.flink.client.program.ClusterClient;
2728
import org.apache.flink.core.fs.Path;
2829
import org.apache.flink.runtime.jobgraph.JobGraph;
29-
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
30-
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
3130
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3231
import org.apache.hadoop.yarn.api.records.ApplicationId;
3332
import org.slf4j.Logger;
@@ -36,7 +35,10 @@
3635
import java.net.MalformedURLException;
3736
import java.net.URL;
3837
import java.net.URLDecoder;
39-
import java.util.*;
38+
import java.util.Arrays;
39+
import java.util.List;
40+
import java.util.Map;
41+
import java.util.Properties;
4042

4143
/**
4244
* per job mode submitter
@@ -54,7 +56,7 @@ public static String submit(Options launcherOptions, JobGraph jobGraph) throws E
5456
fillJobGraphClassPath(jobGraph);
5557
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
5658
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
57-
List<String> paths = getJarPaths(addjarPath);
59+
List<String> paths = getJarPaths(addjarPath);
5860
paths.forEach( path -> {
5961
jobGraph.addJar(new Path("file://" + path));
6062
});
@@ -94,31 +96,11 @@ private static List<String> getJarPaths(String addjarPath) {
9496
}
9597

9698
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
97-
Map<String, String> jobCacheFileConfig = jobGraph.getJobConfiguration().toMap();
98-
Set<String> classPathKeySet = Sets.newHashSet();
99-
100-
for(Map.Entry<String, String> tmp : jobCacheFileConfig.entrySet()){
101-
if(Strings.isNullOrEmpty(tmp.getValue())){
102-
continue;
103-
}
104-
105-
if(tmp.getValue().startsWith("class_path")){
106-
//DISTRIBUTED_CACHE_FILE_NAME_1
107-
//DISTRIBUTED_CACHE_FILE_PATH_1
108-
String key = tmp.getKey();
109-
String[] array = key.split("_");
110-
if(array.length < 5){
111-
continue;
112-
}
113-
114-
array[3] = "PATH";
115-
classPathKeySet.add(StringUtils.join(array, "_"));
116-
}
117-
}
118-
119-
for(String key : classPathKeySet){
120-
String pathStr = jobCacheFileConfig.get(key);
121-
jobGraph.getClasspaths().add(new URL("file:" + pathStr));
122-
}
99+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
100+
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
101+
if(tmp.getKey().startsWith("class_path")){
102+
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
103+
}
104+
}
123105
}
124106
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/provider/DTC3P0DataSourceProvider.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.side.rdb.provider;
220

321
import com.mchange.v2.c3p0.ComboPooledDataSource;

0 commit comments

Comments
 (0)