@@ -1167,11 +1167,15 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
11671167
11681168 private static void reportKafkaOffsets (
11691169 final String appName , final AgentSpan span , final SourceProgress progress ) {
1170+ System .out .println ("==== reporting kafka offsets ====" );
11701171 if (!span .traceConfig ().isDataStreamsEnabled () || progress == null ) {
1172+ System .out .println ("==== progress is null ====" );
11711173 return ;
11721174 }
11731175
11741176 // check if this is a kafka source
1177+ System .out .println (
1178+ "==== processing source '" + progress .description ().toLowerCase () + "' ====" );
11751179 if (progress .description ().toLowerCase ().startsWith ("kafka" )) {
11761180 try {
11771181 ObjectMapper objectMapper = new ObjectMapper ();
@@ -1182,6 +1186,7 @@ private static void reportKafkaOffsets(
11821186 // report offsets for all topics / partitions
11831187 while (topics .hasNext ()) {
11841188 String topic = topics .next ();
1189+ System .out .println ("==== found topic '" + topic + "' ====" );
11851190 JsonNode topicNode = jsonNode .get (topic );
11861191 // iterate thought reported partitions
11871192 Iterator <String > allPartitions = topicNode .get (topic ).fieldNames ();
@@ -1197,17 +1202,17 @@ private static void reportKafkaOffsets(
11971202 String partition = allPartitions .next ();
11981203 String value = topicNode .get (partition ).textValue ();
11991204 sortedTags .put (PARTITION_TAG , partition );
1200-
1205+ System . out . println ( "==== found partition '" + partition + "' ====" );
12011206 AgentTracer .get ()
12021207 .getDataStreamsMonitoring ()
12031208 .trackBacklog (sortedTags , Long .parseLong (value ));
12041209
12051210 // for debug only, will be removed
1206- span .setTag ("dsm." + partition , value );
1211+ span .setTag ("dsm." + topic + "." + partition , value );
12071212 }
12081213 }
12091214 } catch (Exception e ) {
1210- log . debug ( " Failed to parse kafka offsets" , e );
1215+ System . out . println ( "==== Failed to parse kafka offsets ==== \n " + e . toString () );
12111216 }
12121217 }
12131218 }
0 commit comments