
Commit 916da8b

fix kafka csv pattern error and remove kafka08 monitor
1 parent 8c75469 commit 916da8b

10 files changed: +29 −47 lines

kafka08/kafka08-source/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
-      <version>1.6.0</version>
+      <version>${flink.version}</version>
     </dependency>
   </dependencies>

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 4 additions & 4 deletions
@@ -90,19 +90,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
             }
         } else if ("csv".equalsIgnoreCase(kafka08SourceTableInfo.getSourceDataType())) {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCsvConsumer(topicName,
+                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
                         new com.dtstack.flink.sql.source.kafka.deserialization.CustomerCsvDeserialization(typeInformation,
                                 kafka08SourceTableInfo.getFieldDelimiter(), kafka08SourceTableInfo.getLengthCheckPolicy()), props);
             } else {
-                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
+                kafkaSrc = new CustomerCsvConsumer(topicName,
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka08SourceTableInfo.getFieldDelimiter(), kafka08SourceTableInfo.getLengthCheckPolicy()), props);
             }
         } else {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCommonConsumer(topicName, new com.dtstack.flink.sql.source.kafka.deserialization.CustomerCommonDeserialization(), props);
+                kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new com.dtstack.flink.sql.source.kafka.deserialization.CustomerCommonDeserialization(), props);
             } else {
-                kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new CustomerCommonDeserialization(), props);
+                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
             }
         }
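The substance of the "pattern error" fix: the two branches of each if/else were inverted. With topicIsPattern set, the old code handed the consumer the raw topic string (selecting the single-topic constructor), and with it unset it handed over Pattern.compile(topicName) (selecting the pattern-subscription constructor). Below is a minimal, self-contained sketch of the corrected dispatch; TopicDispatchSketch and its string return values are hypothetical stand-ins for the CustomerCsvConsumer/CustomerCommonConsumer constructors shown in the diff, not project code.

import java.util.Properties;
import java.util.regex.Pattern;

public final class TopicDispatchSketch {

    // Hypothetical stand-in for the consumer construction in genStreamSource:
    // the real code chooses between constructor overloads that take a String
    // (one literal topic) or a java.util.regex.Pattern (pattern subscription).
    static String createConsumer(String topicName, boolean topicIsPattern, Properties props) {
        if (topicIsPattern) {
            // Pattern subscription: compile the regex before handing it over.
            // The old code passed the raw String here, so the single-topic
            // overload was chosen and the pattern was treated as a literal name.
            Pattern topicPattern = Pattern.compile(topicName);
            return "pattern consumer for " + topicPattern.pattern();
        } else {
            // Literal subscription: use the topic name as-is. The old code
            // compiled it, so regex metacharacters such as '.' in ordinary
            // topic names could match unintended topics.
            return "literal consumer for " + topicName;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(createConsumer("orders_.*", true, props));    // pattern consumer for orders_.*
        System.out.println(createConsumer("orders_2019", false, props)); // literal consumer for orders_2019
    }
}

The same inversion is corrected in the kafka09, kafka10, and kafka11 sources further down.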

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/consumer/CustomerCsvConsumer.java

Lines changed: 0 additions & 9 deletions
@@ -20,20 +20,11 @@

 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.deserialization.CustomerCsvDeserialization;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.SerializedValue;

 import java.util.Arrays;
-import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/consumer/CustomerJsonConsumer.java

Lines changed: 0 additions & 9 deletions
@@ -20,20 +20,11 @@

 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.deserialization.CustomerJsonDeserialization;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.SerializedValue;

 import java.util.Arrays;
-import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerCommonDeserialization.java

Lines changed: 4 additions & 4 deletions
@@ -48,9 +48,9 @@ public class CustomerCommonDeserialization extends AbsDeserialization<Row> imple
     @Override
     public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {

-        numInRecord.inc();
-        numInBytes.inc(message.length);
-        numInBytes.inc(messageKey.length);
+        //numInRecord.inc();
+        //numInBytes.inc(message.length);
+        //numInBytes.inc(messageKey.length);

         try {
             Row row = Row.of(
@@ -63,7 +63,7 @@ public Row deserialize(byte[] messageKey, byte[] message, String topic, int part
             return row;
         } catch (Throwable t) {
             LOG.error(t.getMessage());
-            dirtyDataCounter.inc();
+            // dirtyDataCounter.inc();
             return null;
         }
     }

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerCsvDeserialization.java

Lines changed: 4 additions & 4 deletions
@@ -80,8 +80,8 @@ public CustomerCsvDeserialization(TypeInformation<Row> typeInfo, String fieldDel
     public Row deserialize(byte[] message) throws IOException {

         try {
-            numInRecord.inc();
-            numInBytes.inc(message.length);
+            //numInRecord.inc();
+            //numInBytes.inc(message.length);
             String[] fieldsList = null;
             if (message != null && message.length > 0){
                 fieldsList = new String(message).split(fieldDelimiter);
@@ -103,11 +103,11 @@ public Row deserialize(byte[] message) throws IOException {
                 }
             }

-            numInResolveRecord.inc();
+            //numInResolveRecord.inc();
             return row;
         } catch (Throwable t) {
             //add metric of dirty data
-            dirtyDataCounter.inc();
+            //dirtyDataCounter.inc();
             throw new RuntimeException(t);
         }
     }
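A side note on this hunk, not part of the fix: new String(message).split(fieldDelimiter) treats the delimiter as a regular expression, the same String-versus-Pattern distinction the commit corrects for topics. A standalone illustration (not project code) of why a delimiter like "|" needs quoting:

import java.util.Arrays;
import java.util.regex.Pattern;

public class SplitSketch {
    public static void main(String[] args) {
        String message = "a|b|c";
        // String.split takes a regex; an unescaped '|' is an alternation of
        // two empty branches and matches between every pair of characters:
        System.out.println(Arrays.toString(message.split("|")));
        // prints [a, |, b, |, c] on Java 8+, not the intended three fields
        // Pattern.quote makes the delimiter literal:
        System.out.println(Arrays.toString(message.split(Pattern.quote("|"))));
        // prints [a, b, c]
    }
}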

kafka08/kafka08-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerJsonDeserialization.java

Lines changed: 4 additions & 4 deletions
@@ -80,8 +80,8 @@ public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
     public Row deserialize(byte[] message) throws IOException {

         try {
-            numInRecord.inc();
-            numInBytes.inc(message.length);
+            // numInRecord.inc();
+            // numInBytes.inc(message.length);

             JsonNode root = objectMapper.readTree(message);
             Row row = new Row(fieldNames.length);
@@ -102,12 +102,12 @@ public Row deserialize(byte[] message) throws IOException {
                 }
             }

-            numInResolveRecord.inc();
+            // numInResolveRecord.inc();
             return row;
         } catch (Throwable t) {
             //add metric of dirty data
             LOG.error(t.getMessage());
-            dirtyDataCounter.inc();
+            // dirtyDataCounter.inc();
             return null;
         }
     }
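These commented-out counter updates are the "remove kafka08 monitor" half of the commit: with the custom fetcher and metric wiring gone from the consumers (note the MetricGroup, AbstractFetcher, and StreamingRuntimeContext imports dropped from CustomerCsvConsumer and CustomerJsonConsumer above), the counters these deserializers incremented are apparently no longer registered, so the updates are disabled rather than deleted. For reference, a minimal sketch of how such counters are ordinarily registered through Flink's metric API in a rich function; illustrative only, not the project's implementation:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Register counters on the runtime metric group in open(), then inc() them
// per element; the counter names (numInRecord, dirtyData) mirror the fields
// in the diff, but the class itself is hypothetical.
public class CountingMapSketch extends RichMapFunction<byte[], byte[]> {

    private transient Counter numInRecord;
    private transient Counter dirtyDataCounter;

    @Override
    public void open(Configuration parameters) {
        numInRecord = getRuntimeContext().getMetricGroup().counter("numInRecord");
        dirtyDataCounter = getRuntimeContext().getMetricGroup().counter("dirtyData");
    }

    @Override
    public byte[] map(byte[] message) {
        numInRecord.inc();
        if (message == null || message.length == 0) {
            dirtyDataCounter.inc(); // count empty/unparseable records as dirty
        }
        return message;
    }
}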

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 4 additions & 4 deletions
@@ -100,19 +100,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
             }
         } else if ("csv".equalsIgnoreCase(kafka09SourceTableInfo.getSourceDataType())) {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCsvConsumer(topicName,
+                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka09SourceTableInfo.getFieldDelimiter(), kafka09SourceTableInfo.getLengthCheckPolicy()), props);
             } else {
-                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
+                kafkaSrc = new CustomerCsvConsumer(topicName,
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka09SourceTableInfo.getFieldDelimiter(), kafka09SourceTableInfo.getLengthCheckPolicy()), props);
             }
         } else {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
-            } else {
                 kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new CustomerCommonDeserialization(), props);
+            } else {
+                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
             }
         }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 4 additions & 4 deletions
@@ -103,19 +103,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
             }
         } else if ("csv".equalsIgnoreCase(kafka010SourceTableInfo.getSourceDataType())) {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCsvConsumer(topicName,
+                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka010SourceTableInfo.getFieldDelimiter(), kafka010SourceTableInfo.getLengthCheckPolicy()), props);
             } else {
-                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
+                kafkaSrc = new CustomerCsvConsumer(topicName,
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka010SourceTableInfo.getFieldDelimiter(), kafka010SourceTableInfo.getLengthCheckPolicy()), props);
             }
         } else {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
-            } else {
                 kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new CustomerCommonDeserialization(), props);
+            } else {
+                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
             }
         }

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 4 additions & 4 deletions
@@ -100,19 +100,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
             }
         } else if ("csv".equalsIgnoreCase(kafka011SourceTableInfo.getSourceDataType())) {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCsvConsumer(topicName,
+                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka011SourceTableInfo.getFieldDelimiter(), kafka011SourceTableInfo.getLengthCheckPolicy()), props);
             } else {
-                kafkaSrc = new CustomerCsvConsumer(Pattern.compile(topicName),
+                kafkaSrc = new CustomerCsvConsumer(topicName,
                         new CustomerCsvDeserialization(typeInformation,
                                 kafka011SourceTableInfo.getFieldDelimiter(), kafka011SourceTableInfo.getLengthCheckPolicy()), props);
             }
         } else {
             if (topicIsPattern) {
-                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
-            } else {
                 kafkaSrc = new CustomerCommonConsumer(Pattern.compile(topicName), new CustomerCommonDeserialization(), props);
+            } else {
+                kafkaSrc = new CustomerCommonConsumer(topicName, new CustomerCommonDeserialization(), props);
             }
         }
