1919import io .opentelemetry .instrumentation .api .util .VirtualField ;
2020import io .opentelemetry .javaagent .bootstrap .CallDepth ;
2121import io .opentelemetry .javaagent .bootstrap .Java8BytecodeBridge ;
22+ import io .opentelemetry .javaagent .bootstrap .executors .ContextPropagatingCallable ;
2223import io .opentelemetry .javaagent .bootstrap .executors .ContextPropagatingRunnable ;
2324import io .opentelemetry .javaagent .bootstrap .executors .ExecutorAdviceHelper ;
2425import io .opentelemetry .javaagent .bootstrap .executors .PropagatedContext ;
2526import io .opentelemetry .javaagent .extension .instrumentation .TypeInstrumentation ;
2627import io .opentelemetry .javaagent .extension .instrumentation .TypeTransformer ;
28+ import java .util .ArrayList ;
2729import java .util .Collection ;
2830import java .util .Collections ;
31+ import java .util .List ;
2932import java .util .concurrent .Callable ;
3033import java .util .concurrent .ForkJoinTask ;
3134import java .util .concurrent .Future ;
@@ -163,12 +166,16 @@ public static PropagatedContext enterJobSubmit(
163166 return null ;
164167 }
165168 Context context = Java8BytecodeBridge .currentContext ();
166- if (ExecutorAdviceHelper .shouldPropagateContext (context , task )) {
167- VirtualField <Runnable , PropagatedContext > virtualField =
168- VirtualField .find (Runnable .class , PropagatedContext .class );
169- return ExecutorAdviceHelper .attachContextToTask (context , virtualField , task );
169+ if (!ExecutorAdviceHelper .shouldPropagateContext (context , task )) {
170+ return null ;
170171 }
171- return null ;
172+ if (ContextPropagatingRunnable .shouldDecorateRunnable (task )) {
173+ task = ContextPropagatingRunnable .propagateContext (task , context );
174+ return null ;
175+ }
176+ VirtualField <Runnable , PropagatedContext > virtualField =
177+ VirtualField .find (Runnable .class , PropagatedContext .class );
178+ return ExecutorAdviceHelper .attachContextToTask (context , virtualField , task );
172179 }
173180
174181 @ Advice .OnMethodExit (onThrowable = Throwable .class , suppress = Throwable .class )
@@ -198,19 +205,23 @@ public static class SetCallableStateAdvice {
198205 @ Advice .OnMethodEnter (suppress = Throwable .class )
199206 public static PropagatedContext enterJobSubmit (
200207 @ Advice .This Object executor ,
201- @ Advice .Argument (0 ) Callable <?> task ,
208+ @ Advice .Argument (value = 0 , readOnly = false ) Callable <?> task ,
202209 @ Advice .Local ("otelCallDepth" ) CallDepth callDepth ) {
203210 callDepth = CallDepth .forClass (executor .getClass ());
204211 if (callDepth .getAndIncrement () > 0 ) {
205212 return null ;
206213 }
207214 Context context = Java8BytecodeBridge .currentContext ();
208- if (ExecutorAdviceHelper .shouldPropagateContext (context , task )) {
209- VirtualField <Callable <?>, PropagatedContext > virtualField =
210- VirtualField .find (Callable .class , PropagatedContext .class );
211- return ExecutorAdviceHelper .attachContextToTask (context , virtualField , task );
215+ if (!ExecutorAdviceHelper .shouldPropagateContext (context , task )) {
216+ return null ;
212217 }
213- return null ;
218+ if (ContextPropagatingCallable .shouldDecorateCallable (task )) {
219+ task = ContextPropagatingCallable .propagateContext (task , context );
220+ return null ;
221+ }
222+ VirtualField <Callable <?>, PropagatedContext > virtualField =
223+ VirtualField .find (Callable .class , PropagatedContext .class );
224+ return ExecutorAdviceHelper .attachContextToTask (context , virtualField , task );
214225 }
215226
216227 @ Advice .OnMethodExit (onThrowable = Throwable .class , suppress = Throwable .class )
@@ -240,7 +251,7 @@ public static class SetCallableStateForCallableCollectionAdvice {
240251 @ Advice .OnMethodEnter (suppress = Throwable .class )
241252 public static Collection <?> submitEnter (
242253 @ Advice .This Object executor ,
243- @ Advice .Argument (0 ) Collection <? extends Callable <?>> tasks ,
254+ @ Advice .Argument (value = 0 , readOnly = false ) Collection <? extends Callable <?>> tasks ,
244255 @ Advice .Local ("otelCallDepth" ) CallDepth callDepth ) {
245256 if (tasks == null ) {
246257 return Collections .emptyList ();
@@ -252,14 +263,40 @@ public static Collection<?> submitEnter(
252263 }
253264
254265 Context context = Java8BytecodeBridge .currentContext ();
266+
267+ // first, go through the list and wrap all Callables that need to be wrapped
268+ List <Callable <?>> list = null ;
269+ for (Callable <?> task : tasks ) {
270+ if (!ExecutorAdviceHelper .shouldPropagateContext (context , task )) {
271+ continue ;
272+ }
273+ if (ContextPropagatingCallable .shouldDecorateCallable (task )) {
274+ // lazily create the list only if we need to
275+ if (list == null ) {
276+ list = new ArrayList <>();
277+ }
278+ list .add (ContextPropagatingCallable .propagateContext (task , context ));
279+ }
280+ }
281+
255282 for (Callable <?> task : tasks ) {
256- if (ExecutorAdviceHelper .shouldPropagateContext (context , task )) {
283+ if (ExecutorAdviceHelper .shouldPropagateContext (context , task )
284+ && !ContextPropagatingCallable .shouldDecorateCallable (task )) {
257285 VirtualField <Callable <?>, PropagatedContext > virtualField =
258286 VirtualField .find (Callable .class , PropagatedContext .class );
259287 ExecutorAdviceHelper .attachContextToTask (context , virtualField , task );
288+ // if there are wrapped Callables, we need to add the unwrapped ones as well
289+ if (list != null ) {
290+ list .add (task );
291+ }
260292 }
261293 }
262294
295+ // replace the original list with our new list if we created one
296+ if (list != null ) {
297+ tasks = list ;
298+ }
299+
263300 // returning tasks and not propagatedContexts to avoid allocating another list just for an
264301 // edge case (exception)
265302 return tasks ;
0 commit comments