Skip to content

Commit 8dc904b

Browse files
authored
Merge branch 'main' into kafka-connect
2 parents 851ade2 + f56c6d0 commit 8dc904b

File tree

11 files changed

+394
-153
lines changed

11 files changed

+394
-153
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.bootstrap.executors;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import io.opentelemetry.instrumentation.api.internal.ContextPropagationDebug;
11+
import java.util.concurrent.Callable;
12+
13+
public final class ContextPropagatingCallable<T> implements Callable<T> {
14+
15+
public static <T> boolean shouldDecorateCallable(Callable<T> task) {
16+
// We wrap only lambdas' anonymous classes and if given object has not already been wrapped.
17+
// Anonymous classes have '/' in class name which is not allowed in 'normal' classes.
18+
// note: it is always safe to decorate lambdas since downstream code cannot be expecting a
19+
// specific runnable implementation anyways
20+
return task.getClass().getName().contains("/") && !(task instanceof ContextPropagatingCallable);
21+
}
22+
23+
public static <T> Callable<T> propagateContext(Callable<T> task, Context context) {
24+
return new ContextPropagatingCallable<T>(task, context);
25+
}
26+
27+
private final Callable<T> delegate;
28+
private final Context context;
29+
30+
private ContextPropagatingCallable(Callable<T> delegate, Context context) {
31+
this.delegate = delegate;
32+
this.context = ContextPropagationDebug.addDebugInfo(context, delegate);
33+
}
34+
35+
@Override
36+
public T call() throws Exception {
37+
try (Scope ignored = context.makeCurrent()) {
38+
return delegate.call();
39+
}
40+
}
41+
}

instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919
import io.opentelemetry.instrumentation.api.util.VirtualField;
2020
import io.opentelemetry.javaagent.bootstrap.CallDepth;
2121
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
22+
import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingCallable;
2223
import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable;
2324
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
2425
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
2526
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2627
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
28+
import java.util.ArrayList;
2729
import java.util.Collection;
2830
import java.util.Collections;
31+
import java.util.List;
2932
import java.util.concurrent.Callable;
3033
import java.util.concurrent.ForkJoinTask;
3134
import java.util.concurrent.Future;
@@ -163,12 +166,16 @@ public static PropagatedContext enterJobSubmit(
163166
return null;
164167
}
165168
Context context = Java8BytecodeBridge.currentContext();
166-
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
167-
VirtualField<Runnable, PropagatedContext> virtualField =
168-
VirtualField.find(Runnable.class, PropagatedContext.class);
169-
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
169+
if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
170+
return null;
170171
}
171-
return null;
172+
if (ContextPropagatingRunnable.shouldDecorateRunnable(task)) {
173+
task = ContextPropagatingRunnable.propagateContext(task, context);
174+
return null;
175+
}
176+
VirtualField<Runnable, PropagatedContext> virtualField =
177+
VirtualField.find(Runnable.class, PropagatedContext.class);
178+
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
172179
}
173180

174181
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@@ -198,19 +205,23 @@ public static class SetCallableStateAdvice {
198205
@Advice.OnMethodEnter(suppress = Throwable.class)
199206
public static PropagatedContext enterJobSubmit(
200207
@Advice.This Object executor,
201-
@Advice.Argument(0) Callable<?> task,
208+
@Advice.Argument(value = 0, readOnly = false) Callable<?> task,
202209
@Advice.Local("otelCallDepth") CallDepth callDepth) {
203210
callDepth = CallDepth.forClass(executor.getClass());
204211
if (callDepth.getAndIncrement() > 0) {
205212
return null;
206213
}
207214
Context context = Java8BytecodeBridge.currentContext();
208-
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
209-
VirtualField<Callable<?>, PropagatedContext> virtualField =
210-
VirtualField.find(Callable.class, PropagatedContext.class);
211-
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
215+
if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
216+
return null;
212217
}
213-
return null;
218+
if (ContextPropagatingCallable.shouldDecorateCallable(task)) {
219+
task = ContextPropagatingCallable.propagateContext(task, context);
220+
return null;
221+
}
222+
VirtualField<Callable<?>, PropagatedContext> virtualField =
223+
VirtualField.find(Callable.class, PropagatedContext.class);
224+
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
214225
}
215226

216227
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@@ -240,7 +251,7 @@ public static class SetCallableStateForCallableCollectionAdvice {
240251
@Advice.OnMethodEnter(suppress = Throwable.class)
241252
public static Collection<?> submitEnter(
242253
@Advice.This Object executor,
243-
@Advice.Argument(0) Collection<? extends Callable<?>> tasks,
254+
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks,
244255
@Advice.Local("otelCallDepth") CallDepth callDepth) {
245256
if (tasks == null) {
246257
return Collections.emptyList();
@@ -252,14 +263,40 @@ public static Collection<?> submitEnter(
252263
}
253264

254265
Context context = Java8BytecodeBridge.currentContext();
266+
267+
// first, go through the list and wrap all Callables that need to be wrapped
268+
List<Callable<?>> list = null;
269+
for (Callable<?> task : tasks) {
270+
if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
271+
continue;
272+
}
273+
if (ContextPropagatingCallable.shouldDecorateCallable(task)) {
274+
// lazily create the list only if we need to
275+
if (list == null) {
276+
list = new ArrayList<>();
277+
}
278+
list.add(ContextPropagatingCallable.propagateContext(task, context));
279+
}
280+
}
281+
255282
for (Callable<?> task : tasks) {
256-
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
283+
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)
284+
&& !ContextPropagatingCallable.shouldDecorateCallable(task)) {
257285
VirtualField<Callable<?>, PropagatedContext> virtualField =
258286
VirtualField.find(Callable.class, PropagatedContext.class);
259287
ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
288+
// if there are wrapped Callables, we need to add the unwrapped ones as well
289+
if (list != null) {
290+
list.add(task);
291+
}
260292
}
261293
}
262294

295+
// replace the original list with our new list if we created one
296+
if (list != null) {
297+
tasks = list;
298+
}
299+
263300
// returning tasks and not propagatedContexts to avoid allocating another list just for an
264301
// edge case (exception)
265302
return tasks;

instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/LambdaContextPropagationTest.java

Lines changed: 123 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,149 @@
99

1010
import io.opentelemetry.api.baggage.Baggage;
1111
import io.opentelemetry.context.Scope;
12+
import java.util.ArrayList;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import java.util.concurrent.Callable;
16+
import java.util.concurrent.ExecutionException;
1217
import java.util.concurrent.ExecutorService;
1318
import java.util.concurrent.Executors;
19+
import java.util.concurrent.TimeUnit;
1420
import java.util.concurrent.atomic.AtomicInteger;
21+
import org.junit.jupiter.api.BeforeEach;
1522
import org.junit.jupiter.api.Test;
1623

17-
// regression test for #9175
24+
// regression test for:
25+
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/9175
26+
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/14805
1827
class LambdaContextPropagationTest {
1928

2029
// must be static! the lambda that uses that must be non-capturing
2130
private static final AtomicInteger failureCounter = new AtomicInteger();
2231

32+
@BeforeEach
33+
void reset() {
34+
failureCounter.set(0);
35+
}
36+
2337
@Test
24-
void shouldCorrectlyPropagateContextToRunnables() {
38+
void propagateContextExecuteRunnable() throws InterruptedException {
2539
ExecutorService executor = Executors.newSingleThreadExecutor();
2640

2741
Baggage baggage = Baggage.builder().put("test", "test").build();
2842
try (Scope ignored = baggage.makeCurrent()) {
2943
for (int i = 0; i < 20; i++) {
30-
// must text execute() -- other methods like submit() decorate the Runnable with a
31-
// FutureTask
3244
executor.execute(LambdaContextPropagationTest::assertBaggage);
3345
}
3446
}
3547

48+
executor.shutdown();
49+
executor.awaitTermination(30, TimeUnit.SECONDS);
50+
51+
assertThat(failureCounter).hasValue(0);
52+
}
53+
54+
@Test
55+
void propagateContextSubmitRunnable() throws InterruptedException {
56+
ExecutorService executor = Executors.newSingleThreadExecutor();
57+
58+
Baggage baggage = Baggage.builder().put("test", "test").build();
59+
try (Scope ignored = baggage.makeCurrent()) {
60+
for (int i = 0; i < 20; i++) {
61+
executor.submit(LambdaContextPropagationTest::assertBaggage);
62+
}
63+
}
64+
65+
executor.shutdown();
66+
executor.awaitTermination(30, TimeUnit.SECONDS);
67+
68+
assertThat(failureCounter).hasValue(0);
69+
}
70+
71+
@Test
72+
void propagateContextSubmitRunnableAndResult() throws InterruptedException {
73+
ExecutorService executor = Executors.newSingleThreadExecutor();
74+
75+
Baggage baggage = Baggage.builder().put("test", "test").build();
76+
try (Scope ignored = baggage.makeCurrent()) {
77+
for (int i = 0; i < 20; i++) {
78+
executor.submit(LambdaContextPropagationTest::assertBaggage, null);
79+
}
80+
}
81+
82+
executor.shutdown();
83+
executor.awaitTermination(30, TimeUnit.SECONDS);
84+
85+
assertThat(failureCounter).hasValue(0);
86+
}
87+
88+
@Test
89+
void propagateContextSubmitCallable() throws InterruptedException {
90+
ExecutorService executor = Executors.newSingleThreadExecutor();
91+
92+
Baggage baggage = Baggage.builder().put("test", "test").build();
93+
try (Scope ignored = baggage.makeCurrent()) {
94+
for (int i = 0; i < 20; i++) {
95+
Callable<?> callable =
96+
() -> {
97+
assertBaggage();
98+
return null;
99+
};
100+
executor.submit(callable);
101+
}
102+
}
103+
104+
executor.shutdown();
105+
executor.awaitTermination(30, TimeUnit.SECONDS);
106+
107+
assertThat(failureCounter).hasValue(0);
108+
}
109+
110+
@Test
111+
void propagateContextInvokeAll() throws InterruptedException {
112+
ExecutorService executor = Executors.newSingleThreadExecutor();
113+
114+
Baggage baggage = Baggage.builder().put("test", "test").build();
115+
try (Scope ignored = baggage.makeCurrent()) {
116+
for (int i = 0; i < 20; i++) {
117+
Callable<Void> callable =
118+
() -> {
119+
assertBaggage();
120+
return null;
121+
};
122+
List<Callable<Void>> callables = new ArrayList<>();
123+
for (int j = 0; j < 20; j++) {
124+
callables.add(callable);
125+
}
126+
executor.invokeAll(callables);
127+
}
128+
}
129+
130+
executor.shutdown();
131+
executor.awaitTermination(30, TimeUnit.SECONDS);
132+
133+
assertThat(failureCounter).hasValue(0);
134+
}
135+
136+
@Test
137+
void propagateContextInvokeAny() throws InterruptedException, ExecutionException {
138+
ExecutorService executor = Executors.newSingleThreadExecutor();
139+
140+
Baggage baggage = Baggage.builder().put("test", "test").build();
141+
try (Scope ignored = baggage.makeCurrent()) {
142+
for (int i = 0; i < 20; i++) {
143+
Callable<?> callable =
144+
() -> {
145+
assertBaggage();
146+
return null;
147+
};
148+
executor.invokeAny(Collections.singletonList(callable));
149+
}
150+
}
151+
152+
executor.shutdown();
153+
executor.awaitTermination(30, TimeUnit.SECONDS);
154+
36155
assertThat(failureCounter).hasValue(0);
37156
}
38157

instrumentation/opensearch/opensearch-rest-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/rest/v1_0/OpenSearchRestInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
import com.google.auto.service.AutoService;
1313
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1414
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
15+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1516
import java.util.List;
1617
import net.bytebuddy.matcher.ElementMatcher;
1718

1819
@AutoService(InstrumentationModule.class)
19-
public class OpenSearchRestInstrumentationModule extends InstrumentationModule {
20+
public class OpenSearchRestInstrumentationModule extends InstrumentationModule
21+
implements ExperimentalInstrumentationModule {
2022
public OpenSearchRestInstrumentationModule() {
2123
super("opensearch-rest", "opensearch-rest-1.0", "opensearch");
2224
}
@@ -33,4 +35,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
3335
public List<TypeInstrumentation> typeInstrumentations() {
3436
return singletonList(new RestClientInstrumentation());
3537
}
38+
39+
@Override
40+
public boolean isIndyReady() {
41+
return true;
42+
}
3643
}

0 commit comments

Comments
 (0)