From 519143457cee0cc01b6fe6009af02c2427b3ea64 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 29 Aug 2025 14:09:37 +0300 Subject: [PATCH 1/2] Add call depth check to executor instrumentation --- .../JavaExecutorInstrumentation.java | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java index 46957bf399e8..c33a947774ab 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java @@ -17,6 +17,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.javaagent.bootstrap.CallDepth; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable; import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper; @@ -26,6 +27,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; import net.bytebuddy.asm.Advice; @@ -89,7 +91,12 @@ public static class SetExecuteRunnableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( - @Advice.Argument(value = 0, readOnly = false) Runnable task) { + @Advice.Argument(value = 0, readOnly = false) Runnable task, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(Executor.class); + if (callDepth.getAndIncrement() > 0) { + return null; + } Context context = Java8BytecodeBridge.currentContext(); if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) { return null; @@ -107,7 +114,11 @@ public static PropagatedContext enterJobSubmit( public static void exitJobSubmit( @Advice.Argument(0) Runnable task, @Advice.Enter PropagatedContext propagatedContext, - @Advice.Thrown Throwable throwable) { + @Advice.Thrown Throwable throwable, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } VirtualField virtualField = VirtualField.find(Runnable.class, PropagatedContext.class); ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable, virtualField, task); @@ -144,7 +155,12 @@ public static class SetSubmitRunnableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( - @Advice.Argument(value = 0, readOnly = false) Runnable task) { + @Advice.Argument(value = 0, readOnly = false) Runnable task, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(Executor.class); + if (callDepth.getAndIncrement() > 0) { + return null; + } Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField virtualField = @@ -159,7 +175,11 @@ public static void exitJobSubmit( @Advice.Argument(0) Runnable task, @Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable, - @Advice.Return Future future) { + @Advice.Return Future future, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } if (propagatedContext != null && future != null) { VirtualField, PropagatedContext> virtualField = VirtualField.find(Future.class, PropagatedContext.class); @@ -175,7 +195,12 @@ public static void exitJobSubmit( public static class SetCallableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static PropagatedContext enterJobSubmit(@Advice.Argument(0) Callable task) { + public static PropagatedContext enterJobSubmit( + @Advice.Argument(0) Callable task, @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(Executor.class); + if (callDepth.getAndIncrement() > 0) { + return null; + } Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField, PropagatedContext> virtualField = @@ -190,7 +215,11 @@ public static void exitJobSubmit( @Advice.Argument(0) Callable task, @Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable, - @Advice.Return Future future) { + @Advice.Return Future future, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } if (propagatedContext != null && future != null) { VirtualField, PropagatedContext> virtualField = VirtualField.find(Future.class, PropagatedContext.class); @@ -207,11 +236,17 @@ public static class SetCallableStateForCallableCollectionAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Collection submitEnter( - @Advice.Argument(0) Collection> tasks) { + @Advice.Argument(0) Collection> tasks, + @Advice.Local("otelCallDepth") CallDepth callDepth) { if (tasks == null) { return Collections.emptyList(); } + callDepth = CallDepth.forClass(Executor.class); + if (callDepth.getAndIncrement() > 0) { + return Collections.emptyList(); + } + Context context = Java8BytecodeBridge.currentContext(); for (Callable task : tasks) { if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { @@ -228,7 +263,13 @@ public static Collection submitEnter( @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void submitExit( - @Advice.Enter Collection> tasks, @Advice.Thrown Throwable throwable) { + @Advice.Enter Collection> tasks, + @Advice.Thrown Throwable throwable, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + if (callDepth.decrementAndGet() > 0) { + return; + } + /* Note1: invokeAny doesn't return any futures so all we need to do for it is to make sure we close all scopes in case of an exception. From 23d97f1111468dab9edc935cd0ce18a2912defac Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 1 Sep 2025 13:18:49 +0300 Subject: [PATCH 2/2] use class of the executor --- .../executors/JavaExecutorInstrumentation.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java index c33a947774ab..3d9276fe0eaf 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java @@ -27,7 +27,6 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; import net.bytebuddy.asm.Advice; @@ -91,9 +90,10 @@ public static class SetExecuteRunnableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( + @Advice.This Object executor, @Advice.Argument(value = 0, readOnly = false) Runnable task, @Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(Executor.class); + callDepth = CallDepth.forClass(executor.getClass()); if (callDepth.getAndIncrement() > 0) { return null; } @@ -155,9 +155,10 @@ public static class SetSubmitRunnableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( + @Advice.This Object executor, @Advice.Argument(value = 0, readOnly = false) Runnable task, @Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(Executor.class); + callDepth = CallDepth.forClass(executor.getClass()); if (callDepth.getAndIncrement() > 0) { return null; } @@ -196,8 +197,10 @@ public static class SetCallableStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit( - @Advice.Argument(0) Callable task, @Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(Executor.class); + @Advice.This Object executor, + @Advice.Argument(0) Callable task, + @Advice.Local("otelCallDepth") CallDepth callDepth) { + callDepth = CallDepth.forClass(executor.getClass()); if (callDepth.getAndIncrement() > 0) { return null; } @@ -236,13 +239,14 @@ public static class SetCallableStateForCallableCollectionAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Collection submitEnter( + @Advice.This Object executor, @Advice.Argument(0) Collection> tasks, @Advice.Local("otelCallDepth") CallDepth callDepth) { if (tasks == null) { return Collections.emptyList(); } - callDepth = CallDepth.forClass(Executor.class); + callDepth = CallDepth.forClass(executor.getClass()); if (callDepth.getAndIncrement() > 0) { return Collections.emptyList(); }