Skip to content

Commit 5536ee7

Browse files
author
yanxi0227
committed
add log
1 parent de1176d commit 5536ee7

File tree

10 files changed

+57
-3
lines changed

10 files changed

+57
-3
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,7 @@ public static void main(String[] args) throws Exception {
173173
}
174174

175175
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
176-
if(LOG.isInfoEnabled()){
177-
LOG.info("exe-sql:\n" + result.getExecSql());
178-
}
176+
LOG.info("exe-sql:\n" + result.getExecSql());
179177

180178
boolean isSide = false;
181179

@@ -205,6 +203,7 @@ public static void main(String[] args) throws Exception {
205203
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
206204
}else{
207205
tableEnv.sqlUpdate(result.getExecSql());
206+
LOG.info("exec sql: " + result.getExecSql());
208207
}
209208
}
210209
}
@@ -287,6 +286,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
287286

288287
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
289288
tableEnv.registerTable(tableInfo.getName(), regTable);
289+
LOG.info("registe table {} success.", tableInfo.getName());
290290
registerTableCache.put(tableInfo.getName(), regTable);
291291
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
292292
} else if (tableInfo instanceof TargetTableInfo) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import org.apache.flink.table.api.Table;
5151
import org.apache.flink.table.api.java.StreamTableEnvironment;
5252
import org.apache.flink.types.Row;
53+
import org.slf4j.Logger;
54+
import org.slf4j.LoggerFactory;
5355

5456
import java.util.*;
5557

@@ -64,6 +66,8 @@
6466

6567
public class SideSqlExec {
6668

69+
private static final Logger LOG = LoggerFactory.getLogger(SideSqlExec.class);
70+
6771
private String localSqlPluginPath = null;
6872

6973
private String tmpFields = null;
@@ -102,6 +106,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
102106

103107
if(pollSqlNode.getKind() == INSERT){
104108
tableEnv.sqlUpdate(pollSqlNode.toString());
109+
LOG.info("exec sql: " + pollSqlNode.toString());
105110
}else if(pollSqlNode.getKind() == AS){
106111
AliasInfo aliasInfo = parseASNode(pollSqlNode);
107112
Table table = tableEnv.sql(aliasInfo.getName());
@@ -531,6 +536,7 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
531536
AliasInfo aliasInfo = parseASNode(pollSqlNode);
532537
Table table = tableEnv.sql(aliasInfo.getName());
533538
tableEnv.registerTable(aliasInfo.getAlias(), table);
539+
LOG.info("Register Table {} by {}", aliasInfo.getAlias(), aliasInfo.getName());
534540
localTableCache.put(aliasInfo.getAlias(), table);
535541
} else if (pollSqlNode.getKind() == SELECT){
536542
Table table = tableEnv.sqlQuery(pollObj.toString());

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class HbaseOutputFormat extends MetricOutputFormat {
6464
public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
6565
public final SimpleDateFormat FIELD_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
6666

67+
private static int rowLenth = 1000;
68+
6769
@Override
6870
public void configure(Configuration parameters) {
6971
LOG.warn("---configure---");
@@ -133,6 +135,10 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
133135
}
134136

135137
table.put(put);
138+
139+
if (outRecords.getCount()%rowLenth == 0){
140+
LOG.info(record.toString());
141+
}
136142
outRecords.inc();
137143

138144
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6262

6363
private static final long serialVersionUID = 2385115520960444192L;
6464

65+
private static int rowLenth = 1000;
66+
6567
private final ObjectMapper objectMapper = new ObjectMapper();
6668

6769
/** Type information describing the result type. */
@@ -113,6 +115,10 @@ public Row deserialize(byte[] message) throws IOException {
113115
numInBytes.inc(message.length);
114116

115117
JsonNode root = objectMapper.readTree(message);
118+
119+
if (numInResolveRecord.getCount()%rowLenth == 0){
120+
LOG.info(root.toString());
121+
}
116122
parseTree(root, null);
117123
Row row = new Row(fieldNames.length);
118124

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.dtstack.flink.sql.table.TableInfo;
2626
import com.dtstack.flink.sql.util.ClassUtil;
2727
import com.dtstack.flink.sql.util.MathUtil;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
import java.util.Map;
3032
import java.util.regex.Matcher;
@@ -39,6 +41,8 @@
3941

4042
public class KafkaSourceParser extends AbsSourceParser {
4143

44+
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceParser.class);
45+
4246
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4347

4448
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
@@ -59,6 +63,7 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
5963
tableInfo.addField(mappingField);
6064
tableInfo.addFieldClass(fieldClass);
6165
tableInfo.addFieldType(fieldType);
66+
LOG.info(physicalField + "--->" + mappingField + " Class: " + fieldClass.toString());
6267
}
6368

6469
@Override

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6262

6363
private static final long serialVersionUID = 2385115520960444192L;
6464

65+
private static int rowLenth = 1000;
66+
6567
private final ObjectMapper objectMapper = new ObjectMapper();
6668

6769
/** Type information describing the result type. */
@@ -112,6 +114,10 @@ public Row deserialize(byte[] message) throws IOException {
112114
numInBytes.inc(message.length);
113115

114116
JsonNode root = objectMapper.readTree(message);
117+
118+
if (numInResolveRecord.getCount()%rowLenth == 0){
119+
LOG.info(root.toString());
120+
}
115121
parseTree(root, null);
116122
Row row = new Row(fieldNames.length);
117123

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6464

6565
private static final long serialVersionUID = 2385115520960444192L;
6666

67+
private static int rowLenth = 1000;
68+
6769
private final ObjectMapper objectMapper = new ObjectMapper();
6870

6971
/** Type information describing the result type. */
@@ -115,6 +117,10 @@ public Row deserialize(byte[] message) throws IOException {
115117
numInBytes.inc(message.length);
116118

117119
JsonNode root = objectMapper.readTree(message);
120+
121+
if (numInResolveRecord.getCount()%rowLenth == 0){
122+
LOG.info(root.toString());
123+
}
118124
parseTree(root, null);
119125
Row row = new Row(fieldNames.length);
120126

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class MongoOutputFormat extends MetricOutputFormat {
6363

6464
private static String PK = "_ID";
6565

66+
private static int rowLenth = 1000;
67+
6668
public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
6769

6870
@Override
@@ -107,6 +109,10 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
107109
} else {
108110
dbCollection.insertOne(doc);
109111
}
112+
113+
if (outRecords.getCount()%rowLenth == 0){
114+
LOG.info(record.toString());
115+
}
110116
outRecords.inc();
111117
}
112118

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6161
private PreparedStatement upload;
6262

6363
private int batchCount = 0;
64+
private static int rowLenth = 1000;
6465

6566
//index field
6667
private Map<String, List<String>> realIndexes = Maps.newHashMap();
@@ -140,6 +141,9 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
140141
try {
141142
if (retract) {
142143
insertWrite(row);
144+
if (outRecords.getCount()%rowLenth == 0){
145+
LOG.info(row.toString());
146+
}
143147
outRecords.inc();
144148
} else {
145149
//do nothing

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@
2424
import org.apache.flink.api.java.tuple.Tuple2;
2525
import org.apache.flink.configuration.Configuration;
2626
import org.apache.flink.types.Row;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729
import redis.clients.jedis.*;
2830
import java.io.Closeable;
2931
import java.io.IOException;
3032
import java.util.*;
3133

3234
public class RedisOutputFormat extends MetricOutputFormat {
35+
private static final Logger LOG = LoggerFactory.getLogger(RedisOutputFormat.class);
3336

3437
private String url;
3538

@@ -65,6 +68,8 @@ public class RedisOutputFormat extends MetricOutputFormat {
6568

6669
private GenericObjectPoolConfig poolConfig;
6770

71+
private static int rowLenth = 1000;
72+
6873
private RedisOutputFormat(){
6974
}
7075
@Override
@@ -168,6 +173,10 @@ public void writeRecord(Tuple2 record) throws IOException {
168173
key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
169174
jedis.set(key.toString(), row.getField(i).toString());
170175
}
176+
177+
if (outRecords.getCount()%rowLenth == 0){
178+
LOG.info(record.toString());
179+
}
171180
outRecords.inc();
172181
}
173182

0 commit comments

Comments (0)