Skip to content

Commit 71ff0ff

Browse files
committed
support wrapper chaining
1 parent d4d2a40 commit 71ff0ff

File tree

5 files changed

+157
-18
lines changed

5 files changed

+157
-18
lines changed

instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public final class TracingAssembly {
9393
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
9494
oldOnParallelAssembly;
9595

96+
@GuardedBy("TracingAssembly.class")
97+
@Nullable
98+
private static Function<? super Runnable, ? extends Runnable> oldScheduleHandler;
99+
96100
@GuardedBy("TracingAssembly.class")
97101
private static boolean enabled;
98102

@@ -225,14 +229,20 @@ private static void enableObservable() {
225229

226230
@GuardedBy("TracingAssembly.class")
227231
private static void enableObservableAssembly() {
232+
oldScheduleHandler = RxJavaPlugins.getScheduleHandler();
228233
RxJavaPlugins.setScheduleHandler(
229234
runnable -> {
230235
Context context = Context.current();
231-
return () -> {
232-
try (Scope ignored = context.makeCurrent()) {
233-
runnable.run();
234-
}
235-
};
236+
Runnable wrappedRunnable =
237+
() -> {
238+
try (Scope ignored = context.makeCurrent()) {
239+
runnable.run();
240+
}
241+
};
242+
// If there was a previous handler, apply it to our wrapped runnable
243+
return oldScheduleHandler != null
244+
? oldScheduleHandler.apply(wrappedRunnable)
245+
: wrappedRunnable;
236246
});
237247
}
238248

@@ -293,7 +303,8 @@ private static void disableObservable() {
293303

294304
@GuardedBy("TracingAssembly.class")
295305
private static void disableObservableAssembly() {
296-
RxJavaPlugins.setScheduleHandler(null);
306+
RxJavaPlugins.setScheduleHandler(oldScheduleHandler);
307+
oldScheduleHandler = null;
297308
}
298309

299310
@GuardedBy("TracingAssembly.class")

instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
import io.reactivex.Scheduler;
2626
import io.reactivex.Single;
2727
import io.reactivex.disposables.Disposable;
28+
import io.reactivex.functions.Function;
2829
import io.reactivex.internal.operators.flowable.FlowablePublish;
2930
import io.reactivex.internal.operators.observable.ObservablePublish;
31+
import io.reactivex.plugins.RxJavaPlugins;
3032
import io.reactivex.schedulers.Schedulers;
3133
import java.util.Comparator;
3234
import java.util.List;
3335
import java.util.concurrent.CountDownLatch;
3436
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicInteger;
3539
import java.util.concurrent.atomic.AtomicReference;
3640
import java.util.function.Consumer;
3741
import java.util.stream.Stream;
@@ -886,4 +890,53 @@ void maybeMultipleTraceChains(Scheduler scheduler) {
886890
assertions);
887891
testing().clearData();
888892
}
893+
894+
@Test
895+
void scheduleHandlerChainingPreservesExistingHandler() throws InterruptedException {
896+
AtomicInteger customHandlerCallCount = new AtomicInteger(0);
897+
AtomicBoolean customHandlerExecuted = new AtomicBoolean(false);
898+
899+
Function<? super Runnable, ? extends Runnable> originalHandler =
900+
RxJavaPlugins.getScheduleHandler();
901+
Function<Runnable, Runnable> customHandler =
902+
runnable ->
903+
() -> {
904+
customHandlerCallCount.incrementAndGet();
905+
customHandlerExecuted.set(true);
906+
runnable.run();
907+
};
908+
909+
try {
910+
RxJavaPlugins.setScheduleHandler(customHandler);
911+
912+
CountDownLatch latch = new CountDownLatch(1);
913+
AtomicBoolean observableExecuted = new AtomicBoolean(false);
914+
AtomicReference<String> traceId = new AtomicReference<>();
915+
916+
createParentSpan(
917+
() -> {
918+
traceId.set(Span.current().getSpanContext().getTraceId());
919+
Disposable unused =
920+
Observable.fromCallable(
921+
() -> {
922+
observableExecuted.set(true);
923+
return "test";
924+
})
925+
.subscribeOn(Schedulers.io())
926+
.subscribe(result -> latch.countDown());
927+
});
928+
929+
latch.await();
930+
assertThat(observableExecuted.get()).isTrue();
931+
assertThat(customHandlerExecuted.get()).isTrue();
932+
assertThat(customHandlerCallCount.get()).isGreaterThan(0);
933+
934+
assertThat(traceId.get()).isNotNull();
935+
assertThat(traceId.get()).isNotEqualTo("00000000000000000000000000000000");
936+
937+
} finally {
938+
// Restore original handler to avoid affecting other tests
939+
RxJavaPlugins.setScheduleHandler(originalHandler);
940+
}
941+
}
889942
}

instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
import io.reactivex.rxjava3.core.Scheduler;
2626
import io.reactivex.rxjava3.core.Single;
2727
import io.reactivex.rxjava3.disposables.Disposable;
28+
import io.reactivex.rxjava3.functions.Function;
2829
import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish;
2930
import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish;
31+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3032
import io.reactivex.rxjava3.schedulers.Schedulers;
3133
import java.util.Comparator;
3234
import java.util.List;
3335
import java.util.concurrent.CountDownLatch;
3436
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicInteger;
3539
import java.util.concurrent.atomic.AtomicReference;
3640
import java.util.function.Consumer;
3741
import java.util.stream.Stream;
@@ -887,4 +891,53 @@ void maybeMultipleTraceChains(Scheduler scheduler) {
887891
assertions);
888892
testing().clearData();
889893
}
894+
895+
@Test
896+
void scheduleHandlerChainingPreservesExistingHandler() throws InterruptedException {
897+
AtomicInteger customHandlerCallCount = new AtomicInteger(0);
898+
AtomicBoolean customHandlerExecuted = new AtomicBoolean(false);
899+
900+
Function<? super Runnable, ? extends Runnable> originalHandler =
901+
RxJavaPlugins.getScheduleHandler();
902+
Function<Runnable, Runnable> customHandler =
903+
runnable ->
904+
() -> {
905+
customHandlerCallCount.incrementAndGet();
906+
customHandlerExecuted.set(true);
907+
runnable.run();
908+
};
909+
910+
try {
911+
RxJavaPlugins.setScheduleHandler(customHandler);
912+
913+
CountDownLatch latch = new CountDownLatch(1);
914+
AtomicBoolean observableExecuted = new AtomicBoolean(false);
915+
AtomicReference<String> traceId = new AtomicReference<>();
916+
917+
createParentSpan(
918+
() -> {
919+
traceId.set(Span.current().getSpanContext().getTraceId());
920+
Disposable unused =
921+
Observable.fromCallable(
922+
() -> {
923+
observableExecuted.set(true);
924+
return "test";
925+
})
926+
.subscribeOn(Schedulers.io())
927+
.subscribe(result -> latch.countDown());
928+
});
929+
930+
latch.await();
931+
assertThat(observableExecuted.get()).isTrue();
932+
assertThat(customHandlerExecuted.get()).isTrue();
933+
assertThat(customHandlerCallCount.get()).isGreaterThan(0);
934+
935+
assertThat(traceId.get()).isNotNull();
936+
assertThat(traceId.get()).isNotEqualTo("00000000000000000000000000000000");
937+
938+
} finally {
939+
// Restore original handler to avoid affecting other tests
940+
RxJavaPlugins.setScheduleHandler(originalHandler);
941+
}
942+
}
890943
}

instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public final class TracingAssembly {
9797
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
9898
oldOnParallelAssembly;
9999

100+
@GuardedBy("TracingAssembly.class")
101+
@Nullable
102+
private static Function<? super Runnable, ? extends Runnable> oldScheduleHandler;
103+
100104
@GuardedBy("TracingAssembly.class")
101105
private static boolean enabled;
102106

@@ -227,14 +231,20 @@ private static void enableObservable() {
227231

228232
@GuardedBy("TracingAssembly.class")
229233
private static void enableObservableAssembly() {
234+
oldScheduleHandler = RxJavaPlugins.getScheduleHandler();
230235
RxJavaPlugins.setScheduleHandler(
231236
runnable -> {
232237
Context context = Context.current();
233-
return () -> {
234-
try (Scope ignored = context.makeCurrent()) {
235-
runnable.run();
236-
}
237-
};
238+
Runnable wrappedRunnable =
239+
() -> {
240+
try (Scope ignored = context.makeCurrent()) {
241+
runnable.run();
242+
}
243+
};
244+
// If there was a previous handler, apply it to our wrapped runnable
245+
return oldScheduleHandler != null
246+
? oldScheduleHandler.apply(wrappedRunnable)
247+
: wrappedRunnable;
238248
});
239249
}
240250

@@ -295,7 +305,8 @@ private static void disableObservable() {
295305

296306
@GuardedBy("TracingAssembly.class")
297307
private static void disableObservableAssembly() {
298-
RxJavaPlugins.setScheduleHandler(null);
308+
RxJavaPlugins.setScheduleHandler(oldScheduleHandler);
309+
oldScheduleHandler = null;
299310
}
300311

301312
@GuardedBy("TracingAssembly.class")

instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public final class TracingAssembly {
9797
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
9898
oldOnParallelAssembly;
9999

100+
@GuardedBy("TracingAssembly.class")
101+
@Nullable
102+
private static Function<? super Runnable, ? extends Runnable> oldScheduleHandler;
103+
100104
@GuardedBy("TracingAssembly.class")
101105
private static boolean enabled;
102106

@@ -192,14 +196,20 @@ private static void enableCompletable() {
192196

193197
@GuardedBy("TracingAssembly.class")
194198
private static void enableObservableAssembly() {
199+
oldScheduleHandler = RxJavaPlugins.getScheduleHandler();
195200
RxJavaPlugins.setScheduleHandler(
196201
runnable -> {
197202
Context context = Context.current();
198-
return () -> {
199-
try (Scope ignored = context.makeCurrent()) {
200-
runnable.run();
201-
}
202-
};
203+
Runnable wrappedRunnable =
204+
() -> {
205+
try (Scope ignored = context.makeCurrent()) {
206+
runnable.run();
207+
}
208+
};
209+
// If there was a previous handler, apply it to our wrapped runnable
210+
return oldScheduleHandler != null
211+
? oldScheduleHandler.apply(wrappedRunnable)
212+
: wrappedRunnable;
203213
});
204214
}
205215

@@ -295,7 +305,8 @@ private static void disableObservable() {
295305

296306
@GuardedBy("TracingAssembly.class")
297307
private static void disableObservableAssembly() {
298-
RxJavaPlugins.setScheduleHandler(null);
308+
RxJavaPlugins.setScheduleHandler(oldScheduleHandler);
309+
oldScheduleHandler = null;
299310
}
300311

301312
@GuardedBy("TracingAssembly.class")

0 commit comments

Comments
 (0)