66package io .opentelemetry .javaagent .instrumentation .kafkaconnect .v2_6 ;
77
88import static net .bytebuddy .matcher .ElementMatchers .hasSuperType ;
9- import static net .bytebuddy .matcher .ElementMatchers .named ;
109import static net .bytebuddy .matcher .ElementMatchers .isPublic ;
10+ import static net .bytebuddy .matcher .ElementMatchers .named ;
1111import static net .bytebuddy .matcher .ElementMatchers .takesArgument ;
1212
1313import io .opentelemetry .context .Context ;
1616import io .opentelemetry .javaagent .extension .instrumentation .TypeInstrumentation ;
1717import io .opentelemetry .javaagent .extension .instrumentation .TypeTransformer ;
1818import java .util .Collection ;
19+ import java .util .logging .Logger ;
1920import net .bytebuddy .asm .Advice ;
2021import net .bytebuddy .description .type .TypeDescription ;
2122import net .bytebuddy .matcher .ElementMatcher ;
2223import org .apache .kafka .connect .sink .SinkRecord ;
23- import java .util .logging .Logger ;
2424
2525public class SinkTaskInstrumentation implements TypeInstrumentation {
2626
@@ -35,20 +35,19 @@ public ElementMatcher<TypeDescription> typeMatcher() {
3535 @ Override
3636 public void transform (TypeTransformer transformer ) {
3737 logger .info ("KafkaConnect: transform() called - applying put() advice" );
38-
38+
3939 // Add advice to constructor to see what classes we're instrumenting
4040 transformer .applyAdviceToMethod (
41- named ("<init>" ),
42- SinkTaskInstrumentation .class .getName () + "$ConstructorAdvice" );
43-
41+ named ("<init>" ), SinkTaskInstrumentation .class .getName () + "$ConstructorAdvice" );
42+
4443 transformer .applyAdviceToMethod (
4544 named ("put" ).and (takesArgument (0 , Collection .class )).and (isPublic ()),
4645 SinkTaskInstrumentation .class .getName () + "$SinkTaskPutAdvice" );
4746 }
4847
4948 @ SuppressWarnings ("unused" )
5049 public static class ConstructorAdvice {
51-
50+
5251 public static final Logger logger = Logger .getLogger (ConstructorAdvice .class .getName ());
5352
5453 @ Advice .OnMethodExit (suppress = Throwable .class )
@@ -69,16 +68,18 @@ public static void onEnter(
6968 @ Advice .Local ("otelTask" ) KafkaConnectTask task ,
7069 @ Advice .Local ("otelContext" ) Context context ,
7170 @ Advice .Local ("otelScope" ) Scope scope ) {
72-
71+
7372 logger .info ("KafkaConnect: SinkTask.put() called with " + records .size () + " records" );
74-
75- Context parentContext = Java8BytecodeBridge .currentContext ();
73+
74+ Context parentContext = Java8BytecodeBridge .currentContext ();
7675
7776 // Extract context from first record if available
7877 if (!records .isEmpty ()) {
7978 SinkRecord firstRecord = records .iterator ().next ();
80- Context extractedContext = KafkaConnectSingletons .propagator ()
81- .extract (parentContext , firstRecord , KafkaConnectSingletons .sinkRecordHeaderGetter ());
79+ Context extractedContext =
80+ KafkaConnectSingletons .propagator ()
81+ .extract (
82+ parentContext , firstRecord , KafkaConnectSingletons .sinkRecordHeaderGetter ());
8283 parentContext = extractedContext ;
8384 }
8485
@@ -87,7 +88,7 @@ public static void onEnter(
8788 logger .info ("KafkaConnect: Instrumenter shouldStart returned false" );
8889 return ;
8990 }
90-
91+
9192 context = KafkaConnectSingletons .instrumenter ().start (parentContext , task );
9293 scope = context .makeCurrent ();
9394 logger .info ("KafkaConnect: Started span" );
@@ -99,7 +100,7 @@ public static void onExit(
99100 @ Advice .Local ("otelTask" ) KafkaConnectTask task ,
100101 @ Advice .Local ("otelContext" ) Context context ,
101102 @ Advice .Local ("otelScope" ) Scope scope ) {
102-
103+
103104 if (scope == null ) {
104105 logger .info ("KafkaConnect: Scope is null, exiting" );
105106 return ;
@@ -108,7 +109,5 @@ public static void onExit(
108109 KafkaConnectSingletons .instrumenter ().end (context , task , null , throwable );
109110 logger .info ("KafkaConnect: Span ended" );
110111 }
111-
112-
113112 }
114113}
0 commit comments