Commit a054ed0

Committed: "Use kafka10-source as a demonstration; optimize the DeserializationSchema handling"

1 parent a2e3300 commit a054ed0
File tree

13 files changed: +273 -64 lines

core/pom.xml

Lines changed: 18 additions & 0 deletions

@@ -116,6 +116,24 @@
             <version>${flink.version}</version>
         </dependency>

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+        </dependency>

     </dependencies>
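The three new format dependencies supply the ready-made Row deserialization schemas that the new KafkaConsumer010Factory (further down) instantiates. A minimal sketch of what each module contributes; the two-field row type and the Avro schema string are assumptions for illustration:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

public class FormatDependenciesSketch {
    public static void main(String[] args) {
        // Assumed two-field row type for illustration.
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
                new String[]{"id", "name"}, Types.INT, Types.STRING);

        // flink-json: JSON bytes -> Row
        DeserializationSchema<Row> json = new JsonRowDeserializationSchema(typeInfo);

        // flink-csv: delimited text -> Row
        DeserializationSchema<Row> csv = new CsvRowDeserializationSchema.Builder(typeInfo)
                .setFieldDelimiter(',')
                .build();

        // flink-avro: Avro records -> Row (the schema string here is a made-up example)
        DeserializationSchema<Row> avro = new AvroRowDeserializationSchema(
                "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"int\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");
    }
}
```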
DeserializationMetricWrapper.java (new file)

Lines changed: 127 additions & 0 deletions

@@ -0,0 +1,127 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.format;

import com.dtstack.flink.sql.metric.MetricConstant;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Adds metrics to a source's deserialization schema.
 * <p>
 * company: www.dtstack.com
 * author: toutian
 * create: 2019/12/24
 */
public class DeserializationMetricWrapper extends AbstractDeserializationSchema<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(DeserializationMetricWrapper.class);

    private static int dataPrintFrequency = 1000;

    private transient DeserializationSchema<Row> deserializationSchema;

    private transient RuntimeContext runtimeContext;

    protected transient Counter dirtyDataCounter;

    /**
     * tps: Transactions Per Second (raw records received)
     */
    protected transient Counter numInRecord;

    protected transient Meter numInRate;

    /**
     * rps: Records Per Second (records successfully deserialized and emitted)
     */
    protected transient Counter numInResolveRecord;

    protected transient Meter numInResolveRate;

    protected transient Counter numInBytes;

    protected transient Meter numInBytesRate;

    public DeserializationMetricWrapper(TypeInformation<Row> typeInfo, DeserializationSchema<Row> deserializationSchema) {
        super(typeInfo);
        this.deserializationSchema = deserializationSchema;
    }

    public void initMetric() {
        dirtyDataCounter = runtimeContext.getMetricGroup().counter(MetricConstant.DT_DIRTY_DATA_COUNTER);

        numInRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_IN_COUNTER);
        numInRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_IN_RATE, new MeterView(numInRecord, 20));

        numInBytes = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_BYTES_IN_COUNTER);
        numInBytesRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_BYTES_IN_RATE, new MeterView(numInBytes, 20));

        numInResolveRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_COUNTER);
        numInResolveRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_RATE, new MeterView(numInResolveRecord, 20));
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            if (numInRecord.getCount() % dataPrintFrequency == 0) {
                LOG.info("receive source data: " + new String(message, "UTF-8"));
            }
            numInRecord.inc();
            numInBytes.inc(message.length);
            beforeDeserialize();
            Row row = deserializationSchema.deserialize(message);
            afterDeserialize();
            numInResolveRecord.inc();
            return row;
        } catch (Exception e) {
            // update the dirty-data metric instead of failing the job
            if (dirtyDataCounter.getCount() % dataPrintFrequency == 0) {
                LOG.info("dirtyData: " + new String(message));
                LOG.error("data parse error", e);
            }
            dirtyDataCounter.inc();
            return null;
        }
    }

    protected void beforeDeserialize() throws IOException {
    }

    protected void afterDeserialize() throws IOException {
    }

    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }
}
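The wrapper is meant to be installed by the consumer itself: the source injects its RuntimeContext and registers the metrics before the first record is read (see CustomerKafkaConsumer010 below). A minimal lifecycle sketch, assuming a two-field row type; the commented calls stand in for code that runs inside the source's run():

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

import com.dtstack.flink.sql.format.DeserializationMetricWrapper;

public class WrapperLifecycleSketch {
    public static void main(String[] args) {
        // Assumed two-field row type; in the real flow it comes from the table schema.
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
                new String[]{"id", "name"}, Types.INT, Types.STRING);

        DeserializationSchema<Row> inner = new JsonRowDeserializationSchema(typeInfo);
        DeserializationMetricWrapper wrapper = new DeserializationMetricWrapper(typeInfo, inner);

        // Inside the source function, before the first deserialize() call:
        //   wrapper.setRuntimeContext(getRuntimeContext());
        //   wrapper.initMetric();
        // Every subsequent wrapper.deserialize(bytes) then updates the
        // numIn*/numInBytes*/numInResolve* metrics and routes parse failures
        // to dirtyDataCounter instead of failing the job.
    }
}
```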
FormatType.java (new file)

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
package com.dtstack.flink.sql.format;

/**
 * company: www.dtstack.com
 * author: toutian
 * create: 2019/12/24
 */
public enum FormatType {
    JSON, AVRO, CSV
}
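The factory below matches this enum case-insensitively against the table's configured sourceDataType via name().equalsIgnoreCase; an equivalent direct lookup, with a placeholder input string, would be:

```java
import com.dtstack.flink.sql.format.FormatType;

public class FormatTypeLookupSketch {
    public static void main(String[] args) {
        // "json" is a placeholder for KafkaSourceTableInfo#getSourceDataType();
        // valueOf is case-sensitive, hence the normalization.
        FormatType format = FormatType.valueOf("json".trim().toUpperCase());
        System.out.println(format); // JSON
    }
}
```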

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 1 addition & 1 deletion

@@ -100,7 +100,7 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
             }
         } else {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new com.dtstack.flink.sql.source.kafka.deserialization.CustomerCommonDeserialization(), props);
+                kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new CustomerCommonDeserialization(), props);
             } else {
                 kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
             }

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/consumer/CustomerCommonConsumer.java

Lines changed: 3 additions & 2 deletions

@@ -17,6 +17,7 @@
  */
 package com.dtstack.flink.sql.source.kafka.consumer;

+import com.dtstack.flink.sql.format.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.deserialization.CustomerCommonDeserialization;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
@@ -39,12 +40,12 @@ public class CustomerCommonConsumer extends FlinkKafkaConsumer08<Row> {
     private CustomerCommonDeserialization customerCommonDeserialization;


-    public CustomerCommonConsumer(String topic, KeyedDeserializationSchema<Row> deserializer, Properties props) {
+    public CustomerCommonConsumer(String topic, AbsDeserialization<Row> deserializer, Properties props) {
         super(topic, deserializer, props);
         this.customerCommonDeserialization = (CustomerCommonDeserialization) deserializer;
     }

-    public CustomerCommonConsumer(Pattern subscriptionPattern, KeyedDeserializationSchema<Row> deserializer, Properties props) {
+    public CustomerCommonConsumer(Pattern subscriptionPattern, AbsDeserialization<Row> deserializer, Properties props) {
         super(subscriptionPattern, deserializer, props);
         this.customerCommonDeserialization = (CustomerCommonDeserialization) deserializer;
     }

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/consumer/CustomerCsvConsumer.java

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ public class CustomerCsvConsumer extends FlinkKafkaConsumer08<Row> {
     private CustomerCsvDeserialization customerCsvDeserialization;

     public CustomerCsvConsumer(String topic, AbsDeserialization<Row> valueDeserializer, Properties props) {
-        super(Arrays.asList(topic.split(",")), valueDeserializer, props);
+        super(topic, valueDeserializer, props);
         this.customerCsvDeserialization = (CustomerCsvDeserialization) valueDeserializer;
     }

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/consumer/CustomerJsonConsumer.java

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ public class CustomerJsonConsumer extends FlinkKafkaConsumer08<Row> {
     private CustomerJsonDeserialization customerJsonDeserialization;

     public CustomerJsonConsumer(String topic, AbsDeserialization<Row> valueDeserializer, Properties props) {
-        super(Arrays.asList(topic.split(",")), valueDeserializer, props);
+        super(topic, valueDeserializer, props);
         this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
     }

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 0 additions & 2 deletions

@@ -15,10 +15,8 @@

 import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
 /**
- *
  * Date: 2018/12/18
  * Company: www.dtstack.com
- *
  * @author DocLi
 *
 * @modifyer maqi

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafka010Consumer.java renamed to kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafkaConsumer010.java

Lines changed: 15 additions & 12 deletions

@@ -18,7 +18,7 @@

 package com.dtstack.flink.sql.source.kafka;

-import com.dtstack.flink.sql.format.AbsDeserialization;
+import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -35,39 +35,42 @@
 import java.util.Properties;
 import java.util.regex.Pattern;

+
 /**
  * Reason:
  * Date: 2018/10/19
  * Company: www.dtstack.com
+ *
  * @author xuchao
  */

-public class CustomerKafka010Consumer extends FlinkKafkaConsumer010<Row> {
+public class CustomerKafkaConsumer010 extends FlinkKafkaConsumer010<Row> {

     private static final long serialVersionUID = 4873757508981691375L;

-    private CustomerJsonDeserialization customerJsonDeserialization;
+    private DeserializationMetricWrapper deserializationMetricWrapper;

-    public CustomerKafka010Consumer(String topic, AbsDeserialization valueDeserializer, Properties props) {
-        super(Arrays.asList(topic.split(",")), valueDeserializer, props);
-        this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
+    public CustomerKafkaConsumer010(String topic, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
+        super(Arrays.asList(topic.split(",")), deserializationMetricWrapper, props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
     }

-    public CustomerKafka010Consumer(Pattern subscriptionPattern, AbsDeserialization<Row> valueDeserializer, Properties props) {
-        super(subscriptionPattern, valueDeserializer, props);
-        this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
+    public CustomerKafkaConsumer010(Pattern subscriptionPattern, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
+        super(subscriptionPattern, deserializationMetricWrapper, props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
     }
+
     @Override
     public void run(SourceContext<Row> sourceContext) throws Exception {
-        customerJsonDeserialization.setRuntimeContext(getRuntimeContext());
-        customerJsonDeserialization.initMetric();
+        deserializationMetricWrapper.setRuntimeContext(getRuntimeContext());
+        deserializationMetricWrapper.initMetric();
         super.run(sourceContext);
     }

     @Override
     protected AbstractFetcher<Row, ?> createFetcher(SourceContext<Row> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
         AbstractFetcher<Row, ?> fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
-        customerJsonDeserialization.setFetcher(fetcher);
+        ((KafkaDeserializationMetricWrapper) deserializationMetricWrapper).setFetcher(fetcher);
         return fetcher;
     }

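Note that createFetcher() unconditionally casts the wrapper to KafkaDeserializationMetricWrapper, so the consumer only works when constructed with that subclass, as the factory below does. A minimal construction sketch; broker address, group id, and topic are placeholders, and the import path for the wrapper assumes it lives beside the consumer in the same package:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

import com.dtstack.flink.sql.source.kafka.CustomerKafkaConsumer010;
import com.dtstack.flink.sql.source.kafka.KafkaDeserializationMetricWrapper;

public class Consumer010ConstructionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo_group");              // placeholder

        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
                new String[]{"id", "name"}, Types.INT, Types.STRING);
        DeserializationSchema<Row> schema = new JsonRowDeserializationSchema(typeInfo);

        // Must be the Kafka-aware wrapper subtype: createFetcher() casts to
        // KafkaDeserializationMetricWrapper to hand the fetcher over.
        CustomerKafkaConsumer010 consumer = new CustomerKafkaConsumer010(
                "demo_topic", new KafkaDeserializationMetricWrapper(typeInfo, schema), props);
    }
}
```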
KafkaConsumer010Factory.java (new file)

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;
import java.util.regex.Pattern;

/**
 * company: www.dtstack.com
 * author: toutian
 * create: 2019/12/24
 */
public class KafkaConsumer010Factory {

    public static CustomerKafkaConsumer010 createKafkaTableSource(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation, Properties props) {
        DeserializationSchema<Row> deserializationSchema = null;
        if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            deserializationSchema = new JsonRowDeserializationSchema(typeInformation);
        } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInformation);
            deserSchemaBuilder.setFieldDelimiter(kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
            deserializationSchema = deserSchemaBuilder.build();
        } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            deserializationSchema = new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
        }

        if (null == deserializationSchema) {
            throw new UnsupportedOperationException("FormatType:" + kafkaSourceTableInfo.getSourceDataType());
        }

        CustomerKafkaConsumer010 kafkaSrc = null;
        if (kafkaSourceTableInfo.getTopicIsPattern()) {
            kafkaSrc = new CustomerKafkaConsumer010(Pattern.compile(kafkaSourceTableInfo.getTopic()), new KafkaDeserializationMetricWrapper(typeInformation, deserializationSchema), props);
        } else {
            kafkaSrc = new CustomerKafkaConsumer010(kafkaSourceTableInfo.getTopic(), new KafkaDeserializationMetricWrapper(typeInformation, deserializationSchema), props);
        }
        return kafkaSrc;
    }

}
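A hedged end-to-end sketch of driving the factory; the KafkaSourceTableInfo setters are hypothetical mirrors of the getters used above (the real object is populated by the SQL DDL parser), and the connection values are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

import com.dtstack.flink.sql.source.kafka.KafkaConsumer010Factory;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;

public class FactoryUsageSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        // Normally populated by the DDL parser; these setter names are assumptions.
        KafkaSourceTableInfo tableInfo = new KafkaSourceTableInfo();
        tableInfo.setTopic("demo_topic");
        tableInfo.setTopicIsPattern(false);
        tableInfo.setSourceDataType("json"); // matched case-insensitively against FormatType

        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
                new String[]{"id", "name"}, Types.INT, Types.STRING);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(KafkaConsumer010Factory.createKafkaTableSource(tableInfo, typeInfo, props))
                .returns(typeInfo)
                .print();
        env.execute("kafka010-source-demo");
    }
}
```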
