Skip to content

Commit 4b239c6

Browse files
committed
rdb side sql error
1 parent 690c80f commit 4b239c6

File tree

3 files changed

+22
-7
lines changed

3 files changed

+22
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/util/ConfigConstrant.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,21 @@
2929
*/
3030
public class ConfigConstrant {
3131

32-
public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "sql.checkpoint.interval";
32+
public static final String SQL_CHECKPOINT_INTERVAL_KEY = "sql.checkpoint.interval";
33+
// 兼容上层
34+
public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "flink.checkpoint.interval";
3335

3436
public static final String FLINK_CHECKPOINT_MODE_KEY = "sql.checkpoint.mode";
3537

3638
public static final String FLINK_CHECKPOINT_TIMEOUT_KEY = "sql.checkpoint.timeout";
3739

3840
public static final String FLINK_MAXCONCURRENTCHECKPOINTS_KEY = "sql.max.concurrent.checkpoints";
3941

40-
public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "sql.checkpoint.cleanup.mode";
42+
public static final String SQL_CHECKPOINT_CLEANUPMODE_KEY = "sql.checkpoint.cleanup.mode";
43+
44+
public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "flink.checkpoint.cleanup.mode";
45+
46+
4147

4248
public static final String FLINK_CHECKPOINT_DATAURI_KEY = "flinkCheckpointDataURI";
4349

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
6767
}
6868

6969
//设置了时间间隔才表明开启了checkpoint
70-
if(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null){
70+
if(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY) == null && properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null){
7171
return;
7272
}else{
73-
Long interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY));
73+
Long sql_interval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY,"0"));
74+
Long flink_interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
75+
long checkpointInterval = Math.max(sql_interval, flink_interval);
7476
//start checkpoint every ${interval}
75-
env.enableCheckpointing(interval);
77+
env.enableCheckpointing(checkpointInterval);
7678
}
7779

7880
String checkMode = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_MODE_KEY);
@@ -100,7 +102,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
100102
env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrCheckpoints);
101103
}
102104

103-
String cleanupModeStr = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY);
105+
Boolean sqlCleanMode = MathUtil.getBoolean(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_CLEANUPMODE_KEY), false);
106+
Boolean flinkCleanMode = MathUtil.getBoolean(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY), false);
107+
108+
String cleanupModeStr = "false";
109+
if (sqlCleanMode || flinkCleanMode ){
110+
cleanupModeStr = "true";
111+
}
112+
104113
if ("true".equalsIgnoreCase(cleanupModeStr)){
105114
env.getCheckpointConfig().enableExternalizedCheckpoints(
106115
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
7272
for (int i = 0; i < equalFieldList.size(); i++) {
7373
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
7474

75-
sqlCondition += equalField + "\t" + sqlJoinCompareOperate.get(i) + " ";
75+
sqlCondition += equalField + " " + sqlJoinCompareOperate.get(i) + " ? ";
7676
if (i != equalFieldList.size() - 1) {
7777
sqlCondition += " and ";
7878
}

0 commit comments

Comments
 (0)