Skip to content

Commit 9aaad89

Browse files
authored
[Feature-3973][*] Release 1.2.0-rc4 (#3981)
1 parent 721ef1f commit 9aaad89

File tree

11 files changed

+349
-42
lines changed

11 files changed

+349
-42
lines changed

.gitignore

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ dinky.db
3030
hs_err_pid*
3131
.idea/*
3232
!.idea/icon.svg
33-
!.idea/vcs.xml
3433
build
3534
target/*
3635
*.iml
@@ -63,4 +62,6 @@ tmp/*
6362
extends/*
6463
/.run/
6564

66-
.idea
65+
.idea
66+
.idea/vcs.xml
67+
dinky-web/package-lock.json

dinky-admin/src/main/java/org/dinky/init/FlinkHistoryServer.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
package org.dinky.init;
2121

22-
import org.dinky.data.model.ResourcesModelEnum;
23-
import org.dinky.data.model.S3Configuration;
2422
import org.dinky.data.model.SystemConfiguration;
25-
import org.dinky.data.properties.OssProperties;
23+
import org.dinky.resource.BaseResourceManager;
2624
import org.dinky.service.JobInstanceService;
2725
import org.dinky.service.SysConfigService;
2826

@@ -63,16 +61,7 @@ public FlinkHistoryServer(JobInstanceService jobInstanceService, SysConfigServic
6361
this.historyRunnable = () -> {
6462
Map<String, String> flinkHistoryServerConfiguration =
6563
SystemConfiguration.getInstances().getFlinkHistoryServerConfiguration();
66-
if (systemConfiguration.getResourcesEnable().getValue()) {
67-
if (systemConfiguration.getResourcesModel().getValue().equals(ResourcesModelEnum.OSS)) {
68-
OssProperties ossProperties = systemConfiguration.getOssProperties();
69-
flinkHistoryServerConfiguration.put(S3Configuration.ENDPOINT, ossProperties.getEndpoint());
70-
flinkHistoryServerConfiguration.put(S3Configuration.ACCESS_KEY, ossProperties.getAccessKey());
71-
flinkHistoryServerConfiguration.put(S3Configuration.SECRET_KEY, ossProperties.getSecretKey());
72-
flinkHistoryServerConfiguration.put(
73-
S3Configuration.PATH_STYLE_ACCESS, String.valueOf(ossProperties.getPathStyleAccess()));
74-
}
75-
}
64+
flinkHistoryServerConfiguration.putAll(BaseResourceManager.convertFlinkResourceConfig());
7665

7766
HistoryServerUtil.run(
7867
(jobId) -> {

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.time.LocalDateTime;
5050
import java.time.LocalTime;
5151
import java.time.ZoneId;
52-
import java.util.Optional;
5352
import java.util.concurrent.TimeUnit;
5453

5554
import javax.xml.bind.DatatypeConverter;
@@ -117,7 +116,7 @@ public static LogicalType getLogicalType(Column column) {
117116

118117
public static Object convertToRow(Object value, LogicalType logicalType, ZoneId timeZone) {
119118
if (Asserts.isNull(value)) {
120-
return Optional.empty();
119+
return null;
121120
}
122121
switch (logicalType.getTypeRoot()) {
123122
case BOOLEAN:
@@ -163,7 +162,7 @@ public static Object convertToRow(Object value, LogicalType logicalType, ZoneId
163162

164163
public static Object convertToRowData(Object value, LogicalType logicalType, ZoneId timeZone) {
165164
if (Asserts.isNull(value)) {
166-
return Optional.empty();
165+
return null;
167166
}
168167
switch (logicalType.getTypeRoot()) {
169168
case BOOLEAN:

dinky-client/dinky-client-base/src/main/java/org/dinky/resource/BaseResourceManager.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
package org.dinky.resource;
2121

22+
import org.dinky.assertion.Asserts;
2223
import org.dinky.data.exception.DinkyException;
24+
import org.dinky.data.model.ResourcesModelEnum;
2325
import org.dinky.data.model.ResourcesVO;
26+
import org.dinky.data.model.S3Configuration;
2427
import org.dinky.data.model.SystemConfiguration;
28+
import org.dinky.data.properties.OssProperties;
2529
import org.dinky.oss.OssTemplate;
2630
import org.dinky.resource.impl.HdfsResourceManager;
2731
import org.dinky.resource.impl.LocalResourceManager;
@@ -34,7 +38,9 @@
3438
import java.io.IOException;
3539
import java.io.InputStream;
3640
import java.nio.charset.Charset;
41+
import java.util.HashMap;
3742
import java.util.List;
43+
import java.util.Map;
3844

3945
import cn.hutool.core.io.FileUtil;
4046
import cn.hutool.core.io.IoUtil;
@@ -74,6 +80,9 @@ static BaseResourceManager getInstance() {
7480
}
7581

7682
static void initResourceManager() {
83+
if (Asserts.isNull(instances.getResourcesModel().getValue())) {
84+
return;
85+
}
7786
switch (instances.getResourcesModel().getValue()) {
7887
case LOCAL:
7988
Singleton.get(LocalResourceManager.class);
@@ -106,6 +115,37 @@ static void initResourceManager() {
106115
}
107116
}
108117

118+
static Map<String, String> convertFlinkResourceConfig() {
119+
Map<String, String> flinkConfig = new HashMap<>();
120+
if (!instances.getResourcesEnable().getValue()) {
121+
return flinkConfig;
122+
}
123+
if (instances.getResourcesModel().getValue().equals(ResourcesModelEnum.OSS)) {
124+
OssProperties ossProperties = instances.getOssProperties();
125+
flinkConfig.put(S3Configuration.ENDPOINT, ossProperties.getEndpoint());
126+
flinkConfig.put(S3Configuration.ACCESS_KEY, ossProperties.getAccessKey());
127+
flinkConfig.put(S3Configuration.SECRET_KEY, ossProperties.getSecretKey());
128+
flinkConfig.put(S3Configuration.PATH_STYLE_ACCESS, String.valueOf(ossProperties.getPathStyleAccess()));
129+
} else if (instances.getResourcesModel().getValue().equals(ResourcesModelEnum.HDFS)) {
130+
final Configuration configuration = new Configuration();
131+
Charset charset = Charset.defaultCharset();
132+
String coreSite = instances.getResourcesHdfsCoreSite().getValue();
133+
Opt.ofBlankAble(coreSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset)));
134+
String hdfsSite = instances.getResourcesHdfsHdfsSite().getValue();
135+
Opt.ofBlankAble(hdfsSite).ifPresent(x -> configuration.addResource(IoUtil.toStream(x, charset)));
136+
configuration.reloadConfiguration();
137+
if (StrUtil.isEmpty(coreSite)) {
138+
configuration.set(
139+
"fs.defaultFS", instances.getResourcesHdfsDefaultFS().getValue());
140+
}
141+
Map<String, String> hadoopConfig = configuration.getValByRegex(".*");
142+
hadoopConfig.forEach((key, value) -> {
143+
flinkConfig.put("flink.hadoop." + key, value);
144+
});
145+
}
146+
return flinkConfig;
147+
}
148+
109149
default String getFilePath(String path) {
110150
return FileUtil.normalize(FileUtil.file(getBasePath(), path).toString());
111151
}

dinky-core/src/main/java/org/dinky/executor/Executor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.dinky.interceptor.FlinkInterceptor;
3232
import org.dinky.interceptor.FlinkInterceptorResult;
3333
import org.dinky.job.JobStatementPlan;
34+
import org.dinky.resource.BaseResourceManager;
3435
import org.dinky.trans.Operations;
3536
import org.dinky.utils.KerberosUtil;
3637

@@ -169,6 +170,7 @@ private void initClassloader(DinkyClassLoader classLoader) {
169170

170171
protected void init(DinkyClassLoader classLoader) {
171172
initClassloader(classLoader);
173+
initFileSystem();
172174
this.dinkyClassLoader = classLoader;
173175
Thread.currentThread().setContextClassLoader(classLoader);
174176
if (executorConfig.isValidParallelism()) {
@@ -195,6 +197,10 @@ protected void init(DinkyClassLoader classLoader) {
195197
isMockTest = false;
196198
}
197199

200+
private void initFileSystem() {
201+
BaseResourceManager.initResourceManager();
202+
}
203+
198204
abstract CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader);
199205

200206
public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements) {

dinky-core/src/main/java/org/dinky/explainer/Explainer.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,24 +98,29 @@ public Explainer initialize(JobConfig config, String statement) {
9898
}
9999

100100
public JobStatementPlan parseStatements(String[] statements) {
101-
JobStatementPlan jobStatementPlanWithUDFAndMock = new JobStatementPlan();
101+
JobStatementPlan jobStatementPlanWithMock = new JobStatementPlan();
102+
generateUDFStatement(jobStatementPlanWithMock);
103+
104+
JobStatementPlan jobStatementPlan = executor.parseStatementIntoJobStatementPlan(statements);
105+
jobStatementPlanWithMock.getJobStatementList().addAll(jobStatementPlan.getJobStatementList());
106+
if (!jobManager.isPlanMode() && jobManager.getConfig().isMockSinkFunction()) {
107+
executor.setMockTest(true);
108+
MockStatementExplainer.build(executor.getCustomTableEnvironment())
109+
.jobStatementPlanMock(jobStatementPlanWithMock);
110+
}
111+
return jobStatementPlanWithMock;
112+
}
113+
114+
private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
102115
List<String> udfStatements = new ArrayList<>();
103116
Optional.ofNullable(jobManager.getConfig().getUdfRefer())
104117
.ifPresent(t -> t.forEach((key, value) -> {
105118
String sql = String.format("create temporary function %s as '%s'", value, key);
106119
udfStatements.add(sql);
107120
}));
108121
for (String udfStatement : udfStatements) {
109-
jobStatementPlanWithUDFAndMock.addJobStatement(udfStatement, JobStatementType.DDL, SqlType.CREATE);
110-
}
111-
JobStatementPlan jobStatementPlan = executor.parseStatementIntoJobStatementPlan(statements);
112-
jobStatementPlanWithUDFAndMock.getJobStatementList().addAll(jobStatementPlan.getJobStatementList());
113-
if (!jobManager.isPlanMode() && jobManager.getConfig().isMockSinkFunction()) {
114-
executor.setMockTest(true);
115-
MockStatementExplainer.build(executor.getCustomTableEnvironment())
116-
.jobStatementPlanMock(jobStatementPlanWithUDFAndMock);
122+
jobStatementPlan.addJobStatement(udfStatement, JobStatementType.DDL, SqlType.CREATE);
117123
}
118-
return jobStatementPlanWithUDFAndMock;
119124
}
120125

121126
public List<UDF> parseUDFFromStatements(String[] statements) {

dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.dinky.job.Job;
3535
import org.dinky.job.JobConfig;
3636
import org.dinky.job.JobManager;
37+
import org.dinky.trans.parse.ExecuteJarParseStrategy;
3738
import org.dinky.utils.FlinkStreamEnvironmentUtil;
3839
import org.dinky.utils.LogUtil;
3940
import org.dinky.utils.SqlUtil;
@@ -66,6 +67,11 @@ public JobPipelineRunner(JobManager jobManager) {
6667

6768
@Override
6869
public void run(JobStatement jobStatement) throws Exception {
70+
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
71+
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
72+
jobJarRunner.run(jobStatement);
73+
return;
74+
}
6975
statements.add(jobStatement);
7076
tableResult = jobManager.getExecutor().executeSql(jobStatement.getStatement());
7177
if (statements.size() == 1) {
@@ -83,6 +89,10 @@ public void run(JobStatement jobStatement) throws Exception {
8389

8490
@Override
8591
public SqlExplainResult explain(JobStatement jobStatement) {
92+
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
93+
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
94+
return jobJarRunner.explain(jobStatement);
95+
}
8696
SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
8797
statements.add(jobStatement);
8898
// pipeline job execute to generate stream graph.
@@ -131,6 +141,10 @@ public SqlExplainResult explain(JobStatement jobStatement) {
131141

132142
@Override
133143
public StreamGraph getStreamGraph(JobStatement jobStatement) {
144+
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
145+
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
146+
return jobJarRunner.getStreamGraph(jobStatement);
147+
}
134148
statements.add(jobStatement);
135149
// pipeline job execute to generate stream graph.
136150
jobManager.getExecutor().executeSql(jobStatement.getStatement());
@@ -144,6 +158,10 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) {
144158

145159
@Override
146160
public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
161+
if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
162+
JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
163+
return jobJarRunner.getJobPlanInfo(jobStatement);
164+
}
147165
statements.add(jobStatement);
148166
// pipeline job execute to generate stream graph.
149167
jobManager.getExecutor().executeSql(jobStatement.getStatement());

dinky-function/src/main/java/org/dinky/function/FlinkUDFDiscover.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,21 @@
2020
package org.dinky.function;
2121

2222
import org.dinky.function.data.model.UDF;
23+
import org.dinky.function.util.Reflections;
2324

2425
import org.apache.commons.collections.CollectionUtils;
2526
import org.apache.flink.table.catalog.FunctionLanguage;
2627
import org.apache.flink.table.functions.UserDefinedFunction;
2728

29+
import java.io.File;
30+
import java.net.URL;
31+
import java.util.ArrayList;
32+
import java.util.Collection;
2833
import java.util.List;
2934
import java.util.Set;
3035
import java.util.stream.Collectors;
3136

32-
import org.reflections.Reflections;
3337
import org.reflections.scanners.Scanners;
34-
import org.reflections.util.ClasspathHelper;
3538
import org.reflections.util.ConfigurationBuilder;
3639

3740
public class FlinkUDFDiscover {
@@ -42,9 +45,28 @@ public static List<UDF> getCustomStaticUDFs() {
4245
if (CollectionUtils.isNotEmpty(JAVA_STATIC_UDF_LIST)) {
4346
return JAVA_STATIC_UDF_LIST;
4447
}
48+
Collection<URL> urls = new ArrayList<>();
49+
String javaClassPath = System.getProperty("java.class.path");
50+
if (javaClassPath != null) {
51+
for (String path : javaClassPath.split(File.pathSeparator)) {
52+
if (path.contains("/*")) {
53+
continue;
54+
}
55+
if (!path.contains("extends") && !path.contains("customJar")) {
56+
continue;
57+
}
4558

46-
Reflections reflections =
47-
new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
59+
try {
60+
urls.add(new File(path).toURI().toURL());
61+
} catch (Exception e) {
62+
if (Reflections.log != null) {
63+
Reflections.log.warn("Could not get URL", e);
64+
}
65+
}
66+
}
67+
}
68+
69+
Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(urls));
4870
Set<Class<?>> operations =
4971
reflections.get(Scanners.SubTypes.of(UserDefinedFunction.class).asClass());
5072
return operations.stream()

0 commit comments

Comments (0)