Skip to content

Commit a902c9e

Browse files
committed
check to make sure class is ObservableFromCallable
1 parent b7d0890 commit a902c9e

File tree

4 files changed

+90
-36
lines changed

4 files changed

+90
-36
lines changed

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public final class TracingAssembly {
9393
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
9494
oldOnParallelAssembly;
9595

96+
@SuppressWarnings("rawtypes")
97+
@GuardedBy("TracingAssembly.class")
98+
@Nullable
99+
private static Function<? super Observable, ? extends Observable> oldOnObservableAssembly;
100+
96101
@GuardedBy("TracingAssembly.class")
97102
private static boolean enabled;
98103

@@ -118,6 +123,8 @@ public void enable() {
118123

119124
enableObservable();
120125

126+
enableObservableAssembly();
127+
121128
enableCompletable();
122129

123130
enableSingle();
@@ -142,6 +149,8 @@ public void disable() {
142149

143150
disableObservable();
144151

152+
disableObservableAssembly();
153+
145154
disableCompletable();
146155

147156
disableSingle();
@@ -219,6 +228,24 @@ private static void enableObservable() {
219228
}
220229
}
221230

231+
@GuardedBy("TracingAssembly.class")
232+
@SuppressWarnings({"rawtypes", "unchecked"})
233+
private static void enableObservableAssembly() {
234+
oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
235+
RxJavaPlugins.setOnObservableAssembly(
236+
compose(
237+
oldOnObservableAssembly,
238+
observable -> {
239+
if (observable
240+
.getClass()
241+
.getName()
242+
.equals("io.reactivex.internal.operators.observable.ObservableFromCallable")) {
243+
return new TracingCallableObservable(observable, Context.current());
244+
}
245+
return observable;
246+
}));
247+
}
248+
222249
@GuardedBy("TracingAssembly.class")
223250
@SuppressWarnings({"rawtypes", "unchecked"})
224251
private static void enableSingle() {
@@ -274,6 +301,12 @@ private static void disableObservable() {
274301
oldOnObservableSubscribe = null;
275302
}
276303

304+
@GuardedBy("TracingAssembly.class")
305+
private static void disableObservableAssembly() {
306+
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
307+
oldOnObservableAssembly = null;
308+
}
309+
277310
@GuardedBy("TracingAssembly.class")
278311
private static void disableCompletable() {
279312
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.rxjava.v2_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import io.reactivex.Observable;
11+
import io.reactivex.Observer;
12+
import java.lang.reflect.Field;
13+
import java.util.concurrent.Callable;
14+
15+
/** Wraps an ObservableFromCallable to ensure context propagation for the callable execution. */
16+
final class TracingCallableObservable<T> extends Observable<T> {
17+
private final Observable<T> source;
18+
private final Context context;
19+
20+
TracingCallableObservable(Observable<T> source, Context context) {
21+
this.source = source;
22+
this.context = context;
23+
}
24+
25+
@Override
26+
protected void subscribeActual(Observer<? super T> observer) {
27+
try {
28+
Field callableField = source.getClass().getDeclaredField("callable");
29+
callableField.setAccessible(true);
30+
@SuppressWarnings("unchecked")
31+
Callable<T> originalCallable = (Callable<T>) callableField.get(source);
32+
33+
if (originalCallable != null) {
34+
Callable<T> wrappedCallable =
35+
() -> {
36+
try (Scope ignored = context.makeCurrent()) {
37+
return originalCallable.call();
38+
}
39+
};
40+
41+
callableField.set(source, wrappedCallable);
42+
}
43+
} catch (Exception e) {
44+
// If reflection fails, fall back to original behavior
45+
}
46+
47+
source.subscribe(observer);
48+
}
49+
}

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.schedulers.Schedulers;
3131
import java.util.Comparator;
3232
import java.util.List;
33+
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.atomic.AtomicReference;
3536
import java.util.function.Consumer;
@@ -342,7 +343,8 @@ void basicObservable() {
342343
}
343344

344345
@Test
345-
void basicObservableFromCallable() {
346+
void observableFromCallableContextPropagation() throws InterruptedException {
347+
CountDownLatch latch = new CountDownLatch(1);
346348
AtomicReference<String> traceId = new AtomicReference<>();
347349
AtomicReference<String> innerObservableTraceId = new AtomicReference<>();
348350
AtomicReference<String> endObservableTraceId = new AtomicReference<>();
@@ -359,11 +361,14 @@ void basicObservableFromCallable() {
359361
.subscribeOn(Schedulers.io())
360362
.observeOn(Schedulers.single())
361363
.subscribe(
362-
data ->
363-
endObservableTraceId.set(Span.current().getSpanContext().getTraceId()));
364+
data -> {
365+
endObservableTraceId.set(Span.current().getSpanContext().getTraceId());
366+
latch.countDown();
367+
});
364368
assertThat(unused).isNotNull();
365369
});
366370

371+
latch.await();
367372
assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get());
368373
assertThat(endObservableTraceId.get()).isEqualTo(traceId.get());
369374
}

0 commit comments

Comments
 (0)