Commit b259aa5

Merge remote-tracking branch 'origin/v1.8.0_dev' into v1.9.0_dev
# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

2 parents: 6ad858d + b2e4085

File tree: 15 files changed, +190 −115 lines

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -69,7 +69,11 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
 
         List<String[]> data = new ArrayList<>();
         data.add(fieldNames);
-        data.add(record.toString().split(","));
+        String[] recordStr = new String[record.getArity()];
+        for (int i=0; i < record.getArity(); i++) {
+            recordStr[i] = (String.valueOf(record.getField(i)));
+        }
+        data.add(recordStr);
         TablePrintUtil.build(data).print();
 
         outRecords.inc();
```
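Why the change: `Row.toString()` joins fields with a comma, so splitting it back on `","` miscounts columns whenever a field value itself contains a comma; reading each field through `getArity()`/`getField(int)` keeps one cell per column. A minimal, runnable sketch of the difference (toy data, not project code), assuming Flink's `org.apache.flink.types.Row`:

```java
import org.apache.flink.types.Row;

public class RowToCellsDemo {
    public static void main(String[] args) {
        // Two-field row whose first field contains a comma.
        Row record = Row.of("Doe, John", 42);

        // Old approach: serialize the whole row, then split on commas.
        // "Doe, John,42".split(",") -> 3 cells for a 2-field row.
        String[] byToString = record.toString().split(",");
        System.out.println(byToString.length);   // 3 -- columns misaligned

        // New approach: one cell per field, whatever the field contains.
        String[] byField = new String[record.getArity()];
        for (int i = 0; i < record.getArity(); i++) {
            byField[i] = String.valueOf(record.getField(i));
        }
        System.out.println(byField.length);      // 2 -- matches the arity
    }
}
```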

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -47,6 +47,8 @@ public class MetricConstant {
 
     public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
 
+    public static final String DT_NUM_SIDE_PARSE_ERROR_RECORDS = "dtNumSideParseErrorRecords";
+
     public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
 
     public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";
```

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 43 additions & 15 deletions
```diff
@@ -21,16 +21,20 @@
 package com.dtstack.flink.sql.side;
 
 import com.dtstack.flink.sql.enums.ECacheType;
+import com.dtstack.flink.sql.metric.MetricConstant;
 import com.dtstack.flink.sql.side.cache.AbsSideCache;
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cache.LRUSideCache;
 import org.apache.calcite.sql.JoinType;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
@@ -47,25 +51,21 @@
  */
 
 public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
-
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
     private static final long serialVersionUID = 2098635244857937717L;
 
     protected SideInfo sideInfo;
+    protected transient Counter parseErrorRecords;
 
     public AsyncReqRow(SideInfo sideInfo){
         this.sideInfo = sideInfo;
     }
 
     @Override
-    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
-        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
-        try {
-            if (null == future.get()) {
-                new TimeoutException("Async function call has timed out.");
-            }
-        } catch (Exception e) {
-            throw new Exception(e);
-        }
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        initCache();
+        initMetric();
     }
 
     private void initCache(){
@@ -85,6 +85,10 @@ private void initCache(){
         sideCache.initCache();
     }
 
+    private void initMetric() {
+        parseErrorRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_SIDE_PARSE_ERROR_RECORDS);
+    }
+
 
     protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
         boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(index).getClass());
@@ -111,17 +115,41 @@ protected boolean openCache(){
     protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
         if(sideInfo.getJoinType() == JoinType.LEFT){
             //Reserved left table data
-            Row row = fillData(input, null);
-            resultFuture.complete(Collections.singleton(row));
+            try {
+                Row row = fillData(input, null);
+                resultFuture.complete(Collections.singleton(row));
+            } catch (Exception e) {
+                dealFillDataError(resultFuture, e, input);
+            }
         }else{
             resultFuture.complete(null);
         }
     }
 
+    protected void dealCacheData(String key, CacheObj missKeyObj) {
+        if (openCache()) {
+            putCache(key, missKeyObj);
+        }
+    }
+
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        initCache();
+    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
+        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
+        try {
+            if (null == future.get()) {
+                resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
+            }
+        } catch (Exception e) {
+            resultFuture.completeExceptionally(new Exception(e));
+        }
+    }
+
+
+    protected void dealFillDataError(ResultFuture<Row> resultFuture, Exception e, Object sourceData) {
+        LOG.debug("source data {} join side table error ", sourceData);
+        LOG.debug("async buid row error..{}", e);
+        parseErrorRecords.inc();
+        resultFuture.complete(Collections.emptyList());
    }
 
     @Override
```
core/src/main/java/com/dtstack/flink/sql/source/JsonDataParser.java

Lines changed: 8 additions & 1 deletion
```diff
@@ -27,6 +27,7 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.types.Row;
 
@@ -151,7 +152,13 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
         if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
             return node.asBoolean();
         } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
-            return node.asText();
+            if (node instanceof ObjectNode) {
+                return node.toString();
+            } else if (node instanceof NullNode) {
+                return null;
+            } else {
+                return node.asText();
+            }
         } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
```
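Why the extra branches: for a container node, Jackson's `asText()` returns the empty string, and for a JSON `null` it returns the literal string `"null"`, so nested objects lost their content and nulls turned into text. The patch returns the object's JSON serialization (`toString()`) and a real Java `null` instead. A small sketch with plain Jackson (the project uses the flink-shaded copy, which behaves the same):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AsTextPitfallDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree("{\"nested\":{\"a\":1},\"gone\":null}");

        JsonNode nested = root.get("nested");   // an ObjectNode
        System.out.println(nested.asText());    // "" -- content lost
        System.out.println(nested.toString());  // {"a":1} -- what the patch returns

        JsonNode gone = root.get("gone");       // a NullNode
        System.out.println(gone.asText());      // "null" -- a string, not null
    }
}
```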

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 13 additions & 6 deletions
```diff
@@ -132,7 +132,6 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                 dealMissKey(inputRow, resultFuture);
                 return;
             }
-
             refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
         }
 
@@ -146,12 +145,20 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                 dealMissKey(inputRow, resultFuture);
                 return;
             }else if(ECacheContentType.SingleLine == val.getType()){
-                Row row = fillData(inputRow, val);
-                resultFuture.complete(Collections.singleton(row));
-            }else if(ECacheContentType.MultiLine == val.getType()){
-                for(Object one : (List)val.getContent()){
-                    Row row = fillData(inputRow, one);
+                try {
+                    Row row = fillData(inputRow, val);
                     resultFuture.complete(Collections.singleton(row));
+                } catch (Exception e) {
+                    dealFillDataError(resultFuture, e, inputRow);
+                }
+            }else if(ECacheContentType.MultiLine == val.getType()){
+                try {
+                    for(Object one : (List)val.getContent()){
+                        Row row = fillData(inputRow, one);
+                        resultFuture.complete(Collections.singleton(row));
+                    }
+                } catch (Exception e) {
+                    dealFillDataError(resultFuture, e, inputRow);
                 }
             }
             return;
```
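One pre-existing wrinkle this commit leaves in place: the `MultiLine` branch still calls `resultFuture.complete(...)` once per cached row inside the loop. Flink's async I/O documents that only the first `complete` call on a `ResultFuture` takes effect, so if that holds here, every row after the first is silently dropped; collecting the rows and completing once would be the safer shape. A sketch of the hazard, again with `CompletableFuture` standing in (it has the same first-call-wins semantics):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class CompleteOnceDemo {
    public static void main(String[] args) {
        // Completing per element: only the first call takes effect.
        CompletableFuture<Collection<String>> perRow = new CompletableFuture<>();
        for (String row : Arrays.asList("rowA", "rowB")) {
            perRow.complete(Arrays.asList(row));   // second call is ignored
        }
        perRow.thenAccept(System.out::println);    // [rowA]

        // Safer shape: gather everything, then complete exactly once.
        CompletableFuture<Collection<String>> once = new CompletableFuture<>();
        once.complete(Arrays.asList("rowA", "rowB"));
        once.thenAccept(System.out::println);      // [rowA, rowB]
    }
}
```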

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbsRowKeyModeDealer.java

Lines changed: 7 additions & 3 deletions
```diff
@@ -74,9 +74,13 @@ public AbsRowKeyModeDealer(Map<String, String> colRefType, String[] colNames, HB
 
     protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
         if(joinType == JoinType.LEFT){
-            //keep the left table data
-            Row row = fillData(input, null);
-            resultFuture.complete(Collections.singleton(row));
+            try {
+                //keep the left table data
+                Row row = fillData(input, null);
+                resultFuture.complete(Collections.singleton(row));
+            } catch (Exception e) {
+                resultFuture.completeExceptionally(e);
+            }
         }else{
             resultFuture.complete(null);
         }
```

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 19 additions & 16 deletions
```diff
@@ -103,25 +103,28 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
                 Object val = HbaseUtils.convertByte(keyValue.value(), colType);
                 sideMap.put(mapKey, val);
             }
-
-            if (oneRow.size() > 0) {
-                //The order of the fields defined in the data conversion table
-                List<Object> sideVal = Lists.newArrayList();
-                for (String key : colNames) {
-                    Object val = sideMap.get(key);
-                    if (val == null) {
-                        System.out.println("can't get data with column " + key);
-                        LOG.error("can't get data with column " + key);
+            try {
+                if (oneRow.size() > 0) {
+                    //The order of the fields defined in the data conversion table
+                    List<Object> sideVal = Lists.newArrayList();
+                    for (String key : colNames) {
+                        Object val = sideMap.get(key);
+                        if (val == null) {
+                            System.out.println("can't get data with column " + key);
+                            LOG.error("can't get data with column " + key);
+                        }
+
+                        sideVal.add(val);
                     }
 
-                    sideVal.add(val);
-                }
-
-                Row row = fillData(input, sideVal);
-                if (openCache) {
-                    cacheContent.add(sideVal);
+                    Row row = fillData(input, sideVal);
+                    if (openCache) {
+                        cacheContent.add(sideVal);
+                    }
+                    rowList.add(row);
                 }
-                rowList.add(row);
+            }catch (Exception e) {
+                resultFuture.completeExceptionally(e);
             }
         } catch (Exception e) {
             resultFuture.complete(null);
```

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 18 additions & 17 deletions
```diff
@@ -79,28 +79,29 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
             }
 
             if(arg.size() > 0){
-                //The order of the fields defined in the data conversion table
-                List<Object> sideVal = Lists.newArrayList();
-                for(String key : colNames){
-                    Object val = sideMap.get(key);
-                    if(val == null){
-                        System.out.println("can't get data with column " + key);
-                        LOG.error("can't get data with column " + key);
+                try {
+                    //The order of the fields defined in the data conversion table
+                    List<Object> sideVal = Lists.newArrayList();
+                    for(String key : colNames){
+                        Object val = sideMap.get(key);
+                        if(val == null){
+                            System.out.println("can't get data with column " + key);
+                            LOG.error("can't get data with column " + key);
+                        }
+
+                        sideVal.add(val);
                     }
 
-                    sideVal.add(val);
-                }
-
-                Row row = fillData(input, sideVal);
-                if(openCache){
-                    sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
+                    Row row = fillData(input, sideVal);
+                    if(openCache){
+                        sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
+                    }
+                    resultFuture.complete(Collections.singleton(row));
+                } catch (Exception e) {
+                    resultFuture.completeExceptionally(e);
                 }
-
-                resultFuture.complete(Collections.singleton(row));
             }else{
-
                 dealMissKey(input, resultFuture);
-
                 if(openCache){
                     sideCache.putCache(rowKeyStr, CacheMissVal.getMissKeyObj());
                 }
```

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
```diff
@@ -113,9 +113,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 serializationSchema
         );
 
-        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
-            return record.f1;
-        }).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
+        DataStream<Row> ds = dataStream
+                .filter((Tuple2<Boolean, Row> record) -> record.f0)
+                .map((Tuple2<Boolean, Row> record) -> {return record.f1;})
+                .returns(getOutputType().getTypeAt(1))
+                .setParallelism(parallelism);
 
         kafkaTableSink.emitDataStream(ds);
     }
```
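Why the filter: the stream carries `Tuple2<Boolean, Row>` retract messages, where `f0 == true` marks an add and `f0 == false` a retraction. Previously both kinds were mapped to rows and written to Kafka, so retractions showed up as spurious duplicates; the added `.filter(record -> record.f0)` forwards only the add messages. A minimal, self-contained sketch of the shape (toy data and a local environment, not the project's sink wiring):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class RetractFilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Boolean, Row>> retractStream = env.fromElements(
                Tuple2.of(true,  Row.of("orderA", 10)),  // add
                Tuple2.of(false, Row.of("orderA", 10)),  // retraction -- must not reach the sink
                Tuple2.of(true,  Row.of("orderA", 12))); // updated add

        DataStream<Row> ds = retractStream
                .filter(record -> record.f0)             // drop retraction messages
                .map(record -> record.f1)                // unwrap the payload row
                .returns(Row.class);

        ds.print();                                      // orderA,10 and orderA,12 only
        env.execute("retract filter sketch");
    }
}
```

The same filter-then-map change is applied to the kafka09 sink below.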

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 3 deletions
```diff
@@ -117,9 +117,11 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 serializationSchema
         );
 
-        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
-            return record.f1;
-        }).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
+        DataStream<Row> ds = dataStream
+                .filter((Tuple2<Boolean, Row> record) -> record.f0)
+                .map((Tuple2<Boolean, Row> record) -> {return record.f1;})
+                .returns(getOutputType().getTypeAt(1))
+                .setParallelism(parallelism);
 
         kafkaTableSink.emitDataStream(ds);
     }
```
