@@ -871,7 +871,7 @@ private synchronized void onStreamingQueryProgressEvent(
871871 batchSpan .setTag (prefix + "input_rows_per_second" , source .inputRowsPerSecond ());
872872 batchSpan .setTag (prefix + "processed_rows_per_second" , source .processedRowsPerSecond ());
873873
874- reportKafkaOffsets (progress . name (), batchSpan , source );
874+ reportKafkaOffsets (batchSpan . getServiceName (), batchSpan , source );
875875 }
876876
877877 for (int i = 0 ; i < progress .stateOperators ().length ; i ++) {
@@ -1167,27 +1167,21 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
11671167
11681168 private static void reportKafkaOffsets (
11691169 final String appName , final AgentSpan span , final SourceProgress progress ) {
1170- System .out .println ("==== reporting kafka offsets ====" );
11711170 if (!span .traceConfig ().isDataStreamsEnabled () || progress == null ) {
1172- System .out .println ("==== progress is null ====" );
11731171 return ;
11741172 }
11751173
11761174 // check if this is a kafka source
1177- System .out .println (
1178- "==== processing source '" + progress .description ().toLowerCase () + "' ====" );
1179- System .out .println ("==== raw data '" + progress .endOffset () + "' ====" );
11801175 if (progress .description ().toLowerCase ().startsWith ("kafka" )) {
11811176 try {
11821177 ObjectMapper objectMapper = new ObjectMapper ();
11831178 // parse offsets from endOffsets json, reported in a format:
1184- // "topic" -> ["partition":" value" ]
1179+ // "topic" -> ["partition":value]
11851180 JsonNode jsonNode = objectMapper .readTree (progress .endOffset ());
11861181 Iterator <String > topics = jsonNode .fieldNames ();
11871182 // report offsets for all topics / partitions
11881183 while (topics .hasNext ()) {
11891184 String topic = topics .next ();
1190- System .out .println ("==== found topic '" + topic + "' ====" );
11911185 JsonNode topicNode = jsonNode .get (topic );
11921186 // iterate thought reported partitions
11931187 Iterator <String > allPartitions = topicNode .fieldNames ();
@@ -1201,19 +1195,14 @@ private static void reportKafkaOffsets(
12011195
12021196 while (allPartitions .hasNext ()) {
12031197 String partition = allPartitions .next ();
1204- Long value = topicNode .get (partition ).asLong ();
12051198 sortedTags .put (PARTITION_TAG , partition );
1206- System .out .println (
1207- "==== found partition '" + partition + "', value '" + value .toString () + "' ====" );
1208- AgentTracer .get ().getDataStreamsMonitoring ().trackBacklog (sortedTags , value );
1209-
1210- // for debug only, will be removed
1211- span .setTag ("dsm." + topic + "." + partition , value );
1212- span .setTag ("dsm.app_name" , appName );
1199+ AgentTracer .get ()
1200+ .getDataStreamsMonitoring ()
1201+ .trackBacklog (sortedTags , topicNode .get (partition ).asLong ());
12131202 }
12141203 }
12151204 } catch (Exception e ) {
1216- System . out . println ( "==== Failed to parse kafka offsets ==== \n " + e . toString () );
1205+ log . debug ( " Failed to parse kafka offsets" , e );
12171206 }
12181207 }
12191208 }