Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public final class TracingAssembly {
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
oldOnParallelAssembly;

@SuppressWarnings("rawtypes")
@GuardedBy("TracingAssembly.class")
@Nullable
private static Function<? super Observable, ? extends Observable> oldOnObservableAssembly;

@GuardedBy("TracingAssembly.class")
private static boolean enabled;

Expand All @@ -118,6 +123,8 @@ public void enable() {

enableObservable();

enableObservableAssembly();

enableCompletable();

enableSingle();
Expand All @@ -142,6 +149,8 @@ public void disable() {

disableObservable();

disableObservableAssembly();

disableCompletable();

disableSingle();
Expand Down Expand Up @@ -219,6 +228,24 @@ private static void enableObservable() {
}
}

@GuardedBy("TracingAssembly.class")
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableObservableAssembly() {
oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
RxJavaPlugins.setOnObservableAssembly(
compose(
oldOnObservableAssembly,
observable -> {
if (observable
.getClass()
.getName()
.equals("io.reactivex.internal.operators.observable.ObservableFromCallable")) {
return new TracingCallableObservable(observable, Context.current());
}
return observable;
}));
}

@GuardedBy("TracingAssembly.class")
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableSingle() {
Expand Down Expand Up @@ -274,6 +301,12 @@ private static void disableObservable() {
oldOnObservableSubscribe = null;
}

@GuardedBy("TracingAssembly.class")
private static void disableObservableAssembly() {
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
oldOnObservableAssembly = null;
}

@GuardedBy("TracingAssembly.class")
private static void disableCompletable() {
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rxjava.v2_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.Observable;
import io.reactivex.Observer;
import java.lang.reflect.Field;
import java.util.concurrent.Callable;

/** Wraps an ObservableFromCallable to ensure context propagation for the callable execution. */
final class TracingCallableObservable<T> extends Observable<T> {
private final Observable<T> source;
private final Context context;

TracingCallableObservable(Observable<T> source, Context context) {
this.source = source;
this.context = context;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
try {
Field callableField = source.getClass().getDeclaredField("callable");
callableField.setAccessible(true);
@SuppressWarnings("unchecked")
Callable<T> originalCallable = (Callable<T>) callableField.get(source);

if (originalCallable != null) {
Callable<T> wrappedCallable =
() -> {
try (Scope ignored = context.makeCurrent()) {
return originalCallable.call();
}
};

callableField.set(source, wrappedCallable);
}
} catch (Exception e) {
// If reflection fails, fall back to original behavior
}

source.subscribe(observer);
}
}
Loading
Loading