1616import io .opentelemetry .javaagent .extension .instrumentation .TypeInstrumentation ;
1717import io .opentelemetry .javaagent .extension .instrumentation .TypeTransformer ;
1818import java .util .Collection ;
19- import java .util .logging .Logger ;
2019import net .bytebuddy .asm .Advice ;
2120import net .bytebuddy .description .type .TypeDescription ;
2221import net .bytebuddy .matcher .ElementMatcher ;
2322import org .apache .kafka .connect .sink .SinkRecord ;
2423
2524public class SinkTaskInstrumentation implements TypeInstrumentation {
2625
27- private static final Logger logger = Logger .getLogger (SinkTaskInstrumentation .class .getName ());
28-
2926 @ Override
3027 public ElementMatcher <TypeDescription > typeMatcher () {
31- logger .info ("KafkaConnect: typeMatcher called" );
3228 return hasSuperType (named ("org.apache.kafka.connect.sink.SinkTask" ));
3329 }
3430
3531 @ Override
3632 public void transform (TypeTransformer transformer ) {
37- logger .info ("KafkaConnect: transform() called - applying put() advice" );
38-
39- // Add advice to constructor to see what classes we're instrumenting
40- transformer .applyAdviceToMethod (
41- named ("<init>" ), SinkTaskInstrumentation .class .getName () + "$ConstructorAdvice" );
42-
4333 transformer .applyAdviceToMethod (
4434 named ("put" ).and (takesArgument (0 , Collection .class )).and (isPublic ()),
4535 SinkTaskInstrumentation .class .getName () + "$SinkTaskPutAdvice" );
4636 }
4737
48- @ SuppressWarnings ("unused" )
49- public static class ConstructorAdvice {
50-
51- public static final Logger logger = Logger .getLogger (ConstructorAdvice .class .getName ());
52-
53- @ Advice .OnMethodExit (suppress = Throwable .class )
54- public static void onExit (@ Advice .This Object thiz ) {
55- String className = thiz .getClass ().getName ();
56- logger .info ("KafkaConnect: Instrumented class instantiated: " + className );
57- }
58- }
59-
6038 @ SuppressWarnings ("unused" )
6139 public static class SinkTaskPutAdvice {
6240
63- public static final Logger logger = Logger .getLogger (SinkTaskPutAdvice .class .getName ());
64-
6541 @ Advice .OnMethodEnter (suppress = Throwable .class )
6642 public static void onEnter (
6743 @ Advice .Argument (0 ) Collection <SinkRecord > records ,
6844 @ Advice .Local ("otelTask" ) KafkaConnectTask task ,
6945 @ Advice .Local ("otelContext" ) Context context ,
7046 @ Advice .Local ("otelScope" ) Scope scope ) {
7147
72- logger .info ("KafkaConnect: SinkTask.put() called with " + records .size () + " records" );
73-
7448 Context parentContext = Java8BytecodeBridge .currentContext ();
7549
7650 // Extract context from first record if available
@@ -85,13 +59,11 @@ public static void onEnter(
8559
8660 task = new KafkaConnectTask (records );
8761 if (!KafkaConnectSingletons .instrumenter ().shouldStart (parentContext , task )) {
88- logger .info ("KafkaConnect: Instrumenter shouldStart returned false" );
8962 return ;
9063 }
9164
9265 context = KafkaConnectSingletons .instrumenter ().start (parentContext , task );
9366 scope = context .makeCurrent ();
94- logger .info ("KafkaConnect: Started span" );
9567 }
9668
9769 @ Advice .OnMethodExit (onThrowable = Throwable .class , suppress = Throwable .class )
@@ -102,12 +74,10 @@ public static void onExit(
10274 @ Advice .Local ("otelScope" ) Scope scope ) {
10375
10476 if (scope == null ) {
105- logger .info ("KafkaConnect: Scope is null, exiting" );
10677 return ;
10778 }
10879 scope .close ();
10980 KafkaConnectSingletons .instrumenter ().end (context , task , null , throwable );
110- logger .info ("KafkaConnect: Span ended" );
11181 }
11282 }
11383}
0 commit comments