diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java index 46957bf399e8..3d9276fe0eaf 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java @@ -17,6 +17,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.javaagent.bootstrap.CallDepth; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable; import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper; @@ -89,7 +90,13 @@ public static class SetExecuteRunnableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( - @Advice.Argument(value = 0, readOnly = false) Runnable task) { + @Advice.This Object executor, + @Advice.Argument(value = 0, readOnly = false) Runnable task, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(executor.getClass()); + if (callDepth.getAndIncrement() > 0) { + return null; + } Context context = Java8BytecodeBridge.currentContext(); if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) { return null; @@ -107,7 +114,11 @@ public static PropagatedContext enterJobSubmit( public static void exitJobSubmit( @Advice.Argument(0) Runnable task, @Advice.Enter PropagatedContext propagatedContext, - @Advice.Thrown Throwable throwable) { + @Advice.Thrown Throwable throwable, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } VirtualField virtualField = VirtualField.find(Runnable.class, PropagatedContext.class); ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable, virtualField, task); @@ -144,7 +155,13 @@ public static class SetSubmitRunnableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( - @Advice.Argument(value = 0, readOnly = false) Runnable task) { + @Advice.This Object executor, + @Advice.Argument(value = 0, readOnly = false) Runnable task, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(executor.getClass()); + if (callDepth.getAndIncrement() > 0) { + return null; + } Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField virtualField = @@ -159,7 +176,11 @@ public static void exitJobSubmit( @Advice.Argument(0) Runnable task, @Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable, - @Advice.Return Future future) { + @Advice.Return Future future, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } if (propagatedContext != null && future != null) { VirtualField, PropagatedContext> virtualField = VirtualField.find(Future.class, PropagatedContext.class); @@ -175,7 +196,14 @@ public static void exitJobSubmit( public static class SetCallableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static PropagatedContext enterJobSubmit(@Advice.Argument(0) Callable task) { + public static PropagatedContext enterJobSubmit( + @Advice.This Object executor, + @Advice.Argument(0) Callable task, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(executor.getClass()); + if (callDepth.getAndIncrement() > 0) { + return null; + } Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField, PropagatedContext> virtualField = @@ -190,7 +218,11 @@ public static void exitJobSubmit( @Advice.Argument(0) Callable task, @Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable, - @Advice.Return Future future) { + @Advice.Return Future future, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } if (propagatedContext != null && future != null) { VirtualField, PropagatedContext> virtualField = VirtualField.find(Future.class, PropagatedContext.class); @@ -207,11 +239,18 @@ public static class SetCallableStateForCallableCollectionAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Collection submitEnter( - @Advice.Argument(0) Collection> tasks) { + @Advice.This Object executor, + @Advice.Argument(0) Collection> tasks, + @Advice.Local("otelCallDepth") CallDepth callDepth) { if (tasks == null) { return Collections.emptyList(); } + callDepth = CallDepth.forClass(executor.getClass()); + if (callDepth.getAndIncrement() > 0) { + return Collections.emptyList(); + } + Context context = Java8BytecodeBridge.currentContext(); for (Callable task : tasks) { if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { @@ -228,7 +267,13 @@ public static Collection submitEnter( @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void submitExit( - @Advice.Enter Collection> tasks, @Advice.Thrown Throwable throwable) { + @Advice.Enter Collection> tasks, + @Advice.Thrown Throwable throwable, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } + /* Note1: invokeAny doesn't return any futures so all we need to do for it is to make sure we close all scopes in case of an exception.