Skip to content

Commit 83c10de

Browse files
authored
[feat-#1498][main] support run sql job with batch mode when set runMode=batch (#1504)
1 parent d65793b commit 83c10de

File tree

5 files changed

+16
-14
lines changed

5 files changed

+16
-14
lines changed

chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@
2424

2525
import java.util.Optional;
2626

27-
/**
28-
* company www.dtstack.com
29-
*
30-
* @author jier
31-
*/
3227
public class GreenplumDialect extends PostgresqlDialect {
3328

3429
private static final String DIALECT_NAME = "Greenplum";

chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@
4141
import java.sql.Connection;
4242
import java.sql.SQLException;
4343

44-
/**
45-
* @program: flinkx
46-
* @author: jier
47-
*/
4844
public class GreenplumOutputFormat extends JdbcOutputFormat {
4945

5046
// pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00

chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@
2828

2929
import static com.dtstack.chunjun.connector.greenplum.sink.GreenplumOutputFormat.INSERT_SQL_MODE_TYPE;
3030

31-
/**
32-
* company www.dtstack.com
33-
*
34-
* @author jier
35-
*/
3631
public class GreenplumSinkFactory extends JdbcSinkFactory {
3732

3833
public GreenplumSinkFactory(SyncConf syncConf) {

chunjun-core/src/main/java/com/dtstack/chunjun/Main.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.dtstack.chunjun.util.TableUtil;
5353

5454
import org.apache.flink.api.common.JobExecutionResult;
55+
import org.apache.flink.api.common.RuntimeExecutionMode;
5556
import org.apache.flink.api.common.typeinfo.TypeInformation;
5657
import org.apache.flink.core.execution.JobClient;
5758
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -143,6 +144,8 @@ private static void exeSqlJob(
143144
try {
144145
configStreamExecutionEnvironment(env, options, null);
145146
List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar());
147+
String runMode = options.getRunMode();
148+
if ("batch".equalsIgnoreCase(runMode)) env.setRuntimeMode(RuntimeExecutionMode.BATCH);
146149
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
147150
TableResult execute = statementSet.execute();
148151
if (env instanceof MyLocalStreamEnvironment) {

chunjun-core/src/main/java/com/dtstack/chunjun/options/Options.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ public class Options {
8888
@OptionRequired(description = "file add to ship file")
8989
private String addShipfile;
9090

91+
@OptionRequired(description = "flink run mode")
92+
private String runMode;
93+
9194
private Configuration flinkConfiguration = null;
9295

9396
public Configuration loadFlinkConfiguration() {
@@ -235,6 +238,14 @@ public void setJobType(String jobType) {
235238
this.jobType = jobType;
236239
}
237240

241+
public String getRunMode() {
242+
return runMode;
243+
}
244+
245+
public void setRunMode(String runMode) {
246+
this.runMode = runMode;
247+
}
248+
238249
@Override
239250
public String toString() {
240251
return new StringJoiner(", ", Options.class.getSimpleName() + "[", "]")
@@ -248,10 +259,12 @@ public String toString() {
248259
.add("flinkLibDir='" + flinkLibDir + "'")
249260
.add("confProp='" + confProp + "'")
250261
.add("p='" + p + "'")
262+
.add("pj='" + pj + "'")
251263
.add("pluginLoadMode='" + pluginLoadMode + "'")
252264
.add("remoteChunJunDistDir='" + remoteChunJunDistDir + "'")
253265
.add("addjar='" + addjar + "'")
254266
.add("addShipfile='" + addShipfile + "'")
267+
.add("runMode='" + runMode + "'")
255268
.add("flinkConfiguration=" + flinkConfiguration)
256269
.toString();
257270
}

0 commit comments

Comments
 (0)