diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java index 65a8204f29a0..c846930c2b92 100644 --- a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java +++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java @@ -11,11 +11,13 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; import java.util.List; import net.bytebuddy.matcher.ElementMatcher; @AutoService(InstrumentationModule.class) -public class KafkaConnectInstrumentationModule extends InstrumentationModule { +public class KafkaConnectInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { public KafkaConnectInstrumentationModule() { super("kafka-connect", "kafka-connect-2.6"); @@ -31,4 +33,9 @@ public ElementMatcher.Junction classLoaderMatcher() { // class added in 2.6.0 return hasClassesNamed("org.apache.kafka.connect.sink.SinkConnectorContext"); } + + @Override + public boolean isIndyReady() { + return true; + } } diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java index 05ad924e7730..3e65ed0b5726 100644 --- a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java +++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java @@ -13,10 +13,10 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Collection; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -39,36 +39,50 @@ public void transform(TypeTransformer transformer) { @SuppressWarnings("unused") public static class SinkTaskPutAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) Collection records, - @Advice.Local("otelTask") KafkaConnectTask task, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { + public static class AdviceScope { + private final KafkaConnectTask task; + private final Context context; + private final Scope scope; + + private AdviceScope(KafkaConnectTask task, Context context, Scope scope) { + this.task = task; + this.context = context; + this.scope = scope; + } + + @Nullable + public static AdviceScope start(Collection records) { + Context parentContext = Context.current(); - Context parentContext = Java8BytecodeBridge.currentContext(); + KafkaConnectTask task = new KafkaConnectTask(records); + if (!instrumenter().shouldStart(parentContext, task)) { + return null; + } - task = new KafkaConnectTask(records); - if (!instrumenter().shouldStart(parentContext, task)) { - return; + Context context = instrumenter().start(parentContext, task); + return new AdviceScope(task, context, context.makeCurrent()); } - context = instrumenter().start(parentContext, task); - scope = context.makeCurrent(); + public void end(@Nullable Throwable throwable) { + scope.close(); + instrumenter().end(context, task, null, throwable); + } + } + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AdviceScope onEnter(@Advice.Argument(0) Collection records) { + return AdviceScope.start(records); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void onExit( - @Advice.Thrown Throwable throwable, - @Advice.Local("otelTask") KafkaConnectTask task, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { + @Advice.Thrown @Nullable Throwable throwable, + @Advice.Enter @Nullable AdviceScope adviceScope) { - if (scope == null) { - return; + if (adviceScope != null) { + adviceScope.end(throwable); } - scope.close(); - instrumenter().end(context, task, null, throwable); } } }