55
66package io .opentelemetry .javaagent .instrumentation .kafkaconnect .v2_6 ;
77
8- import static net .bytebuddy .matcher .ElementMatchers .nameContains ;
8+ import static net .bytebuddy .matcher .ElementMatchers .hasSuperType ;
9+ import static net .bytebuddy .matcher .ElementMatchers .named ;
10+ import static net .bytebuddy .matcher .ElementMatchers .isPublic ;
11+ import static net .bytebuddy .matcher .ElementMatchers .takesArgument ;
912
1013import io .opentelemetry .context .Context ;
1114import io .opentelemetry .context .Scope ;
15+ import io .opentelemetry .context .propagation .TextMapGetter ;
1216import io .opentelemetry .javaagent .bootstrap .Java8BytecodeBridge ;
1317import io .opentelemetry .javaagent .extension .instrumentation .TypeInstrumentation ;
1418import io .opentelemetry .javaagent .extension .instrumentation .TypeTransformer ;
1721import net .bytebuddy .description .type .TypeDescription ;
1822import net .bytebuddy .matcher .ElementMatcher ;
1923import org .apache .kafka .connect .sink .SinkRecord ;
24+ import org .apache .kafka .connect .header .Header ;
25+ import org .apache .kafka .connect .header .Headers ;
26+ import java .nio .charset .StandardCharsets ;
2027import java .util .logging .Logger ;
21- import java .lang .reflect .Method ;
22- import static net .bytebuddy .matcher .ElementMatchers .isPublic ;
23- import static net .bytebuddy .matcher .ElementMatchers .named ;
24- import static net .bytebuddy .matcher .ElementMatchers .takesArgument ;
2528
2629public class SinkTaskInstrumentation implements TypeInstrumentation {
2730
28- // Change from private to public:
29- public static final Logger logger = Logger .getLogger (SinkTaskInstrumentation .class .getName ());
31+ private static final Logger logger = Logger .getLogger (SinkTaskInstrumentation .class .getName ());
3032
3133 @ Override
3234 public ElementMatcher <TypeDescription > typeMatcher () {
3335 logger .info ("KafkaConnect: typeMatcher called" );
34-
35- // Temporarily match ANY class with "Sink" in the name to see what's available
36- return nameContains ("Sink" );
36+ return hasSuperType (named ("org.apache.kafka.connect.sink.SinkTask" ));
3737 }
3838
3939 @ Override
4040 public void transform (TypeTransformer transformer ) {
41- logger .info ("KafkaConnect: transform() called - about to apply put() advice" );
41+ logger .info ("KafkaConnect: transform() called - applying put() advice" );
4242
43- // Switch back to instrumenting the put method
43+ // Add advice to constructor to see what classes we're instrumenting
44+ transformer .applyAdviceToMethod (
45+ named ("<init>" ),
46+ SinkTaskInstrumentation .class .getName () + "$ConstructorAdvice" );
47+
4448 transformer .applyAdviceToMethod (
4549 named ("put" ).and (takesArgument (0 , Collection .class )).and (isPublic ()),
4650 SinkTaskInstrumentation .class .getName () + "$SinkTaskPutAdvice" );
4751 }
4852
4953 @ SuppressWarnings ("unused" )
5054 public static class ConstructorAdvice {
51- // Change from private to public:
55+
5256 public static final Logger logger = Logger .getLogger (ConstructorAdvice .class .getName ());
5357
5458 @ Advice .OnMethodExit (suppress = Throwable .class )
5559 public static void onExit (@ Advice .This Object thiz ) {
5660 String className = thiz .getClass ().getName ();
57- logger .info ("KafkaConnect: Created instance of: " + className );
58-
59- // Special logging for the classes we care about
60- if (className .contains ("JdbcSinkTask" )) {
61- logger .info ("🎯 KafkaConnect: Found JdbcSinkTask! " + className );
62- }
63- if (className .contains ("SinkTask" )) {
64- logger .info ("📋 KafkaConnect: Found SinkTask subclass: " + className );
65- }
66- }
67- }
68-
69- @ SuppressWarnings ("unused" )
70- public static class AllMethodsAdvice {
71- private static final Logger logger = Logger .getLogger (AllMethodsAdvice .class .getName ());
72-
73- @ Advice .OnMethodEnter (suppress = Throwable .class )
74- public static void onEnter (@ Advice .Origin Method method , @ Advice .This Object thiz ) {
75- // Only log methods that look interesting
76- String methodName = method .getName ();
77- if (methodName .contains ("put" ) || methodName .contains ("process" ) ||
78- methodName .contains ("write" ) || methodName .contains ("execute" )) {
79- logger .info ("KafkaConnect: Method called: " + thiz .getClass ().getName () + "." + methodName + "()" );
80- }
61+ logger .info ("KafkaConnect: Instrumented class instantiated: " + className );
8162 }
8263 }
8364
@@ -93,19 +74,16 @@ public static void onEnter(
9374 @ Advice .Local ("otelContext" ) Context context ,
9475 @ Advice .Local ("otelScope" ) Scope scope ) {
9576
96- logger .info ("🚀 KafkaConnect: SinkTask.put() ENTER called with " + records .size () + " records" );
97-
98- Context parentContext = Java8BytecodeBridge .currentContext ();
77+ logger .info ("KafkaConnect: SinkTask.put() called with " + records .size () + " records" );
9978
100- // TEMPORARILY COMMENT OUT this problematic line:
101- /*
79+ Context parentContext = Java8BytecodeBridge .currentContext ();
80+
81+ // Extract context from first record if available
10282 if (!records .isEmpty ()) {
10383 SinkRecord firstRecord = records .iterator ().next ();
104- Context extractedContext = KafkaConnectSingletons.propagator()
105- .extract(parentContext, firstRecord, SinkRecordHeadersGetter.INSTANCE);
106- // ... rest of extraction logic
84+ Context extractedContext = extractContextFromRecord (parentContext , firstRecord );
85+ parentContext = extractedContext ;
10786 }
108- */
10987
11088 task = new KafkaConnectTask (records );
11189 if (!KafkaConnectSingletons .instrumenter ().shouldStart (parentContext , task )) {
@@ -115,7 +93,7 @@ public static void onEnter(
11593
11694 context = KafkaConnectSingletons .instrumenter ().start (parentContext , task );
11795 scope = context .makeCurrent ();
118- logger .info ("🎯 KafkaConnect: Started span with context: " + context );
96+ logger .info ("KafkaConnect: Started span" );
11997 }
12098
12199 @ Advice .OnMethodExit (onThrowable = Throwable .class , suppress = Throwable .class )
@@ -125,15 +103,45 @@ public static void onExit(
125103 @ Advice .Local ("otelContext" ) Context context ,
126104 @ Advice .Local ("otelScope" ) Scope scope ) {
127105
128- logger .info ("KafkaConnect: SinkTask.put() EXIT called" );
129-
130106 if (scope == null ) {
131- logger .warning ("KafkaConnect: Scope is null, exiting" );
107+ logger .info ("KafkaConnect: Scope is null, exiting" );
132108 return ;
133109 }
134110 scope .close ();
135111 KafkaConnectSingletons .instrumenter ().end (context , task , null , throwable );
136112 logger .info ("KafkaConnect: Span ended" );
137113 }
114+
115+ public static Context extractContextFromRecord (Context parentContext , SinkRecord record ) {
116+ // Create a simple TextMapGetter for SinkRecord headers
117+ return KafkaConnectSingletons .propagator ().extract (parentContext , record , new TextMapGetter <SinkRecord >() {
118+ @ Override
119+ public Iterable <String > keys (SinkRecord sinkRecord ) {
120+ Headers headers = sinkRecord .headers ();
121+ java .util .List <String > keys = new java .util .ArrayList <>();
122+ for (Header header : headers ) {
123+ keys .add (header .key ());
124+ }
125+ return keys ;
126+ }
127+
128+ @ Override
129+ public String get (SinkRecord sinkRecord , String key ) {
130+ if (sinkRecord == null ) {
131+ return null ;
132+ }
133+ Headers headers = sinkRecord .headers ();
134+ Header header = headers .lastWithName (key );
135+ if (header == null ) {
136+ return null ;
137+ }
138+ Object value = header .value ();
139+ if (value instanceof byte []) {
140+ return new String ((byte []) value , StandardCharsets .UTF_8 );
141+ }
142+ return value != null ? value .toString () : null ;
143+ }
144+ });
145+ }
138146 }
139147}
0 commit comments