
Commit 8c7704f

Merge remote-tracking branch 'origin/v1.5.0_dev' into 1.8_merge_1.5

# Conflicts:
#   kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
#   kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
#   rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

2 parents: 0c4ddc7 + 57b9d28

6 files changed: +157 additions, -39 deletions

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 0 deletions

@@ -109,6 +109,16 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
             aliasInfo.setAlias(alias.toString());

             return aliasInfo;
+
+        case UNION:
+            SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
+            SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
+
+            parseSql(unionLeft, sideTableSet, queueInfo);
+
+            parseSql(unionRight, sideTableSet, queueInfo);
+
+            break;
     }

     return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 20 additions & 16 deletions

@@ -306,6 +306,16 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
                 throw new RuntimeException("---not deal type:" + sqlNode);
             }

+            break;
+        case UNION:
+            SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
+
+            SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
+
+            replaceFieldName(unionLeft, mappingTable, targetTableName, tableAlias);
+
+            replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
+
             break;
         default:
             break;

@@ -439,39 +449,33 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
             return sqlIdentifier;
         }else if(selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN){ // literal value
             return selectNode;
-        }else if(selectNode.getKind() == OTHER_FUNCTION
+        }else if( AGGREGATE.contains(selectNode.getKind())
+                || AVG_AGG_FUNCTIONS.contains(selectNode.getKind())
+                || COMPARISON.contains(selectNode.getKind())
+                || selectNode.getKind() == OTHER_FUNCTION
                 || selectNode.getKind() == DIVIDE
                 || selectNode.getKind() == CAST
-                || selectNode.getKind() == SUM
-                || selectNode.getKind() == AVG
-                || selectNode.getKind() == MAX
-                || selectNode.getKind() == MIN
                 || selectNode.getKind() == TRIM
                 || selectNode.getKind() == TIMES
                 || selectNode.getKind() == PLUS
-                || selectNode.getKind() == IN
+                || selectNode.getKind() == NOT_IN
                 || selectNode.getKind() == OR
                 || selectNode.getKind() == AND
-                || selectNode.getKind() == COUNT
-                || selectNode.getKind() == SUM0
-                || selectNode.getKind() == LEAD
-                || selectNode.getKind() == LAG
-                || selectNode.getKind() == EQUALS
-                || selectNode.getKind() == NOT_EQUALS
                 || selectNode.getKind() == MINUS
                 || selectNode.getKind() == TUMBLE
                 || selectNode.getKind() == TUMBLE_START
                 || selectNode.getKind() == TUMBLE_END
                 || selectNode.getKind() == SESSION
                 || selectNode.getKind() == SESSION_START
                 || selectNode.getKind() == SESSION_END
+                || selectNode.getKind() == HOP
+                || selectNode.getKind() == HOP_START
+                || selectNode.getKind() == HOP_END
                 || selectNode.getKind() == BETWEEN
                 || selectNode.getKind() == IS_NULL
                 || selectNode.getKind() == IS_NOT_NULL
-                || selectNode.getKind() == LESS_THAN
-                || selectNode.getKind() == GREATER_THAN
-                || selectNode.getKind() == LESS_THAN_OR_EQUAL
-                || selectNode.getKind() == GREATER_THAN_OR_EQUAL
+                || selectNode.getKind() == CONTAINS
+
         ){
             SqlBasicCall sqlBasicCall = (SqlBasicCall) selectNode;
             for(int i=0; i<sqlBasicCall.getOperands().length; i++){
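Note (not part of the diff): the long kind list can shrink because Calcite already groups SqlKind values into predefined EnumSets: SqlKind.AGGREGATE covers COUNT, SUM, SUM0, MIN, MAX, LEAD and LAG, SqlKind.AVG_AGG_FUNCTIONS covers AVG, and SqlKind.COMPARISON covers EQUALS, NOT_EQUALS, LESS_THAN, GREATER_THAN and their variants. A small sketch of the membership checks (exact set contents can vary between Calcite versions):

    // Hedged illustration of the Calcite SqlKind category sets used above.
    import org.apache.calcite.sql.SqlKind;

    public class SqlKindSetsDemo {
        public static void main(String[] args) {
            // Plain aggregates that used to be listed one by one:
            System.out.println(SqlKind.AGGREGATE.contains(SqlKind.SUM));        // true
            System.out.println(SqlKind.AGGREGATE.contains(SqlKind.COUNT));      // true
            System.out.println(SqlKind.AGGREGATE.contains(SqlKind.LEAD));       // true

            // AVG-style aggregates have their own set:
            System.out.println(SqlKind.AVG_AGG_FUNCTIONS.contains(SqlKind.AVG)); // true

            // Comparison operators that used to be listed one by one:
            System.out.println(SqlKind.COMPARISON.contains(SqlKind.EQUALS));             // true
            System.out.println(SqlKind.COMPARISON.contains(SqlKind.GREATER_THAN));       // true
            System.out.println(SqlKind.COMPARISON.contains(SqlKind.LESS_THAN_OR_EQUAL)); // true
        }
    }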

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 39 additions & 6 deletions

@@ -24,10 +24,12 @@
 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;

@@ -43,6 +45,9 @@

 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;

@@ -62,7 +67,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {

     private static final long serialVersionUID = 2385115520960444192L;

-    private static int rowLenth = 1000;
+    private static int dirtyDataFrequency = 1000;

     private final ObjectMapper objectMapper = new ObjectMapper();

@@ -112,7 +117,7 @@ public Row deserialize(byte[] message) throws IOException {

         try {
             JsonNode root = objectMapper.readTree(message);
-            if (numInRecord.getCount()%rowLenth == 0){
+            if (numInRecord.getCount() % dirtyDataFrequency == 0) {
                 LOG.info(root.toString());
             }

@@ -134,17 +139,18 @@ public Row deserialize(byte[] message) throws IOException {
                 }
             } else {
                 // Read the value as specified type
-                Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+                Object value = convert(node, fieldTypes[i]);
                 row.setField(i, value);
             }
         }

         numInResolveRecord.inc();
         return row;
-    } catch (Throwable t) {
+    } catch (Exception e) {
         //add metric of dirty data
-        if (dirtyDataCounter.getCount()%rowLenth == 0){
+        if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
             LOG.info("dirtyData: " + new String(message));
+            LOG.info(" ", e);
         }
         dirtyDataCounter.inc();
         return null;

@@ -243,4 +249,31 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio
     private static String partitionLagMetricName(TopicPartition tp) {
         return tp + ".records-lag";
     }
-}
+
+    private Object convert(JsonNode node, TypeInformation<?> info) {
+        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
+            return node.asBoolean();
+        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+            return Date.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
+            // local zone
+            return Time.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
+            // local zone
+            return Timestamp.valueOf(node.asText());
+        } else {
+            // for types that were specified without JSON schema
+            // e.g. POJOs
+            try {
+                return objectMapper.treeToValue(node, info.getTypeClass());
+            } catch (JsonProcessingException e) {
+                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+            }
+        }
+    }
+
+
+
+}
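Note (not part of the diff): the new convert() relies on java.sql.Date.valueOf, Time.valueOf and Timestamp.valueOf, which parse the JDBC escape formats (yyyy-[m]m-[d]d, hh:mm:ss, yyyy-[m]m-[d]d hh:mm:ss[.f...]) in the JVM's local time zone, hence the "// local zone" comments; any other type still falls through to Jackson's treeToValue. A self-contained sketch of the same conversions with plain Jackson (the field names are made up; the source itself uses Flink's shaded Jackson):

    // Hedged sketch of the date/time conversion rules; not taken from the repository.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.sql.Date;
    import java.sql.Time;
    import java.sql.Timestamp;

    public class JsonToSqlTypesDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode root = mapper.readTree(
                    "{\"day\":\"2019-07-01\",\"clock\":\"12:30:05\",\"ts\":\"2019-07-01 12:30:05.123\"}");

            // JDBC escape formats parsed in the JVM's local time zone,
            // mirroring the Date/Time/Timestamp branches of convert().
            Date day = Date.valueOf(root.get("day").asText());
            Time clock = Time.valueOf(root.get("clock").asText());
            Timestamp ts = Timestamp.valueOf(root.get("ts").asText());

            System.out.println(day + " / " + clock + " / " + ts);
        }
    }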

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 37 additions & 7 deletions

@@ -16,18 +16,20 @@
  * limitations under the License.
  */

-
+

 package com.dtstack.flink.sql.source.kafka;


 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;

@@ -43,6 +45,9 @@

 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;

@@ -62,7 +67,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {

     private static final long serialVersionUID = 2385115520960444192L;

-    private static int rowLenth = 1000;
+    private static int dirtyDataFrequency = 1000;

     private final ObjectMapper objectMapper = new ObjectMapper();

@@ -112,7 +117,7 @@ public Row deserialize(byte[] message) throws IOException {
         try {
             JsonNode root = objectMapper.readTree(message);

-            if (numInRecord.getCount()%rowLenth == 0){
+            if (numInRecord.getCount() % dirtyDataFrequency == 0) {
                 LOG.info(root.toString());
             }

@@ -134,17 +139,18 @@ public Row deserialize(byte[] message) throws IOException {
                 }
             } else {
                 // Read the value as specified type
-                Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+                Object value = convert(node, fieldTypes[i]);
                 row.setField(i, value);
             }
         }

         numInResolveRecord.inc();
         return row;
-    } catch (Throwable t) {
+    } catch (Exception e) {
         //add metric of dirty data
-        if (dirtyDataCounter.getCount()%rowLenth == 0){
+        if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
             LOG.info("dirtyData: " + new String(message));
+            LOG.error(" ", e);
         }
         dirtyDataCounter.inc();
         return null;

@@ -244,4 +250,28 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio
     private static String partitionLagMetricName(TopicPartition tp) {
         return tp + ".records-lag";
     }
-}
+
+    private Object convert(JsonNode node, TypeInformation<?> info) {
+        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
+            return node.asBoolean();
+        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+            return Date.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
+            // local zone
+            return Time.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
+            // local zone
+            return Timestamp.valueOf(node.asText());
+        } else {
+            // for types that were specified without JSON schema
+            // e.g. POJOs
+            try {
+                return objectMapper.treeToValue(node, info.getTypeClass());
+            } catch (JsonProcessingException e) {
+                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+            }
+        }
+    }
+}
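Note (not part of the diff): the rowLenth to dirtyDataFrequency rename makes the intent of the modulo check clearer: only every 1000th record is written to the log, while the counters still count every record, so a burst of bad messages does not flood the logs. A rough standalone sketch of that sampling behavior, with made-up names and stdout in place of Flink metrics and SLF4J:

    // Hedged sketch of the dirty-data log sampling; not taken from the repository.
    public class DirtyDataSamplingDemo {
        private static final int DIRTY_DATA_FREQUENCY = 1000; // same default as the source

        private long dirtyDataCounter = 0;

        void onDirtyRecord(byte[] message, Exception cause) {
            // Log only one record per DIRTY_DATA_FREQUENCY occurrences...
            if (dirtyDataCounter % DIRTY_DATA_FREQUENCY == 0) {
                System.err.println("dirtyData: " + new String(message));
                cause.printStackTrace();
            }
            // ...but count every occurrence.
            dirtyDataCounter++;
        }

        public static void main(String[] args) {
            DirtyDataSamplingDemo demo = new DirtyDataSamplingDemo();
            for (int i = 0; i < 2500; i++) {
                demo.onDirtyRecord("not-json".getBytes(), new RuntimeException("parse failed"));
            }
            // Logs only three times (records 0, 1000 and 2000) out of 2500 dirty records.
        }
    }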

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 38 additions & 5 deletions

@@ -24,10 +24,12 @@
 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;

@@ -42,7 +44,11 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;

@@ -64,7 +70,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {

     private static final long serialVersionUID = 2385115520960444192L;

-    private static int rowLenth = 1000;
+    private static int dirtyDataFrequency = 1000;

     private final ObjectMapper objectMapper = new ObjectMapper();

@@ -115,7 +121,7 @@ public Row deserialize(byte[] message) throws IOException {
         try {
             JsonNode root = objectMapper.readTree(message);

-            if (numInRecord.getCount()%rowLenth == 0){
+            if (numInRecord.getCount() % dirtyDataFrequency == 0) {
                 LOG.info(root.toString());
             }

@@ -137,17 +143,19 @@
                 }
             } else {
                 // Read the value as specified type
-                Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+
+                Object value = convert(node, fieldTypes[i]);
                 row.setField(i, value);
             }
         }

         numInResolveRecord.inc();
         return row;
-    } catch (Throwable t) {
+    } catch (Exception e) {
         //add metric of dirty data
-        if (dirtyDataCounter.getCount()%rowLenth == 0){
+        if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
             LOG.info("dirtyData: " + new String(message));
+            LOG.error("", e);
         }
         dirtyDataCounter.inc();
         return null;

@@ -245,4 +253,29 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio
     private static String partitionLagMetricName(TopicPartition tp) {
         return tp + ".records-lag";
     }
+
+    private Object convert(JsonNode node, TypeInformation<?> info) {
+        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
+            return node.asBoolean();
+        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+            return Date.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
+            // local zone
+            return Time.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
+            // local zone
+            return Timestamp.valueOf(node.asText());
+        } else {
+            // for types that were specified without JSON schema
+            // e.g. POJOs
+            try {
+                return objectMapper.treeToValue(node, info.getTypeClass());
+            } catch (JsonProcessingException e) {
+                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+            }
+        }
+    }
+
 }
