Skip to content

Commit dbd56ae

Browse files
committed
simplify instrumentation, fix other versions of rxjava
1 parent 18d6f4e commit dbd56ae

File tree

5 files changed

+101
-63
lines changed

5 files changed

+101
-63
lines changed

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -229,21 +229,16 @@ private static void enableObservable() {
229229
}
230230

231231
@GuardedBy("TracingAssembly.class")
232-
@SuppressWarnings({"rawtypes", "unchecked"})
233232
private static void enableObservableAssembly() {
234-
oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
235-
RxJavaPlugins.setOnObservableAssembly(
236-
compose(
237-
observable -> {
238-
if (observable
239-
.getClass()
240-
.getName()
241-
.equals("io.reactivex.internal.operators.observable.ObservableFromCallable")) {
242-
return new TracingCallableObservable(observable, Context.current());
243-
}
244-
return observable;
245-
},
246-
oldOnObservableAssembly));
233+
RxJavaPlugins.setScheduleHandler(
234+
runnable -> {
235+
Context context = Context.current();
236+
return () -> {
237+
try (Scope ignored = context.makeCurrent()) {
238+
runnable.run();
239+
}
240+
};
241+
});
247242
}
248243

249244
@GuardedBy("TracingAssembly.class")

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1212

1313
import io.opentelemetry.api.common.AttributeKey;
14+
import io.opentelemetry.api.trace.Span;
1415
import io.opentelemetry.api.trace.SpanKind;
1516
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1617
import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable;
@@ -23,14 +24,18 @@
2324
import io.reactivex.rxjava3.core.Observable;
2425
import io.reactivex.rxjava3.core.Scheduler;
2526
import io.reactivex.rxjava3.core.Single;
27+
import io.reactivex.rxjava3.disposables.Disposable;
2628
import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish;
2729
import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish;
2830
import io.reactivex.rxjava3.schedulers.Schedulers;
2931
import java.util.Comparator;
3032
import java.util.List;
33+
import java.util.concurrent.CountDownLatch;
3134
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicReference;
3236
import java.util.function.Consumer;
3337
import java.util.stream.Stream;
38+
import org.assertj.core.api.Assertions;
3439
import org.junit.jupiter.api.Test;
3540
import org.junit.jupiter.api.TestInstance;
3641
import org.junit.jupiter.params.ParameterizedTest;
@@ -224,6 +229,37 @@ public void twoOperationsFlowable() {
224229
.hasParent(trace.getSpan(0))));
225230
}
226231

232+
@Test
233+
void observableFromCallableContextPropagation() throws InterruptedException {
234+
CountDownLatch latch = new CountDownLatch(1);
235+
AtomicReference<String> traceId = new AtomicReference<>();
236+
AtomicReference<String> innerObservableTraceId = new AtomicReference<>();
237+
AtomicReference<String> endObservableTraceId = new AtomicReference<>();
238+
239+
createParentSpan(
240+
() -> {
241+
traceId.set(Span.current().getSpanContext().getTraceId());
242+
Disposable unused =
243+
Observable.fromCallable(
244+
() -> {
245+
innerObservableTraceId.set(Span.current().getSpanContext().getTraceId());
246+
return "success";
247+
})
248+
.subscribeOn(Schedulers.io())
249+
.observeOn(Schedulers.single())
250+
.subscribe(
251+
data -> {
252+
endObservableTraceId.set(Span.current().getSpanContext().getTraceId());
253+
latch.countDown();
254+
});
255+
assertThat(unused).isNotNull();
256+
});
257+
258+
latch.await();
259+
Assertions.assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get());
260+
Assertions.assertThat(endObservableTraceId.get()).isEqualTo(traceId.get());
261+
}
262+
227263
@Test
228264
public void delayedFlowable() {
229265
List<Integer> result =

instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public final class TracingAssembly {
6666
private static BiFunction<? super Observable, ? super Observer, ? extends Observer>
6767
oldOnObservableSubscribe;
6868

69+
@SuppressWarnings("rawtypes")
70+
@GuardedBy("TracingAssembly.class")
71+
@Nullable
72+
private static Function<? super Observable, ? extends Observable> oldOnObservableAssembly;
73+
6974
@SuppressWarnings("rawtypes")
7075
@GuardedBy("TracingAssembly.class")
7176
@Nullable
@@ -122,6 +127,8 @@ public void enable() {
122127

123128
enableObservable();
124129

130+
enableObservableAssembly();
131+
125132
enableCompletable();
126133

127134
enableSingle();
@@ -146,6 +153,8 @@ public void disable() {
146153

147154
disableObservable();
148155

156+
disableObservableAssembly();
157+
149158
disableCompletable();
150159

151160
disableSingle();
@@ -221,6 +230,19 @@ private static void enableObservable() {
221230
}));
222231
}
223232

233+
@GuardedBy("TracingAssembly.class")
234+
private static void enableObservableAssembly() {
235+
RxJavaPlugins.setScheduleHandler(
236+
runnable -> {
237+
Context context = Context.current();
238+
return () -> {
239+
try (Scope ignored = context.makeCurrent()) {
240+
runnable.run();
241+
}
242+
};
243+
});
244+
}
245+
224246
@GuardedBy("TracingAssembly.class")
225247
@SuppressWarnings({"rawtypes", "unchecked"})
226248
private static void enableSingle() {
@@ -276,6 +298,12 @@ private static void disableObservable() {
276298
oldOnObservableSubscribe = null;
277299
}
278300

301+
@GuardedBy("TracingAssembly.class")
302+
private static void disableObservableAssembly() {
303+
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
304+
oldOnObservableAssembly = null;
305+
}
306+
279307
@GuardedBy("TracingAssembly.class")
280308
private static void disableCompletable() {
281309
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);

instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public final class TracingAssembly {
6666
private static BiFunction<? super Observable, ? super Observer, ? extends Observer>
6767
oldOnObservableSubscribe;
6868

69+
@SuppressWarnings("rawtypes")
70+
@GuardedBy("TracingAssembly.class")
71+
@Nullable
72+
private static Function<? super Observable, ? extends Observable> oldOnObservableAssembly;
73+
6974
@SuppressWarnings("rawtypes")
7075
@GuardedBy("TracingAssembly.class")
7176
@Nullable
@@ -122,6 +127,8 @@ public void enable() {
122127

123128
enableObservable();
124129

130+
enableObservableAssembly();
131+
125132
enableCompletable();
126133

127134
enableSingle();
@@ -146,6 +153,8 @@ public void disable() {
146153

147154
disableObservable();
148155

156+
disableObservableAssembly();
157+
149158
disableCompletable();
150159

151160
disableSingle();
@@ -186,6 +195,19 @@ private static void enableCompletable() {
186195
}));
187196
}
188197

198+
@GuardedBy("TracingAssembly.class")
199+
private static void enableObservableAssembly() {
200+
RxJavaPlugins.setScheduleHandler(
201+
runnable -> {
202+
Context context = Context.current();
203+
return () -> {
204+
try (Scope ignored = context.makeCurrent()) {
205+
runnable.run();
206+
}
207+
};
208+
});
209+
}
210+
189211
@GuardedBy("TracingAssembly.class")
190212
@SuppressWarnings({"rawtypes", "unchecked"})
191213
private static void enableFlowable() {
@@ -276,6 +298,12 @@ private static void disableObservable() {
276298
oldOnObservableSubscribe = null;
277299
}
278300

301+
@GuardedBy("TracingAssembly.class")
302+
private static void disableObservableAssembly() {
303+
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
304+
oldOnObservableAssembly = null;
305+
}
306+
279307
@GuardedBy("TracingAssembly.class")
280308
private static void disableCompletable() {
281309
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);

0 commit comments

Comments
 (0)