Skip to content

Commit fecf7a6

Browse files
committed
rxjava
1 parent beba51e commit fecf7a6

File tree

3 files changed

+116
-1
lines changed

3 files changed

+116
-1
lines changed

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public final class TracingAssembly {
6262
private static BiFunction<? super Observable, ? super Observer, ? extends Observer>
6363
oldOnObservableSubscribe;
6464

65+
@SuppressWarnings("rawtypes")
66+
@GuardedBy("TracingAssembly.class")
67+
@Nullable
68+
static volatile Function<? super Observable, ? extends Observable> oldOnObservableAssembly;
69+
70+
6571
@SuppressWarnings("rawtypes")
6672
@GuardedBy("TracingAssembly.class")
6773
@Nullable
@@ -118,6 +124,8 @@ public void enable() {
118124

119125
enableObservable();
120126

127+
enableObservableAssembly();
128+
121129
enableCompletable();
122130

123131
enableSingle();
@@ -142,6 +150,8 @@ public void disable() {
142150

143151
disableObservable();
144152

153+
disableObservableAssembly();
154+
145155
disableCompletable();
146156

147157
disableSingle();
@@ -219,6 +229,18 @@ private static void enableObservable() {
219229
}
220230
}
221231

232+
@GuardedBy("TracingAssembly.class")
233+
@SuppressWarnings({"rawtypes", "unchecked"})
234+
private static void enableObservableAssembly() {
235+
if (TracingObserver.canEnable()) {
236+
oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
237+
RxJavaPlugins.setOnObservableAssembly(
238+
compose(
239+
oldOnObservableAssembly,
240+
observable -> new TracingObservable(observable, Context.current())));
241+
}
242+
}
243+
222244
@GuardedBy("TracingAssembly.class")
223245
@SuppressWarnings({"rawtypes", "unchecked"})
224246
private static void enableSingle() {
@@ -274,6 +296,12 @@ private static void disableObservable() {
274296
oldOnObservableSubscribe = null;
275297
}
276298

299+
@GuardedBy("TracingAssembly.class")
300+
private static void disableObservableAssembly() {
301+
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
302+
oldOnObservableAssembly = null;
303+
}
304+
277305
@GuardedBy("TracingAssembly.class")
278306
private static void disableCompletable() {
279307
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.rxjava.v2_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import io.reactivex.Observable;
11+
import io.reactivex.Observer;
12+
13+
final class TracingObservable<T> extends Observable<T> {
14+
private final Observable<T> source;
15+
private final Context context;
16+
17+
TracingObservable(Observable<T> source, Context context) {
18+
this.source = source;
19+
this.context = context;
20+
System.out.println("TracingObservable created with context: " + context);
21+
}
22+
23+
@Override
24+
protected void subscribeActual(Observer<? super T> observer) {
25+
System.out.println("TracingObservable.subscribeActual called with context: " + context);
26+
System.out.println("Current context before making current: " + Context.current());
27+
28+
try (Scope ignored = context.makeCurrent()) {
29+
System.out.println("Current context after making current: " + Context.current());
30+
31+
// Don't double-wrap if the observer is already a TracingObserver
32+
if (observer instanceof TracingObserver) {
33+
System.out.println("Observer is already TracingObserver, not wrapping");
34+
source.subscribe(observer);
35+
} else {
36+
System.out.println("Wrapping observer in TracingObserver");
37+
source.subscribe(new TracingObserver<>(observer, context));
38+
}
39+
}
40+
}
41+
}

instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.assertj.core.api.Assertions.assertThat;
1111
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1212

13+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
1314
import io.opentelemetry.api.common.AttributeKey;
1415
import io.opentelemetry.api.trace.SpanKind;
1516
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -23,6 +24,7 @@
2324
import io.reactivex.Observable;
2425
import io.reactivex.Scheduler;
2526
import io.reactivex.Single;
27+
import io.reactivex.disposables.Disposable;
2628
import io.reactivex.internal.operators.flowable.FlowablePublish;
2729
import io.reactivex.internal.operators.observable.ObservablePublish;
2830
import io.reactivex.schedulers.Schedulers;
@@ -338,6 +340,29 @@ public void basicObservable() {
338340
.hasParent(trace.getSpan(0))));
339341
}
340342

343+
344+
@Test
345+
@SuppressWarnings("CanIgnoreReturnValue")
346+
public void basicObservableFromCallable() {
347+
Observable<String> test = createParentSpan(() -> Observable.fromCallable(() -> "success"));
348+
assertThat(test).isNotNull();
349+
350+
// Actually subscribe to trigger the Observable execution
351+
String result = test.blockingFirst();
352+
assertThat(result).isEqualTo("success");
353+
354+
testing()
355+
.waitAndAssertTraces(
356+
trace ->
357+
trace.hasSpansSatisfyingExactly(
358+
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
359+
span ->
360+
span.hasName(ADD_ONE)
361+
.hasKind(SpanKind.INTERNAL)
362+
.hasParent(trace.getSpan(0))));
363+
}
364+
365+
341366
@Test
342367
public void connectableFlowable() {
343368
List<Integer> result =
@@ -779,8 +804,29 @@ public void observableChainHasSubscriptionContext() {
779804
.hasParent(trace.getSpan(0))));
780805
}
781806

782-
@ParameterizedTest
807+
@Test
808+
public void basicObservableFromCallableTest() {
809+
Disposable disposable = Observable.fromCallable(() -> "success")
810+
.onErrorReturnItem("")
811+
.subscribeOn(Schedulers.io())
812+
.observeOn(Schedulers.single())
813+
.subscribe(data -> System.out.print("done"));
814+
815+
assertThat(disposable).isNotNull();
816+
817+
testing()
818+
.waitAndAssertTraces(
819+
trace ->
820+
trace.hasSpansSatisfyingExactly(
821+
span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(),
822+
span ->
823+
span.hasName(ADD_ONE)
824+
.hasKind(SpanKind.INTERNAL)
825+
.hasParent(trace.getSpan(0))));
826+
}
827+
783828
@MethodSource("schedulers")
829+
@ParameterizedTest
784830
public void flowableMultiResults(Scheduler scheduler) {
785831
List<Integer> result =
786832
testing()

0 commit comments

Comments
 (0)