Commit 7f87bd7

resolve conflict

2 parents: 654d499 + a7d94df

13 files changed: +99 −15 lines


core/src/main/java/com/dtstack/flink/sql/Main.java (6 additions, 0 deletions)

@@ -205,6 +205,9 @@ public static void main(String[] args) throws Exception {
                 sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
             }else{
                 tableEnv.sqlUpdate(result.getExecSql());
+                if(LOG.isInfoEnabled()){
+                    LOG.info("exec sql: " + result.getExecSql());
+                }
             }
         }
     }
@@ -287,6 +290,9 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en

             Table regTable = tableEnv.fromDataStream(adaptStream, fields);
             tableEnv.registerTable(tableInfo.getName(), regTable);
+            if(LOG.isInfoEnabled()){
+                LOG.info("registe table {} success.", tableInfo.getName());
+            }
             registerTableCache.put(tableInfo.getName(), regTable);
             classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
         } else if (tableInfo instanceof TargetTableInfo) {
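Both hunks wrap a string-concatenating `LOG.info` call in an `isInfoEnabled()` guard, so the SQL text is only built when INFO logging is on. A minimal sketch of an equivalent guard-free form using SLF4J parameterized logging (illustrative only, not part of the commit):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingGuardSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingGuardSketch.class);

    void logExecSql(String execSql) {
        // Guarded concatenation, as in the diff above:
        if (LOG.isInfoEnabled()) {
            LOG.info("exec sql: " + execSql);
        }
        // Parameterized equivalent: the message is only formatted when INFO
        // is enabled, so no explicit guard is needed.
        LOG.info("exec sql: {}", execSql);
    }
}
```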

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java (10 additions, 0 deletions)

@@ -50,6 +50,8 @@
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.*;

@@ -64,6 +66,8 @@

 public class SideSqlExec {

+    private static final Logger LOG = LoggerFactory.getLogger(SideSqlExec.class);
+
     private String localSqlPluginPath = null;

     private String tmpFields = null;
@@ -102,6 +106,9 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl

             if(pollSqlNode.getKind() == INSERT){
                 tableEnv.sqlUpdate(pollSqlNode.toString());
+                if(LOG.isInfoEnabled()){
+                    LOG.info("exec sql: " + pollSqlNode.toString());
+                }
             }else if(pollSqlNode.getKind() == AS){
                 AliasInfo aliasInfo = parseASNode(pollSqlNode);
                 Table table = tableEnv.sql(aliasInfo.getName());
@@ -531,6 +538,9 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
                 AliasInfo aliasInfo = parseASNode(pollSqlNode);
                 Table table = tableEnv.sql(aliasInfo.getName());
                 tableEnv.registerTable(aliasInfo.getAlias(), table);
+                if(LOG.isInfoEnabled()){
+                    LOG.info("Register Table {} by {}", aliasInfo.getAlias(), aliasInfo.getName());
+                }
                 localTableCache.put(aliasInfo.getAlias(), table);
             } else if (pollSqlNode.getKind() == SELECT){
                 Table table = tableEnv.sqlQuery(pollObj.toString());

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java (8 additions, 10 deletions)

@@ -101,16 +101,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
         }

         String cleanupModeStr = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY);
-        if(cleanupModeStr != null){ // whether the checkpoint is retained when the job is cancelled
-            if("true".equalsIgnoreCase(cleanupModeStr)){
-                env.getCheckpointConfig().enableExternalizedCheckpoints(
-                        CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
-            }else if("false".equalsIgnoreCase(cleanupModeStr)){
-                env.getCheckpointConfig().enableExternalizedCheckpoints(
-                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-            }else{
-                throw new RuntimeException("not support value of cleanup mode :" + cleanupModeStr);
-            }
+        if ("true".equalsIgnoreCase(cleanupModeStr)){
+            env.getCheckpointConfig().enableExternalizedCheckpoints(
+                    CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+        } else if("false".equalsIgnoreCase(cleanupModeStr) || cleanupModeStr == null){
+            env.getCheckpointConfig().enableExternalizedCheckpoints(
+                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        } else{
+            throw new RuntimeException("not support value of cleanup mode :" + cleanupModeStr);
        }

         String backendPath = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_DATAURI_KEY);
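The behavioral change in this hunk: an unset cleanup-mode property no longer skips `enableExternalizedCheckpoints` entirely. `"true".equalsIgnoreCase(null)` returns false, so `null` falls through to the explicit `cleanupModeStr == null` test and lands in the retain branch. A minimal sketch of the new decision table, using the Flink enum the diff relies on (the helper name is illustrative):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

public class CleanupModeSketch {
    static ExternalizedCheckpointCleanup resolve(String cleanupModeStr) {
        if ("true".equalsIgnoreCase(cleanupModeStr)) {
            // "true" -> delete the externalized checkpoint when the job is cancelled
            return ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION;
        } else if ("false".equalsIgnoreCase(cleanupModeStr) || cleanupModeStr == null) {
            // "false" or unset -> retain the checkpoint on cancellation
            // (previously, unset meant externalized checkpoints were not enabled at all)
            return ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
        }
        throw new RuntimeException("not support value of cleanup mode :" + cleanupModeStr);
    }
}
```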

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java (6 additions, 0 deletions)

@@ -64,6 +64,8 @@ public class HbaseOutputFormat extends MetricOutputFormat {
     public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
     public final SimpleDateFormat FIELD_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

+    private static int rowLenth = 1000;
+
     @Override
     public void configure(Configuration parameters) {
         LOG.warn("---configure---");
@@ -133,6 +135,10 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
         }

         table.put(put);
+
+        if (outRecords.getCount()%rowLenth == 0){
+            LOG.info(record.toString());
+        }
         outRecords.inc();

     }
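`rowLenth` here is a sampling interval rather than a row length: the record is logged only when the output counter is a multiple of 1000, keeping per-record logging cheap at high throughput. Since the check runs before `outRecords.inc()`, the very first record (count 0) is always logged. A minimal sketch of the pattern as a reusable helper (class and method names are illustrative, not from the commit):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SampledLogSketch {
    private static final Logger LOG = LoggerFactory.getLogger(SampledLogSketch.class);

    /** Logs on counts 0, n, 2n, ... — mirrors `count % rowLenth == 0` in the diff. */
    static void infoEveryN(long count, long n, String message) {
        if (n > 0 && count % n == 0 && LOG.isInfoEnabled()) {
            LOG.info(message);
        }
    }
}
```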

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java (10 additions, 1 deletion)

@@ -62,6 +62,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {

     private static final long serialVersionUID = 2385115520960444192L;

+    private static int rowLenth = 1000;
+
     private final ObjectMapper objectMapper = new ObjectMapper();

     /** Type information describing the result type. */
@@ -109,10 +111,14 @@ public Row deserialize(byte[] message) throws IOException {
         }

         try {
+            JsonNode root = objectMapper.readTree(message);
+            if (numInRecord.getCount()%rowLenth == 0){
+                LOG.info(root.toString());
+            }
+
             numInRecord.inc();
             numInBytes.inc(message.length);

-            JsonNode root = objectMapper.readTree(message);
             parseTree(root, null);
             Row row = new Row(fieldNames.length);

@@ -137,6 +143,9 @@ public Row deserialize(byte[] message) throws IOException {
             return row;
         } catch (Throwable t) {
             //add metric of dirty data
+            if (dirtyDataCounter.getCount()%rowLenth == 0){
+                LOG.info("dirtyData: " + new String(message));
+            }
             dirtyDataCounter.inc();
             return null;
         }finally {
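Moving `objectMapper.readTree(message)` above the counter updates changes the metric semantics: a message that fails JSON parsing now throws before `numInRecord`/`numInBytes` are incremented, so it is counted only by `dirtyDataCounter`. A self-contained sketch of the resulting flow, assuming the semantics implied by the diff (field names mirror the diff, but this class is illustrative, not the project's deserializer):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeserializeFlowSketch {
    private static final Logger LOG = LoggerFactory.getLogger(DeserializeFlowSketch.class);
    private static int rowLenth = 1000;

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Counter numInRecord;
    private final Counter numInBytes;
    private final Counter dirtyDataCounter;

    DeserializeFlowSketch(Counter in, Counter bytes, Counter dirty) {
        this.numInRecord = in;
        this.numInBytes = bytes;
        this.dirtyDataCounter = dirty;
    }

    JsonNode deserialize(byte[] message) {
        try {
            JsonNode root = objectMapper.readTree(message); // may throw -> catch block
            if (numInRecord.getCount() % rowLenth == 0) {
                LOG.info(root.toString());                  // sample one parsed message per 1000
            }
            numInRecord.inc();                              // only successful parses are counted
            numInBytes.inc(message.length);
            return root;
        } catch (Throwable t) {
            if (dirtyDataCounter.getCount() % rowLenth == 0) {
                LOG.info("dirtyData: " + new String(message)); // sample dirty payloads at the same rate
            }
            dirtyDataCounter.inc();
            return null;
        }
    }
}
```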

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java (7 additions, 0 deletions)

@@ -25,6 +25,8 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -39,6 +41,8 @@

 public class KafkaSourceParser extends AbsSourceParser {

+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceParser.class);
+
     private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";

     private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
@@ -59,6 +63,9 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
         tableInfo.addField(mappingField);
         tableInfo.addFieldClass(fieldClass);
         tableInfo.addFieldType(fieldType);
+        if(LOG.isInfoEnabled()){
+            LOG.info(physicalField + "--->" + mappingField + " Class: " + fieldClass.toString());
+        }
     }

     @Override
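`kafkaNestFieldKeyPattern` drives the new log line: it splits a nested-field definition into a dotted physical path, a type, and an alias. A small demo of what the pattern captures (the sample field definition is made up for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NestFieldPatternDemo {
    // Same pattern as in the diff above.
    private static final Pattern P =
            Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");

    public static void main(String[] args) {
        Matcher m = P.matcher("user.address.city varchar AS city");
        if (m.find()) {
            System.out.println("physical field: " + m.group(1)); // user.address.city
            System.out.println("field type:     " + m.group(3)); // varchar
            System.out.println("mapping field:  " + m.group(4)); // city
        }
    }
}
```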

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java (11 additions, 1 deletion)

@@ -62,6 +62,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {

     private static final long serialVersionUID = 2385115520960444192L;

+    private static int rowLenth = 1000;
+
     private final ObjectMapper objectMapper = new ObjectMapper();

     /** Type information describing the result type. */
@@ -108,10 +110,15 @@ public Row deserialize(byte[] message) throws IOException {
         }

         try {
+            JsonNode root = objectMapper.readTree(message);
+
+            if (numInRecord.getCount()%rowLenth == 0){
+                LOG.info(root.toString());
+            }
+
             numInRecord.inc();
             numInBytes.inc(message.length);

-            JsonNode root = objectMapper.readTree(message);
             parseTree(root, null);
             Row row = new Row(fieldNames.length);

@@ -136,6 +143,9 @@ public Row deserialize(byte[] message) throws IOException {
             return row;
         } catch (Throwable t) {
             //add metric of dirty data
+            if (dirtyDataCounter.getCount()%rowLenth == 0){
+                LOG.info("dirtyData: " + new String(message));
+            }
             dirtyDataCounter.inc();
             return null;
         }finally {

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java (7 additions, 0 deletions)

@@ -24,6 +24,8 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -38,6 +40,8 @@

 public class KafkaSourceParser extends AbsSourceParser {

+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceParser.class);
+
     private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";

     private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
@@ -63,6 +67,9 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
         tableInfo.addField(mappingField);
         tableInfo.addFieldClass(fieldClass);
         tableInfo.addFieldType(fieldType);
+        if(LOG.isInfoEnabled()){
+            LOG.info(physicalField + "--->" + mappingField + " Class: " + fieldClass.toString());
+        }
     }

     @Override

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java (11 additions, 1 deletion)

@@ -64,6 +64,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {

     private static final long serialVersionUID = 2385115520960444192L;

+    private static int rowLenth = 1000;
+
     private final ObjectMapper objectMapper = new ObjectMapper();

     /** Type information describing the result type. */
@@ -111,10 +113,15 @@ public Row deserialize(byte[] message) throws IOException {
         }

         try {
+            JsonNode root = objectMapper.readTree(message);
+
+            if (numInRecord.getCount()%rowLenth == 0){
+                LOG.info(root.toString());
+            }
+
             numInRecord.inc();
             numInBytes.inc(message.length);

-            JsonNode root = objectMapper.readTree(message);
             parseTree(root, null);
             Row row = new Row(fieldNames.length);

@@ -139,6 +146,9 @@ public Row deserialize(byte[] message) throws IOException {
             return row;
         } catch (Throwable t) {
             //add metric of dirty data
+            if (dirtyDataCounter.getCount()%rowLenth == 0){
+                LOG.info("dirtyData: " + new String(message));
+            }
             dirtyDataCounter.inc();
             return null;
         }finally {

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java (7 additions, 0 deletions)

@@ -24,6 +24,8 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -38,6 +40,8 @@

 public class KafkaSourceParser extends AbsSourceParser {

+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceParser.class);
+
     private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";

     private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
@@ -63,6 +67,9 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
         tableInfo.addField(mappingField);
         tableInfo.addFieldClass(fieldClass);
         tableInfo.addFieldType(fieldType);
+        if(LOG.isInfoEnabled()){
+            LOG.info(physicalField + "--->" + mappingField + " Class: " + fieldClass.toString());
+        }
     }

     @Override
