Skip to content

Commit 8f2336f

Browse files
committed
modify dirtyDataFrequency name
1 parent 632eda8 commit 8f2336f

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6767

6868
private static final long serialVersionUID = 2385115520960444192L;
6969

70-
private static int rowLenth = 1000;
70+
private static int dirtyDataFrequency = 1000;
7171

7272
private final ObjectMapper objectMapper = new ObjectMapper();
7373

@@ -117,7 +117,7 @@ public Row deserialize(byte[] message) throws IOException {
117117

118118
try {
119119
JsonNode root = objectMapper.readTree(message);
120-
if (numInRecord.getCount()%rowLenth == 0){
120+
if (numInRecord.getCount() % dirtyDataFrequency == 0) {
121121
LOG.info(root.toString());
122122
}
123123

@@ -148,7 +148,7 @@ public Row deserialize(byte[] message) throws IOException {
148148
return row;
149149
} catch (Exception e) {
150150
//add metric of dirty data
151-
if (dirtyDataCounter.getCount()%rowLenth == 0){
151+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
152152
LOG.info("dirtyData: " + new String(message));
153153
LOG.info(" " ,e);
154154
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6767

6868
private static final long serialVersionUID = 2385115520960444192L;
6969

70-
private static int rowLenth = 1000;
70+
private static int dirtyDataFrequency = 1000;
7171

7272
private final ObjectMapper objectMapper = new ObjectMapper();
7373

@@ -117,7 +117,7 @@ public Row deserialize(byte[] message) throws IOException {
117117
try {
118118
JsonNode root = objectMapper.readTree(message);
119119

120-
if (numInRecord.getCount()%rowLenth == 0){
120+
if (numInRecord.getCount() % dirtyDataFrequency == 0) {
121121
LOG.info(root.toString());
122122
}
123123

@@ -148,7 +148,7 @@ public Row deserialize(byte[] message) throws IOException {
148148
return row;
149149
} catch (Exception e) {
150150
//add metric of dirty data
151-
if (dirtyDataCounter.getCount()%rowLenth == 0){
151+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
152152
LOG.info("dirtyData: " + new String(message));
153153
LOG.error(" ", e);
154154
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7070

7171
private static final long serialVersionUID = 2385115520960444192L;
7272

73-
private static int rowLenth = 1000;
73+
private static int dirtyDataFrequency = 1000;
7474

7575
private final ObjectMapper objectMapper = new ObjectMapper();
7676

@@ -121,7 +121,7 @@ public Row deserialize(byte[] message) throws IOException {
121121
try {
122122
JsonNode root = objectMapper.readTree(message);
123123

124-
if (numInRecord.getCount()%rowLenth == 0){
124+
if (numInRecord.getCount() % dirtyDataFrequency == 0) {
125125
LOG.info(root.toString());
126126
}
127127

@@ -153,7 +153,7 @@ public Row deserialize(byte[] message) throws IOException {
153153
return row;
154154
} catch (Exception e) {
155155
//add metric of dirty data
156-
if (dirtyDataCounter.getCount()%rowLenth == 0){
156+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
157157
LOG.info("dirtyData: " + new String(message));
158158
LOG.error("" , e);
159159
}

0 commit comments

Comments
 (0)