Skip to content

Commit 78d34fd

Browse files
authored
[Fix-4162][flink]Fix set statement is not effective in application mode (#4163)
1 parent 226d052 commit 78d34fd

File tree

14 files changed

+238
-168
lines changed

14 files changed

+238
-168
lines changed

dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java

Lines changed: 23 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,25 @@
2020
package org.dinky.app.flinksql;
2121

2222
import org.dinky.app.db.DBUtil;
23-
import org.dinky.app.model.StatementParam;
2423
import org.dinky.app.model.SysConfig;
2524
import org.dinky.app.util.FlinkAppUtil;
2625
import org.dinky.assertion.Asserts;
2726
import org.dinky.classloader.DinkyClassLoader;
2827
import org.dinky.config.Dialect;
2928
import org.dinky.constant.CustomerConfigureOptions;
30-
import org.dinky.constant.FlinkSQLConstant;
3129
import org.dinky.data.app.AppParamConfig;
3230
import org.dinky.data.app.AppTask;
3331
import org.dinky.data.constant.DirConstant;
3432
import org.dinky.data.enums.GatewayType;
33+
import org.dinky.data.job.JobStatement;
3534
import org.dinky.data.job.SqlType;
3635
import org.dinky.data.model.SystemConfiguration;
3736
import org.dinky.executor.Executor;
3837
import org.dinky.executor.ExecutorConfig;
3938
import org.dinky.executor.ExecutorFactory;
39+
import org.dinky.explainer.Explainer;
40+
import org.dinky.job.JobRunnerFactory;
41+
import org.dinky.job.JobStatementPlan;
4042
import org.dinky.resource.BaseResourceManager;
4143
import org.dinky.trans.Operations;
4244
import org.dinky.trans.dml.ExecuteJarOperation;
@@ -60,7 +62,6 @@
6062
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
6163
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
6264
import org.apache.flink.streaming.api.graph.StreamGraph;
63-
import org.apache.flink.table.api.TableResult;
6465

6566
import java.io.File;
6667
import java.io.IOException;
@@ -72,7 +73,6 @@
7273
import java.nio.charset.Charset;
7374
import java.sql.SQLException;
7475
import java.time.LocalDateTime;
75-
import java.util.ArrayList;
7676
import java.util.Arrays;
7777
import java.util.HashMap;
7878
import java.util.List;
@@ -141,7 +141,7 @@ public static void submit(AppParamConfig config) throws SQLException {
141141
if (Dialect.FLINK_JAR == appTask.getDialect()) {
142142
jobClient = executeJarJob(appTask.getType(), executor, statements);
143143
} else {
144-
jobClient = executeJob(executor, statements);
144+
jobClient = executeJob(executor, sql);
145145
}
146146
} finally {
147147
log.info("Start Monitor Job");
@@ -305,85 +305,31 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
305305
return jobClient;
306306
}
307307

308-
public static Optional<JobClient> executeJob(Executor executor, String[] statements) {
308+
public static Optional<JobClient> executeJob(Executor executor, String statements) {
309309
Optional<JobClient> jobClient = Optional.empty();
310310

311-
ExecutorConfig executorConfig = executor.getExecutorConfig();
312-
List<StatementParam> ddl = new ArrayList<>();
313-
List<StatementParam> trans = new ArrayList<>();
314-
List<StatementParam> execute = new ArrayList<>();
315-
316-
for (String item : statements) {
317-
if (item.isEmpty()) {
318-
continue;
319-
}
320-
321-
SqlType operationType = Operations.getOperationType(item);
322-
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
323-
trans.add(new StatementParam(item, operationType));
324-
if (!executorConfig.isUseStatementSet()) {
325-
break;
326-
}
327-
} else if (operationType.equals(SqlType.EXECUTE)) {
328-
execute.add(new StatementParam(item, operationType));
329-
if (!executorConfig.isUseStatementSet()) {
330-
break;
331-
}
332-
} else {
333-
ddl.add(new StatementParam(item, operationType));
334-
}
335-
}
336-
337-
for (StatementParam item : ddl) {
338-
log.info("Executing FlinkSQL: {}", item.getValue());
339-
executor.executeSql(item.getValue());
340-
log.info("Execution succeeded.");
341-
}
342-
343-
if (!trans.isEmpty()) {
344-
if (executorConfig.isUseStatementSet()) {
345-
List<String> inserts = new ArrayList<>();
346-
for (StatementParam item : trans) {
347-
if (item.getType().equals(SqlType.INSERT)) {
348-
inserts.add(item.getValue());
349-
}
350-
}
351-
log.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
352-
TableResult tableResult = executor.executeStatementSet(inserts);
353-
jobClient = tableResult.getJobClient();
354-
log.info("Execution succeeded.");
355-
} else {
356-
// UseStatementSet defaults to true, where the logic is never executed
357-
StatementParam item = trans.get(0);
358-
log.info("Executing FlinkSQL: {}", item.getValue());
359-
TableResult tableResult = executor.executeSql(item.getValue());
360-
jobClient = tableResult.getJobClient();
311+
JobStatementPlan jobStatementPlan =
312+
Explainer.build(executor).parseStatementsForApplicationMode(SqlUtil.getStatements(statements));
313+
jobStatementPlan.buildFinalExecutableStatement();
314+
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(executor);
315+
String currentSql = "";
316+
try {
317+
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
318+
currentSql = jobStatement.getStatement();
319+
log.info("Executing FlinkSQL: {}", currentSql);
320+
Optional<JobClient> optionalJobClient = jobRunnerFactory
321+
.getJobRunner(jobStatement.getStatementType())
322+
.execute(jobStatement);
361323
log.info("Execution succeeded.");
362-
}
363-
}
364-
365-
if (!execute.isEmpty()) {
366-
List<String> executes = new ArrayList<>();
367-
for (StatementParam item : execute) {
368-
executes.add(item.getValue());
369-
executor.executeSql(item.getValue());
370-
if (!executorConfig.isUseStatementSet()) {
324+
if (optionalJobClient.isPresent()) {
325+
jobClient = optionalJobClient;
371326
break;
372327
}
373328
}
374-
375-
log.info(
376-
"The FlinkSQL statement set is being executed: {}",
377-
String.join(FlinkSQLConstant.SEPARATOR, executes));
378-
try {
379-
JobClient client = executor.executeAsync(executorConfig.getJobName());
380-
jobClient = Optional.of(client);
381-
log.info("The execution was successful");
382-
} catch (Exception e) {
383-
log.error("Execution failed, {}", e.getMessage(), e);
384-
}
329+
} catch (Exception e) {
330+
log.error("Execution failed. Current statement: {} \n Error: {}", currentSql, e.getMessage(), e);
385331
}
386-
log.info("{} The task is successfully submitted", LocalDateTime.now());
332+
log.info("The task is successfully submitted.");
387333
return jobClient;
388334
}
389335
}

dinky-core/src/main/java/org/dinky/explainer/Explainer.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,23 @@
6262
public class Explainer {
6363

6464
private Executor executor;
65-
private boolean useStatementSet;
6665
private JobManager jobManager;
6766

68-
public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) {
67+
public Explainer(Executor executor) {
68+
this.executor = executor;
69+
}
70+
71+
public Explainer(Executor executor, JobManager jobManager) {
6972
this.executor = executor;
70-
this.useStatementSet = useStatementSet;
7173
this.jobManager = jobManager;
7274
}
7375

74-
public static Explainer build(JobManager jobManager) {
75-
return new Explainer(jobManager.getExecutor(), true, jobManager);
76+
public static Explainer build(Executor executor) {
77+
return new Explainer(executor);
7678
}
7779

78-
public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) {
79-
return new Explainer(executor, useStatementSet, jobManager);
80+
public static Explainer build(JobManager jobManager) {
81+
return new Explainer(jobManager.getExecutor(), jobManager);
8082
}
8183

8284
public JobStatementPlan parseStatements(String[] statements) {
@@ -93,6 +95,10 @@ public JobStatementPlan parseStatements(String[] statements) {
9395
return jobStatementPlanWithMock;
9496
}
9597

98+
public JobStatementPlan parseStatementsForApplicationMode(String[] statements) {
99+
return executor.parseStatementIntoJobStatementPlan(statements);
100+
}
101+
96102
private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
97103
List<String> udfStatements = new ArrayList<>();
98104
Optional.ofNullable(jobManager.getConfig().getUdfRefer())
@@ -185,7 +191,7 @@ public List<LineageRel> getLineage(String statement) {
185191
.type(GatewayType.LOCAL.getLongValue())
186192
.useRemote(false)
187193
.fragment(true)
188-
.statementSet(useStatementSet)
194+
.statementSet(false)
189195
.parallelism(1)
190196
.udfRefer(jobManager.getConfig().getUdfRefer())
191197
.configJson(executor.getTableConfig().getConfiguration().toMap())

dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,19 @@
4040
public class LineageBuilder {
4141

4242
public static LineageResult getColumnLineageByLogicalPlan(String statement, JobConfig jobConfig) {
43-
JobManager jobManager = JobManager.buildPlanMode(jobConfig);
44-
Explainer explainer = new Explainer(jobManager.getExecutor(), false, jobManager);
45-
return getColumnLineageByLogicalPlan(statement, explainer);
43+
Explainer explainer = Explainer.build(JobManager.buildPlanMode(jobConfig));
44+
return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
4645
}
4746

4847
public static LineageResult getColumnLineageByLogicalPlan(String statement, ExecutorConfig executorConfig) {
4948
JobManager jobManager = JobManager.buildPlanMode(JobConfig.buildPlanConfig());
5049
Executor executor = ExecutorFactory.buildExecutor(executorConfig, jobManager.getDinkyClassLoader());
5150
jobManager.setExecutor(executor);
52-
Explainer explainer = new Explainer(executor, false, jobManager);
53-
return getColumnLineageByLogicalPlan(statement, explainer);
51+
Explainer explainer = Explainer.build(jobManager);
52+
return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
5453
}
5554

56-
public static LineageResult getColumnLineageByLogicalPlan(String statement, Explainer explainer) {
57-
List<LineageRel> lineageRelList = explainer.getLineage(statement);
55+
public static LineageResult getColumnLineageByLogicalPlan(List<LineageRel> lineageRelList) {
5856
List<LineageRelation> relations = new ArrayList<>();
5957
Map<String, LineageTable> tableMap = new HashMap<>();
6058
int tableIndex = 1;

dinky-core/src/main/java/org/dinky/job/JobConfig.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,6 @@ public class JobConfig {
190190
notes = "Flag indicating whether to mock sink function")
191191
private boolean mockSinkFunction;
192192

193-
@ApiModelProperty(
194-
value = "Flag indicating whether to be submission mode",
195-
dataType = "boolean",
196-
example = "true",
197-
notes = "Flag indicating whether to be submission mode")
198-
private boolean isSubmissionMode;
199-
200193
@ApiModelProperty(value = "Gateway configuration", dataType = "GatewayConfig", notes = "Gateway configuration")
201194
private GatewayConfig gatewayConfig;
202195

dinky-core/src/main/java/org/dinky/job/JobManager.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import java.util.Map;
8080
import java.util.Objects;
8181
import java.util.Set;
82-
import java.util.stream.Collectors;
8382

8483
import com.fasterxml.jackson.databind.node.ObjectNode;
8584

@@ -242,17 +241,14 @@ public boolean close() {
242241

243242
@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
244243
public JobResult executeJarSql(String statement) throws Exception {
245-
List<String> statements = Arrays.stream(SqlUtil.getStatements(statement))
246-
.map(t -> executor.pretreatStatement(t))
247-
.collect(Collectors.toList());
248-
statement = String.join(";\n", statements);
249-
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
250-
jobStatementPlan.buildFinalStatement();
251244
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
252245
ready();
253-
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
254246
try {
247+
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
248+
jobStatementPlan.buildFinalStatement();
249+
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
255250
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
251+
setCurrentSql(jobStatement.getStatement());
256252
jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
257253
}
258254
if (job.isFailed()) {
@@ -284,6 +280,7 @@ public JobResult executeSql(String statement) throws Exception {
284280
jobStatementPlan.buildFinalStatement();
285281
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
286282
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
283+
setCurrentSql(jobStatement.getStatement());
287284
jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
288285
}
289286
job.setEndTime(LocalDateTime.now());

dinky-core/src/main/java/org/dinky/job/JobRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,16 @@
2222
import org.dinky.data.job.JobStatement;
2323
import org.dinky.data.result.SqlExplainResult;
2424

25+
import org.apache.flink.core.execution.JobClient;
2526
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
2627
import org.apache.flink.streaming.api.graph.StreamGraph;
2728

29+
import java.util.Optional;
30+
2831
public interface JobRunner {
2932

33+
Optional<JobClient> execute(JobStatement jobStatement) throws Exception;
34+
3035
void run(JobStatement jobStatement) throws Exception;
3136

3237
SqlExplainResult explain(JobStatement jobStatement);

dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.dinky.job;
2121

2222
import org.dinky.data.job.JobStatementType;
23+
import org.dinky.executor.Executor;
2324
import org.dinky.job.runner.JobDDLRunner;
2425
import org.dinky.job.runner.JobJarRunner;
2526
import org.dinky.job.runner.JobPipelineRunner;
@@ -42,6 +43,14 @@ public JobRunnerFactory(JobManager jobManager) {
4243
this.jobJarRunner = new JobJarRunner(jobManager);
4344
}
4445

46+
public JobRunnerFactory(Executor executor) {
47+
this.jobSetRunner = new JobSetRunner(executor);
48+
this.jobSqlRunner = new JobSqlRunner(executor);
49+
this.jobPipelineRunner = new JobPipelineRunner(executor);
50+
this.jobDDLRunner = new JobDDLRunner(executor);
51+
this.jobJarRunner = new JobJarRunner(executor);
52+
}
53+
4554
public JobRunner getJobRunner(JobStatementType jobStatementType) {
4655
switch (jobStatementType) {
4756
case SET:
@@ -58,6 +67,10 @@ public JobRunner getJobRunner(JobStatementType jobStatementType) {
5867
}
5968
}
6069

70+
public static JobRunnerFactory create(Executor executor) {
71+
return new JobRunnerFactory(executor);
72+
}
73+
6174
public static JobRunnerFactory create(JobManager jobManager) {
6275
return new JobRunnerFactory(jobManager);
6376
}

dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,30 @@ public void buildFinalStatement() {
8383
}
8484
}
8585

86+
public void buildFinalExecutableStatement() {
87+
checkStatement();
88+
89+
int executableIndex = -1;
90+
for (int i = 0; i < jobStatementList.size(); i++) {
91+
if (jobStatementList.get(i).getSqlType().isPipeline()) {
92+
executableIndex = i;
93+
}
94+
}
95+
if (executableIndex >= 0) {
96+
jobStatementList.get(executableIndex).asFinalExecutableStatement();
97+
} else {
98+
// If there is no INSERT/CTAS/RTAS/CALL statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the
99+
// final
100+
// statement.
101+
for (int i = 0; i < jobStatementList.size(); i++) {
102+
if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) {
103+
jobStatementList.get(i).asFinalExecutableStatement();
104+
break;
105+
}
106+
}
107+
}
108+
}
109+
86110
public void checkStatement() {
87111
checkEmptyStatement();
88112
checkPipelineStatement();

0 commit comments

Comments
 (0)