File tree Expand file tree Collapse file tree 3 files changed +24
-13
lines changed
kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka
kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka
kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka Expand file tree Collapse file tree 3 files changed +24
-13
lines changed Original file line number Diff line number Diff line change @@ -111,14 +111,14 @@ public Row deserialize(byte[] message) throws IOException {
111111 }
112112
113113 try {
114- numInRecord .inc ();
115- numInBytes .inc (message .length );
116-
117114 JsonNode root = objectMapper .readTree (message );
118-
119- if (numInResolveRecord .getCount ()%rowLenth == 0 ){
115+ if (numInRecord .getCount ()%rowLenth == 0 ){
120116 LOG .info (root .toString ());
121117 }
118+
119+ numInRecord .inc ();
120+ numInBytes .inc (message .length );
121+
122122 parseTree (root , null );
123123 Row row = new Row (fieldNames .length );
124124
@@ -143,6 +143,9 @@ public Row deserialize(byte[] message) throws IOException {
143143 return row ;
144144 } catch (Throwable t ) {
145145 //add metric of dirty data
146+ if (dirtyDataCounter .getCount ()%rowLenth == 0 ){
147+ LOG .info (objectMapper .readTree (message ).toString ());
148+ }
146149 dirtyDataCounter .inc ();
147150 return null ;
148151 }finally {
Original file line number Diff line number Diff line change @@ -110,14 +110,15 @@ public Row deserialize(byte[] message) throws IOException {
110110 }
111111
112112 try {
113- numInRecord .inc ();
114- numInBytes .inc (message .length );
115-
116113 JsonNode root = objectMapper .readTree (message );
117114
118- if (numInResolveRecord .getCount ()%rowLenth == 0 ){
115+ if (numInRecord .getCount ()%rowLenth == 0 ){
119116 LOG .info (root .toString ());
120117 }
118+
119+ numInRecord .inc ();
120+ numInBytes .inc (message .length );
121+
121122 parseTree (root , null );
122123 Row row = new Row (fieldNames .length );
123124
@@ -142,6 +143,9 @@ public Row deserialize(byte[] message) throws IOException {
142143 return row ;
143144 } catch (Throwable t ) {
144145 //add metric of dirty data
146+ if (dirtyDataCounter .getCount ()%rowLenth == 0 ){
147+ LOG .info (objectMapper .readTree (message ).toString ());
148+ }
145149 dirtyDataCounter .inc ();
146150 return null ;
147151 }finally {
Original file line number Diff line number Diff line change @@ -113,14 +113,15 @@ public Row deserialize(byte[] message) throws IOException {
113113 }
114114
115115 try {
116- numInRecord .inc ();
117- numInBytes .inc (message .length );
118-
119116 JsonNode root = objectMapper .readTree (message );
120117
121- if (numInResolveRecord .getCount ()%rowLenth == 0 ){
118+ if (numInRecord .getCount ()%rowLenth == 0 ){
122119 LOG .info (root .toString ());
123120 }
121+
122+ numInRecord .inc ();
123+ numInBytes .inc (message .length );
124+
124125 parseTree (root , null );
125126 Row row = new Row (fieldNames .length );
126127
@@ -145,6 +146,9 @@ public Row deserialize(byte[] message) throws IOException {
145146 return row ;
146147 } catch (Throwable t ) {
147148 //add metric of dirty data
149+ if (dirtyDataCounter .getCount ()%rowLenth == 0 ){
150+ LOG .info (objectMapper .readTree (message ).toString ());
151+ }
148152 dirtyDataCounter .inc ();
149153 return null ;
150154 }finally {
You can’t perform that action at this time.
0 commit comments