Skip to content

Commit 909bd48

Browse files
authored
Fix rxjava context propagation for ObservableFromCallable (#14393)
1 parent 2df0a1e commit 909bd48

File tree

5 files changed

+345
-69
lines changed

5 files changed

+345
-69
lines changed

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public final class TracingAssembly {
9393
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
9494
oldOnParallelAssembly;
9595

96+
@GuardedBy("TracingAssembly.class")
97+
@Nullable
98+
private static Function<? super Runnable, ? extends Runnable> oldScheduleHandler;
99+
96100
@GuardedBy("TracingAssembly.class")
97101
private static boolean enabled;
98102

@@ -118,6 +122,8 @@ public void enable() {
118122

119123
enableObservable();
120124

125+
enableWrappedScheduleHandler();
126+
121127
enableCompletable();
122128

123129
enableSingle();
@@ -142,6 +148,8 @@ public void disable() {
142148

143149
disableObservable();
144150

151+
disableWrappedScheduleHandler();
152+
145153
disableCompletable();
146154

147155
disableSingle();
@@ -219,6 +227,25 @@ private static void enableObservable() {
219227
}
220228
}
221229

230+
@GuardedBy("TracingAssembly.class")
231+
private static void enableWrappedScheduleHandler() {
232+
oldScheduleHandler = RxJavaPlugins.getScheduleHandler();
233+
RxJavaPlugins.setScheduleHandler(
234+
runnable -> {
235+
Context context = Context.current();
236+
Runnable wrappedRunnable =
237+
() -> {
238+
try (Scope ignored = context.makeCurrent()) {
239+
runnable.run();
240+
}
241+
};
242+
// If there was a previous handler, apply it to our wrapped runnable
243+
return oldScheduleHandler != null
244+
? oldScheduleHandler.apply(wrappedRunnable)
245+
: wrappedRunnable;
246+
});
247+
}
248+
222249
@GuardedBy("TracingAssembly.class")
223250
@SuppressWarnings({"rawtypes", "unchecked"})
224251
private static void enableSingle() {
@@ -274,6 +301,12 @@ private static void disableObservable() {
274301
oldOnObservableSubscribe = null;
275302
}
276303

304+
@GuardedBy("TracingAssembly.class")
305+
private static void disableWrappedScheduleHandler() {
306+
RxJavaPlugins.setScheduleHandler(oldScheduleHandler);
307+
oldScheduleHandler = null;
308+
}
309+
277310
@GuardedBy("TracingAssembly.class")
278311
private static void disableCompletable() {
279312
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);

0 commit comments

Comments
 (0)