From fecf7a62094da32c93bc3ec56412eecd4b53169c Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Thu, 7 Aug 2025 10:54:13 -0400 Subject: [PATCH 01/12] rxjava --- .../rxjava/v2_0/TracingAssembly.java | 28 +++++++++++ .../rxjava/v2_0/TracingObservable.java | 41 ++++++++++++++++ .../rxjava/v2_0/AbstractRxJava2Test.java | 48 ++++++++++++++++++- 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 30a4292366e3..bf85af554dcb 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -62,6 +62,12 @@ public final class TracingAssembly { private static BiFunction oldOnObservableSubscribe; + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + static volatile Function oldOnObservableAssembly; + + @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable @@ -118,6 +124,8 @@ public void enable() { enableObservable(); + enableObservableAssembly(); + enableCompletable(); enableSingle(); @@ -142,6 +150,8 @@ public void disable() { disableObservable(); + disableObservableAssembly(); + disableCompletable(); disableSingle(); @@ -219,6 +229,18 @@ private static void enableObservable() { } } + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableObservableAssembly() { + if (TracingObserver.canEnable()) { + oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly(); + RxJavaPlugins.setOnObservableAssembly( + compose( + oldOnObservableAssembly, + observable -> new TracingObservable(observable, Context.current()))); + } + } + @GuardedBy("TracingAssembly.class") @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableSingle() { @@ -274,6 +296,12 @@ private static void disableObservable() { oldOnObservableSubscribe = null; } + @GuardedBy("TracingAssembly.class") + private static void disableObservableAssembly() { + RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); + oldOnObservableAssembly = null; + } + @GuardedBy("TracingAssembly.class") private static void disableCompletable() { RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java new file mode 100644 index 000000000000..4868cb88088f --- /dev/null +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava.v2_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.Observable; +import io.reactivex.Observer; + +final class TracingObservable extends Observable { + private final Observable source; + private final Context context; + + TracingObservable(Observable source, Context context) { + this.source = source; + this.context = context; + System.out.println("TracingObservable created with context: " + context); + } + + @Override + protected void subscribeActual(Observer observer) { + System.out.println("TracingObservable.subscribeActual called with context: " + context); + System.out.println("Current context before making current: " + Context.current()); + + try (Scope ignored = context.makeCurrent()) { + System.out.println("Current context after making current: " + Context.current()); + + // Don't double-wrap if the observer is already a TracingObserver + if (observer instanceof TracingObserver) { + System.out.println("Observer is already TracingObserver, not wrapping"); + source.subscribe(observer); + } else { + System.out.println("Wrapping observer in TracingObserver"); + source.subscribe(new TracingObserver<>(observer, context)); + } + } + } +} diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java index 1be2f4112448..48a0de7cfc78 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java @@ -10,6 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; @@ -23,6 +24,7 @@ import io.reactivex.Observable; import io.reactivex.Scheduler; import io.reactivex.Single; +import io.reactivex.disposables.Disposable; import io.reactivex.internal.operators.flowable.FlowablePublish; import io.reactivex.internal.operators.observable.ObservablePublish; import io.reactivex.schedulers.Schedulers; @@ -338,6 +340,29 @@ public void basicObservable() { .hasParent(trace.getSpan(0)))); } + + @Test + @SuppressWarnings("CanIgnoreReturnValue") + public void basicObservableFromCallable() { + Observable test = createParentSpan(() -> Observable.fromCallable(() -> "success")); + assertThat(test).isNotNull(); + + // Actually subscribe to trigger the Observable execution + String result = test.blockingFirst(); + assertThat(result).isEqualTo("success"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(ADD_ONE) + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test public void connectableFlowable() { List result = @@ -779,8 +804,29 @@ public void observableChainHasSubscriptionContext() { .hasParent(trace.getSpan(0)))); } - @ParameterizedTest + @Test + public void basicObservableFromCallableTest() { + Disposable disposable = Observable.fromCallable(() -> "success") + .onErrorReturnItem("") + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.single()) + .subscribe(data -> System.out.print("done")); + + assertThat(disposable).isNotNull(); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(ADD_ONE) + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + @MethodSource("schedulers") + @ParameterizedTest public void flowableMultiResults(Scheduler scheduler) { List result = testing() From 76e6387ccfb77da364326c269a9a9b98ffcb0082 Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Thu, 7 Aug 2025 14:18:35 -0400 Subject: [PATCH 02/12] update test --- .../rxjava/v2_0/TracingAssembly.java | 1 - .../rxjava/v2_0/TracingObservable.java | 10 +-- .../rxjava/v2_0/AbstractRxJava2Test.java | 78 +++++++++---------- 3 files changed, 38 insertions(+), 51 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index bf85af554dcb..7312038c5370 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -67,7 +67,6 @@ public final class TracingAssembly { @Nullable static volatile Function oldOnObservableAssembly; - @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java index 4868cb88088f..f6d1b11d3138 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java @@ -17,23 +17,15 @@ final class TracingObservable extends Observable { TracingObservable(Observable source, Context context) { this.source = source; this.context = context; - System.out.println("TracingObservable created with context: " + context); } @Override protected void subscribeActual(Observer observer) { - System.out.println("TracingObservable.subscribeActual called with context: " + context); - System.out.println("Current context before making current: " + Context.current()); - try (Scope ignored = context.makeCurrent()) { - System.out.println("Current context after making current: " + Context.current()); - - // Don't double-wrap if the observer is already a TracingObserver + // Don't double-wrap if already a TracingObserver if (observer instanceof TracingObserver) { - System.out.println("Observer is already TracingObserver, not wrapping"); source.subscribe(observer); } else { - System.out.println("Wrapping observer in TracingObserver"); source.subscribe(new TracingObserver<>(observer, context)); } } diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java index 48a0de7cfc78..697aee668963 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java @@ -10,8 +10,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable; @@ -30,7 +30,9 @@ import io.reactivex.schedulers.Schedulers; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -340,29 +342,44 @@ public void basicObservable() { .hasParent(trace.getSpan(0)))); } - @Test - @SuppressWarnings("CanIgnoreReturnValue") - public void basicObservableFromCallable() { - Observable test = createParentSpan(() -> Observable.fromCallable(() -> "success")); - assertThat(test).isNotNull(); - - // Actually subscribe to trigger the Observable execution - String result = test.blockingFirst(); - assertThat(result).isEqualTo("success"); - + public void basicObservableFromCallable() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference errorRef = new AtomicReference<>(); testing() - .waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName(ADD_ONE) - .hasKind(SpanKind.INTERNAL) - .hasParent(trace.getSpan(0)))); + .runWithSpan( + "parent", + () -> { + String traceId = Span.current().getSpanContext().getTraceId(); + + Disposable result = + Observable.fromCallable( + () -> { + assertThat(traceId) + .isEqualTo(Span.current().getSpanContext().getTraceId()); + return "success"; + }) + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.single()) + .doOnNext( + data -> + assertThat(traceId) + .isEqualTo(Span.current().getSpanContext().getTraceId())) + .subscribe( + data -> countDownLatch.countDown(), + error -> { + errorRef.set(error); + countDownLatch.countDown(); + }); + assertThat(result).isNotNull(); + }); + countDownLatch.await(); + if (errorRef.get() != null) { + throw new AssertionError("Assertion failed in observable thread", errorRef.get()); + } + testing().waitForTraces(1); } - @Test public void connectableFlowable() { List result = @@ -804,27 +821,6 @@ public void observableChainHasSubscriptionContext() { .hasParent(trace.getSpan(0)))); } - @Test - public void basicObservableFromCallableTest() { - Disposable disposable = Observable.fromCallable(() -> "success") - .onErrorReturnItem("") - .subscribeOn(Schedulers.io()) - .observeOn(Schedulers.single()) - .subscribe(data -> System.out.print("done")); - - assertThat(disposable).isNotNull(); - - testing() - .waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName(ADD_ONE) - .hasKind(SpanKind.INTERNAL) - .hasParent(trace.getSpan(0)))); - } - @MethodSource("schedulers") @ParameterizedTest public void flowableMultiResults(Scheduler scheduler) { From b7d0890f94cfa6df412e06ade173eba887c4e616 Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Thu, 7 Aug 2025 14:56:05 -0400 Subject: [PATCH 03/12] undo --- .../rxjava/v2_0/TracingAssembly.java | 27 ---- .../rxjava/v2_0/AbstractRxJava2Test.java | 132 ++++++++---------- 2 files changed, 60 insertions(+), 99 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 7312038c5370..30a4292366e3 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -62,11 +62,6 @@ public final class TracingAssembly { private static BiFunction oldOnObservableSubscribe; - @SuppressWarnings("rawtypes") - @GuardedBy("TracingAssembly.class") - @Nullable - static volatile Function oldOnObservableAssembly; - @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable @@ -123,8 +118,6 @@ public void enable() { enableObservable(); - enableObservableAssembly(); - enableCompletable(); enableSingle(); @@ -149,8 +142,6 @@ public void disable() { disableObservable(); - disableObservableAssembly(); - disableCompletable(); disableSingle(); @@ -228,18 +219,6 @@ private static void enableObservable() { } } - @GuardedBy("TracingAssembly.class") - @SuppressWarnings({"rawtypes", "unchecked"}) - private static void enableObservableAssembly() { - if (TracingObserver.canEnable()) { - oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly(); - RxJavaPlugins.setOnObservableAssembly( - compose( - oldOnObservableAssembly, - observable -> new TracingObservable(observable, Context.current()))); - } - } - @GuardedBy("TracingAssembly.class") @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableSingle() { @@ -295,12 +274,6 @@ private static void disableObservable() { oldOnObservableSubscribe = null; } - @GuardedBy("TracingAssembly.class") - private static void disableObservableAssembly() { - RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); - oldOnObservableAssembly = null; - } - @GuardedBy("TracingAssembly.class") private static void disableCompletable() { RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java index 697aee668963..a0199b16db6d 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java @@ -30,7 +30,6 @@ import io.reactivex.schedulers.Schedulers; import java.util.Comparator; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -95,7 +94,7 @@ public void onComplete() {} } @Test - public void basicMaybe() { + void basicMaybe() { int result = createParentSpan(() -> Maybe.just(1).map(this::addOne).blockingGet()); assertThat(result).isEqualTo(2); testing() @@ -110,7 +109,7 @@ public void basicMaybe() { } @Test - public void twoOperationsMaybe() { + void twoOperationsMaybe() { int result = createParentSpan(() -> Maybe.just(2).map(this::addOne).map(this::addOne).blockingGet()); assertThat(result).isEqualTo(4); @@ -130,7 +129,7 @@ public void twoOperationsMaybe() { } @Test - public void delayedMaybe() { + void delayedMaybe() { int result = createParentSpan( () -> Maybe.just(3).delay(100, TimeUnit.MILLISECONDS).map(this::addOne).blockingGet()); @@ -147,7 +146,7 @@ public void delayedMaybe() { } @Test - public void delayedTwiceMaybe() { + void delayedTwiceMaybe() { int result = createParentSpan( () -> @@ -174,7 +173,7 @@ public void delayedTwiceMaybe() { } @Test - public void basicFlowable() { + void basicFlowable() { Iterable result = createParentSpan( () -> Flowable.fromIterable(asList(5, 6)).map(this::addOne).toList().blockingGet()); @@ -195,7 +194,7 @@ public void basicFlowable() { } @Test - public void twoOperationsFlowable() { + void twoOperationsFlowable() { List result = createParentSpan( () -> @@ -229,7 +228,7 @@ public void twoOperationsFlowable() { } @Test - public void delayedFlowable() { + void delayedFlowable() { List result = createParentSpan( () -> @@ -255,7 +254,7 @@ public void delayedFlowable() { } @Test - public void delayedTwiceFlowable() { + void delayedTwiceFlowable() { List result = createParentSpan( () -> @@ -291,7 +290,7 @@ public void delayedTwiceFlowable() { } @Test - public void maybeFromCallable() { + void maybeFromCallable() { Integer result = createParentSpan( () -> Maybe.fromCallable(() -> addOne(10)).map(this::addOne).blockingGet()); @@ -312,7 +311,7 @@ public void maybeFromCallable() { } @Test - public void basicSingle() { + void basicSingle() { Integer result = createParentSpan(() -> Single.just(0).map(this::addOne).blockingGet()); assertThat(result).isEqualTo(1); testing() @@ -327,7 +326,7 @@ public void basicSingle() { } @Test - public void basicObservable() { + void basicObservable() { List result = createParentSpan(() -> Observable.just(0).map(this::addOne).toList().blockingGet()); assertThat(result).contains(1); @@ -343,45 +342,34 @@ public void basicObservable() { } @Test - public void basicObservableFromCallable() throws InterruptedException { - CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference errorRef = new AtomicReference<>(); - testing() - .runWithSpan( - "parent", - () -> { - String traceId = Span.current().getSpanContext().getTraceId(); - - Disposable result = - Observable.fromCallable( - () -> { - assertThat(traceId) - .isEqualTo(Span.current().getSpanContext().getTraceId()); - return "success"; - }) - .subscribeOn(Schedulers.io()) - .observeOn(Schedulers.single()) - .doOnNext( - data -> - assertThat(traceId) - .isEqualTo(Span.current().getSpanContext().getTraceId())) - .subscribe( - data -> countDownLatch.countDown(), - error -> { - errorRef.set(error); - countDownLatch.countDown(); - }); - assertThat(result).isNotNull(); - }); - countDownLatch.await(); - if (errorRef.get() != null) { - throw new AssertionError("Assertion failed in observable thread", errorRef.get()); - } - testing().waitForTraces(1); - } + void basicObservableFromCallable() { + AtomicReference traceId = new AtomicReference<>(); + AtomicReference innerObservableTraceId = new AtomicReference<>(); + AtomicReference endObservableTraceId = new AtomicReference<>(); - @Test - public void connectableFlowable() { + createParentSpan( + () -> { + traceId.set(Span.current().getSpanContext().getTraceId()); + Disposable unused = + Observable.fromCallable( + () -> { + innerObservableTraceId.set(Span.current().getSpanContext().getTraceId()); + return "success"; + }) + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.single()) + .subscribe( + data -> + endObservableTraceId.set(Span.current().getSpanContext().getTraceId())); + assertThat(unused).isNotNull(); + }); + + assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get()); + assertThat(endObservableTraceId.get()).isEqualTo(traceId.get()); + } + + @Test + void connectableFlowable() { List result = createParentSpan( () -> @@ -403,7 +391,7 @@ public void connectableFlowable() { } @Test - public void connectableObservable() { + void connectableObservable() { List result = createParentSpan( () -> @@ -425,7 +413,7 @@ public void connectableObservable() { } @Test - public void maybeError() { + void maybeError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Maybe.error(error).blockingGet())) .isEqualTo(error); @@ -437,7 +425,7 @@ public void maybeError() { } @Test - public void flowableError() { + void flowableError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Flowable.error(error)).toList().blockingGet()) .isEqualTo(error); @@ -449,7 +437,7 @@ public void flowableError() { } @Test - public void singleError() { + void singleError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Single.error(error)).blockingGet()) .isEqualTo(error); @@ -461,7 +449,7 @@ public void singleError() { } @Test - public void observableError() { + void observableError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Observable.error(error).toList().blockingGet())) .isEqualTo(error); @@ -473,7 +461,7 @@ public void observableError() { } @Test - public void completableError() { + void completableError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy( () -> createParentSpan(() -> Completable.error(error).toMaybe().blockingGet())) @@ -486,7 +474,7 @@ public void completableError() { } @Test - public void basicMaybeFailure() { + void basicMaybeFailure() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy( () -> @@ -512,7 +500,7 @@ public void basicMaybeFailure() { } @Test - public void basicFlowableFailure() { + void basicFlowableFailure() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy( () -> @@ -539,7 +527,7 @@ public void basicFlowableFailure() { } @Test - public void basicMaybeCancel() { + void basicMaybeCancel() { createParentSpan( () -> Maybe.just(1).toFlowable().map(this::addOne).subscribe(CancellingSubscriber.INSTANCE)); @@ -551,7 +539,7 @@ public void basicMaybeCancel() { } @Test - public void basicFlowableCancel() { + void basicFlowableCancel() { createParentSpan( () -> Flowable.fromIterable(asList(5, 6)) @@ -565,7 +553,7 @@ public void basicFlowableCancel() { } @Test - public void basicSingleCancel() { + void basicSingleCancel() { createParentSpan( () -> Single.just(1).toFlowable().map(this::addOne).subscribe(CancellingSubscriber.INSTANCE)); @@ -577,7 +565,7 @@ public void basicSingleCancel() { } @Test - public void basicCompletableCancel() { + void basicCompletableCancel() { createParentSpan( () -> Completable.fromCallable(() -> 1) @@ -591,7 +579,7 @@ public void basicCompletableCancel() { } @Test - public void basicObservableCancel() { + void basicObservableCancel() { createParentSpan( () -> Observable.just(1) @@ -606,7 +594,7 @@ public void basicObservableCancel() { } @Test - public void basicMaybeChain() { + void basicMaybeChain() { createParentSpan( () -> Maybe.just(1) @@ -635,7 +623,7 @@ public void basicMaybeChain() { } @Test - public void basicFlowableChain() { + void basicFlowableChain() { createParentSpan( () -> Flowable.fromIterable(asList(5, 6)) @@ -673,7 +661,7 @@ public void basicFlowableChain() { // Publisher chain spans have the correct parents from subscription time @Test - public void maybeChainParentSpan() { + void maybeChainParentSpan() { Maybe maybe = Maybe.just(42).map(this::addOne).map(this::addTwo); testing().runWithSpan("trace-parent", () -> maybe.blockingGet()); testing() @@ -692,7 +680,7 @@ public void maybeChainParentSpan() { } @Test - public void maybeChainHasSubscriptionContext() { + void maybeChainHasSubscriptionContext() { Integer result = createParentSpan( () -> { @@ -722,7 +710,7 @@ public void maybeChainHasSubscriptionContext() { } @Test - public void flowableChainHasSubscriptionContext() { + void flowableChainHasSubscriptionContext() { List result = createParentSpan( () -> { @@ -761,7 +749,7 @@ public void flowableChainHasSubscriptionContext() { } @Test - public void singleChainHasSubscriptionContext() { + void singleChainHasSubscriptionContext() { Integer result = createParentSpan( () -> { @@ -791,7 +779,7 @@ public void singleChainHasSubscriptionContext() { } @Test - public void observableChainHasSubscriptionContext() { + void observableChainHasSubscriptionContext() { List result = createParentSpan( () -> { @@ -823,7 +811,7 @@ public void observableChainHasSubscriptionContext() { @MethodSource("schedulers") @ParameterizedTest - public void flowableMultiResults(Scheduler scheduler) { + void flowableMultiResults(Scheduler scheduler) { List result = testing() .runWithSpan( @@ -863,7 +851,7 @@ public void flowableMultiResults(Scheduler scheduler) { @ParameterizedTest @MethodSource("schedulers") - public void maybeMultipleTraceChains(Scheduler scheduler) { + void maybeMultipleTraceChains(Scheduler scheduler) { int iterations = 100; RxJava2ConcurrencyTestHelper.launchAndWait(scheduler, iterations, 60000, testing()); @SuppressWarnings("unchecked") From a902c9e236bd13758136c73c1ccbaf04fafdfb0e Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Thu, 7 Aug 2025 16:08:47 -0400 Subject: [PATCH 04/12] check to make sure class is ObservableFromCallable --- .../rxjava/v2_0/TracingAssembly.java | 33 +++++++++++++ .../v2_0/TracingCallableObservable.java | 49 +++++++++++++++++++ .../rxjava/v2_0/TracingObservable.java | 33 ------------- .../rxjava/v2_0/AbstractRxJava2Test.java | 11 +++-- 4 files changed, 90 insertions(+), 36 deletions(-) create mode 100644 instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java delete mode 100644 instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 30a4292366e3..9feadd5c5441 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -93,6 +93,11 @@ public final class TracingAssembly { private static Function oldOnParallelAssembly; + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function oldOnObservableAssembly; + @GuardedBy("TracingAssembly.class") private static boolean enabled; @@ -118,6 +123,8 @@ public void enable() { enableObservable(); + enableObservableAssembly(); + enableCompletable(); enableSingle(); @@ -142,6 +149,8 @@ public void disable() { disableObservable(); + disableObservableAssembly(); + disableCompletable(); disableSingle(); @@ -219,6 +228,24 @@ private static void enableObservable() { } } + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableObservableAssembly() { + oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly(); + RxJavaPlugins.setOnObservableAssembly( + compose( + oldOnObservableAssembly, + observable -> { + if (observable + .getClass() + .getName() + .equals("io.reactivex.internal.operators.observable.ObservableFromCallable")) { + return new TracingCallableObservable(observable, Context.current()); + } + return observable; + })); + } + @GuardedBy("TracingAssembly.class") @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableSingle() { @@ -274,6 +301,12 @@ private static void disableObservable() { oldOnObservableSubscribe = null; } + @GuardedBy("TracingAssembly.class") + private static void disableObservableAssembly() { + RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); + oldOnObservableAssembly = null; + } + @GuardedBy("TracingAssembly.class") private static void disableCompletable() { RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java new file mode 100644 index 000000000000..e783c98bfe6b --- /dev/null +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava.v2_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.Observable; +import io.reactivex.Observer; +import java.lang.reflect.Field; +import java.util.concurrent.Callable; + +/** Wraps an ObservableFromCallable to ensure context propagation for the callable execution. */ +final class TracingCallableObservable extends Observable { + private final Observable source; + private final Context context; + + TracingCallableObservable(Observable source, Context context) { + this.source = source; + this.context = context; + } + + @Override + protected void subscribeActual(Observer observer) { + try { + Field callableField = source.getClass().getDeclaredField("callable"); + callableField.setAccessible(true); + @SuppressWarnings("unchecked") + Callable originalCallable = (Callable) callableField.get(source); + + if (originalCallable != null) { + Callable wrappedCallable = + () -> { + try (Scope ignored = context.makeCurrent()) { + return originalCallable.call(); + } + }; + + callableField.set(source, wrappedCallable); + } + } catch (Exception e) { + // If reflection fails, fall back to original behavior + } + + source.subscribe(observer); + } +} diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java deleted file mode 100644 index f6d1b11d3138..000000000000 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingObservable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rxjava.v2_0; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.reactivex.Observable; -import io.reactivex.Observer; - -final class TracingObservable extends Observable { - private final Observable source; - private final Context context; - - TracingObservable(Observable source, Context context) { - this.source = source; - this.context = context; - } - - @Override - protected void subscribeActual(Observer observer) { - try (Scope ignored = context.makeCurrent()) { - // Don't double-wrap if already a TracingObserver - if (observer instanceof TracingObserver) { - source.subscribe(observer); - } else { - source.subscribe(new TracingObserver<>(observer, context)); - } - } - } -} diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java index a0199b16db6d..2c2cc13474fd 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java @@ -30,6 +30,7 @@ import io.reactivex.schedulers.Schedulers; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -342,7 +343,8 @@ void basicObservable() { } @Test - void basicObservableFromCallable() { + void observableFromCallableContextPropagation() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); AtomicReference traceId = new AtomicReference<>(); AtomicReference innerObservableTraceId = new AtomicReference<>(); AtomicReference endObservableTraceId = new AtomicReference<>(); @@ -359,11 +361,14 @@ void basicObservableFromCallable() { .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe( - data -> - endObservableTraceId.set(Span.current().getSpanContext().getTraceId())); + data -> { + endObservableTraceId.set(Span.current().getSpanContext().getTraceId()); + latch.countDown(); + }); assertThat(unused).isNotNull(); }); + latch.await(); assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get()); assertThat(endObservableTraceId.get()).isEqualTo(traceId.get()); } From 18d6f4e3352c57e475c66a9254952077ba9a5345 Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 11 Aug 2025 09:30:25 -0400 Subject: [PATCH 05/12] reorder compose arguments --- .../instrumentation/rxjava/v2_0/TracingAssembly.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 9feadd5c5441..2388b0fa57c5 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -234,7 +234,6 @@ private static void enableObservableAssembly() { oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly(); RxJavaPlugins.setOnObservableAssembly( compose( - oldOnObservableAssembly, observable -> { if (observable .getClass() @@ -243,7 +242,8 @@ private static void enableObservableAssembly() { return new TracingCallableObservable(observable, Context.current()); } return observable; - })); + }, + oldOnObservableAssembly)); } @GuardedBy("TracingAssembly.class") @@ -345,6 +345,9 @@ private static void disableWithSpanStrategy() { if (before == null) { return after; } + if (after == null) { + return before; + } return (T v) -> after.apply(before.apply(v)); } From dbd56ae35c6ff6e9196edc748770c5a6f726ac08 Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 11 Aug 2025 09:53:17 -0400 Subject: [PATCH 06/12] simplify instrumentation, fix other versions of rxjava --- .../rxjava/v2_0/TracingAssembly.java | 23 ++++----- .../v2_0/TracingCallableObservable.java | 49 ------------------- .../rxjava/v3/common/AbstractRxJava3Test.java | 36 ++++++++++++++ .../rxjava/v3_0/TracingAssembly.java | 28 +++++++++++ .../rxjava/v3_1_1/TracingAssembly.java | 28 +++++++++++ 5 files changed, 101 insertions(+), 63 deletions(-) delete mode 100644 instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 2388b0fa57c5..e058def2ad2a 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -229,21 +229,16 @@ private static void enableObservable() { } @GuardedBy("TracingAssembly.class") - @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableObservableAssembly() { - oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly(); - RxJavaPlugins.setOnObservableAssembly( - compose( - observable -> { - if (observable - .getClass() - .getName() - .equals("io.reactivex.internal.operators.observable.ObservableFromCallable")) { - return new TracingCallableObservable(observable, Context.current()); - } - return observable; - }, - oldOnObservableAssembly)); + RxJavaPlugins.setScheduleHandler( + runnable -> { + Context context = Context.current(); + return () -> { + try (Scope ignored = context.makeCurrent()) { + runnable.run(); + } + }; + }); } @GuardedBy("TracingAssembly.class") diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java deleted file mode 100644 index e783c98bfe6b..000000000000 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingCallableObservable.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rxjava.v2_0; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.reactivex.Observable; -import io.reactivex.Observer; -import java.lang.reflect.Field; -import java.util.concurrent.Callable; - -/** Wraps an ObservableFromCallable to ensure context propagation for the callable execution. */ -final class TracingCallableObservable extends Observable { - private final Observable source; - private final Context context; - - TracingCallableObservable(Observable source, Context context) { - this.source = source; - this.context = context; - } - - @Override - protected void subscribeActual(Observer observer) { - try { - Field callableField = source.getClass().getDeclaredField("callable"); - callableField.setAccessible(true); - @SuppressWarnings("unchecked") - Callable originalCallable = (Callable) callableField.get(source); - - if (originalCallable != null) { - Callable wrappedCallable = - () -> { - try (Scope ignored = context.makeCurrent()) { - return originalCallable.call(); - } - }; - - callableField.set(source, wrappedCallable); - } - } catch (Exception e) { - // If reflection fails, fall back to original behavior - } - - source.subscribe(observer); - } -} diff --git a/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java index 36872f47b5f1..c44a2041b4c0 100644 --- a/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java +++ b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java @@ -11,6 +11,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable; @@ -23,14 +24,18 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish; import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.params.ParameterizedTest; @@ -224,6 +229,37 @@ public void twoOperationsFlowable() { .hasParent(trace.getSpan(0)))); } + @Test + void observableFromCallableContextPropagation() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference traceId = new AtomicReference<>(); + AtomicReference innerObservableTraceId = new AtomicReference<>(); + AtomicReference endObservableTraceId = new AtomicReference<>(); + + createParentSpan( + () -> { + traceId.set(Span.current().getSpanContext().getTraceId()); + Disposable unused = + Observable.fromCallable( + () -> { + innerObservableTraceId.set(Span.current().getSpanContext().getTraceId()); + return "success"; + }) + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.single()) + .subscribe( + data -> { + endObservableTraceId.set(Span.current().getSpanContext().getTraceId()); + latch.countDown(); + }); + assertThat(unused).isNotNull(); + }); + + latch.await(); + Assertions.assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get()); + Assertions.assertThat(endObservableTraceId.get()).isEqualTo(traceId.get()); + } + @Test public void delayedFlowable() { List result = diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java index 191a2ebf9b1f..ac98ebdf72ba 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java @@ -66,6 +66,11 @@ public final class TracingAssembly { private static BiFunction oldOnObservableSubscribe; + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function oldOnObservableAssembly; + @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable @@ -122,6 +127,8 @@ public void enable() { enableObservable(); + enableObservableAssembly(); + enableCompletable(); enableSingle(); @@ -146,6 +153,8 @@ public void disable() { disableObservable(); + disableObservableAssembly(); + disableCompletable(); disableSingle(); @@ -221,6 +230,19 @@ private static void enableObservable() { })); } + @GuardedBy("TracingAssembly.class") + private static void enableObservableAssembly() { + RxJavaPlugins.setScheduleHandler( + runnable -> { + Context context = Context.current(); + return () -> { + try (Scope ignored = context.makeCurrent()) { + runnable.run(); + } + }; + }); + } + @GuardedBy("TracingAssembly.class") @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableSingle() { @@ -276,6 +298,12 @@ private static void disableObservable() { oldOnObservableSubscribe = null; } + @GuardedBy("TracingAssembly.class") + private static void disableObservableAssembly() { + RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); + oldOnObservableAssembly = null; + } + @GuardedBy("TracingAssembly.class") private static void disableCompletable() { RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java index 10726a499728..14aec1f9656f 100644 --- a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java @@ -66,6 +66,11 @@ public final class TracingAssembly { private static BiFunction oldOnObservableSubscribe; + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function oldOnObservableAssembly; + @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable @@ -122,6 +127,8 @@ public void enable() { enableObservable(); + enableObservableAssembly(); + enableCompletable(); enableSingle(); @@ -146,6 +153,8 @@ public void disable() { disableObservable(); + disableObservableAssembly(); + disableCompletable(); disableSingle(); @@ -186,6 +195,19 @@ private static void enableCompletable() { })); } + @GuardedBy("TracingAssembly.class") + private static void enableObservableAssembly() { + RxJavaPlugins.setScheduleHandler( + runnable -> { + Context context = Context.current(); + return () -> { + try (Scope ignored = context.makeCurrent()) { + runnable.run(); + } + }; + }); + } + @GuardedBy("TracingAssembly.class") @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableFlowable() { @@ -276,6 +298,12 @@ private static void disableObservable() { oldOnObservableSubscribe = null; } + @GuardedBy("TracingAssembly.class") + private static void disableObservableAssembly() { + RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); + oldOnObservableAssembly = null; + } + @GuardedBy("TracingAssembly.class") private static void disableCompletable() { RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); From ef7d69089703a864e726146e658385b90491e8ba Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 11 Aug 2025 09:54:18 -0400 Subject: [PATCH 07/12] revert compose change --- .../instrumentation/rxjava/v2_0/TracingAssembly.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index e058def2ad2a..9befbdcd1c85 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -340,9 +340,6 @@ private static void disableWithSpanStrategy() { if (before == null) { return after; } - if (after == null) { - return before; - } return (T v) -> after.apply(before.apply(v)); } From 9de8b90740b90c58ab49b5dc467f5a83ef2f048d Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 11 Aug 2025 10:02:56 -0400 Subject: [PATCH 08/12] fix assertions import --- .../rxjava/v3/common/AbstractRxJava3Test.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java index c44a2041b4c0..78426dd42929 100644 --- a/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java +++ b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.params.ParameterizedTest; @@ -96,7 +95,7 @@ public void onComplete() {} } @Test - public void basicMaybe() { + void basicMaybe() { int result = createParentSpan(() -> Maybe.just(1).map(this::addOne).blockingGet()); assertThat(result).isEqualTo(2); testing() @@ -111,7 +110,7 @@ public void basicMaybe() { } @Test - public void twoOperationsMaybe() { + void twoOperationsMaybe() { int result = createParentSpan(() -> Maybe.just(2).map(this::addOne).map(this::addOne).blockingGet()); assertThat(result).isEqualTo(4); @@ -131,7 +130,7 @@ public void twoOperationsMaybe() { } @Test - public void delayedMaybe() { + void delayedMaybe() { int result = createParentSpan( () -> Maybe.just(3).delay(100, TimeUnit.MILLISECONDS).map(this::addOne).blockingGet()); @@ -148,7 +147,7 @@ public void delayedMaybe() { } @Test - public void delayedTwiceMaybe() { + void delayedTwiceMaybe() { int result = createParentSpan( () -> @@ -175,7 +174,7 @@ public void delayedTwiceMaybe() { } @Test - public void basicFlowable() { + void basicFlowable() { Iterable result = createParentSpan( () -> Flowable.fromIterable(asList(5, 6)).map(this::addOne).toList().blockingGet()); @@ -196,7 +195,7 @@ public void basicFlowable() { } @Test - public void twoOperationsFlowable() { + void twoOperationsFlowable() { List result = createParentSpan( () -> @@ -251,17 +250,18 @@ void observableFromCallableContextPropagation() throws InterruptedException { data -> { endObservableTraceId.set(Span.current().getSpanContext().getTraceId()); latch.countDown(); + latch.countDown(); }); assertThat(unused).isNotNull(); }); latch.await(); - Assertions.assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get()); - Assertions.assertThat(endObservableTraceId.get()).isEqualTo(traceId.get()); + assertThat(innerObservableTraceId.get()).isEqualTo(traceId.get()); + assertThat(endObservableTraceId.get()).isEqualTo(traceId.get()); } @Test - public void delayedFlowable() { + void delayedFlowable() { List result = createParentSpan( () -> @@ -287,7 +287,7 @@ public void delayedFlowable() { } @Test - public void delayedTwiceFlowable() { + void delayedTwiceFlowable() { List result = createParentSpan( () -> @@ -323,7 +323,7 @@ public void delayedTwiceFlowable() { } @Test - public void maybeFromCallable() { + void maybeFromCallable() { Integer result = createParentSpan( () -> Maybe.fromCallable(() -> addOne(10)).map(this::addOne).blockingGet()); @@ -344,7 +344,7 @@ public void maybeFromCallable() { } @Test - public void basicSingle() { + void basicSingle() { Integer result = createParentSpan(() -> Single.just(0).map(this::addOne).blockingGet()); assertThat(result).isEqualTo(1); testing() @@ -359,7 +359,7 @@ public void basicSingle() { } @Test - public void basicObservable() { + void basicObservable() { List result = createParentSpan(() -> Observable.just(0).map(this::addOne).toList().blockingGet()); assertThat(result).contains(1); @@ -375,7 +375,7 @@ public void basicObservable() { } @Test - public void connectableFlowable() { + void connectableFlowable() { List result = createParentSpan( () -> @@ -397,7 +397,7 @@ public void connectableFlowable() { } @Test - public void connectableObservable() { + void connectableObservable() { List result = createParentSpan( () -> @@ -419,7 +419,7 @@ public void connectableObservable() { } @Test - public void maybeError() { + void maybeError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Maybe.error(error).blockingGet())) .isEqualTo(error); @@ -431,7 +431,7 @@ public void maybeError() { } @Test - public void flowableError() { + void flowableError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Flowable.error(error)).toList().blockingGet()) .isEqualTo(error); @@ -443,7 +443,7 @@ public void flowableError() { } @Test - public void singleError() { + void singleError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Single.error(error)).blockingGet()) .isEqualTo(error); @@ -455,7 +455,7 @@ public void singleError() { } @Test - public void observableError() { + void observableError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy(() -> createParentSpan(() -> Observable.error(error).toList().blockingGet())) .isEqualTo(error); @@ -467,7 +467,7 @@ public void observableError() { } @Test - public void completableError() { + void completableError() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy( () -> createParentSpan(() -> Completable.error(error).toMaybe().blockingGet())) @@ -480,7 +480,7 @@ public void completableError() { } @Test - public void basicMaybeFailure() { + void basicMaybeFailure() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy( () -> @@ -506,7 +506,7 @@ public void basicMaybeFailure() { } @Test - public void basicFlowableFailure() { + void basicFlowableFailure() { IllegalStateException error = new IllegalStateException(EXCEPTION_MESSAGE); assertThatThrownBy( () -> @@ -533,7 +533,7 @@ public void basicFlowableFailure() { } @Test - public void basicMaybeCancel() { + void basicMaybeCancel() { createParentSpan( () -> Maybe.just(1).toFlowable().map(this::addOne).subscribe(CancellingSubscriber.INSTANCE)); @@ -545,7 +545,7 @@ public void basicMaybeCancel() { } @Test - public void basicFlowableCancel() { + void basicFlowableCancel() { createParentSpan( () -> Flowable.fromIterable(asList(5, 6)) @@ -559,7 +559,7 @@ public void basicFlowableCancel() { } @Test - public void basicSingleCancel() { + void basicSingleCancel() { createParentSpan( () -> Single.just(1).toFlowable().map(this::addOne).subscribe(CancellingSubscriber.INSTANCE)); @@ -571,7 +571,7 @@ public void basicSingleCancel() { } @Test - public void basicCompletableCancel() { + void basicCompletableCancel() { createParentSpan( () -> Completable.fromCallable(() -> 1) @@ -585,7 +585,7 @@ public void basicCompletableCancel() { } @Test - public void basicObservableCancel() { + void basicObservableCancel() { createParentSpan( () -> Observable.just(1) @@ -600,7 +600,7 @@ public void basicObservableCancel() { } @Test - public void basicMaybeChain() { + void basicMaybeChain() { createParentSpan( () -> Maybe.just(1) @@ -629,7 +629,7 @@ public void basicMaybeChain() { } @Test - public void basicFlowableChain() { + void basicFlowableChain() { createParentSpan( () -> Flowable.fromIterable(asList(5, 6)) @@ -667,7 +667,7 @@ public void basicFlowableChain() { // Publisher chain spans have the correct parents from subscription time @Test - public void maybeChainParentSpan() { + void maybeChainParentSpan() { Maybe maybe = Maybe.just(42).map(this::addOne).map(this::addTwo); testing().runWithSpan("trace-parent", () -> maybe.blockingGet()); testing() @@ -686,7 +686,7 @@ public void maybeChainParentSpan() { } @Test - public void maybeChainHasSubscriptionContext() { + void maybeChainHasSubscriptionContext() { Integer result = createParentSpan( () -> { @@ -716,7 +716,7 @@ public void maybeChainHasSubscriptionContext() { } @Test - public void flowableChainHasSubscriptionContext() { + void flowableChainHasSubscriptionContext() { List result = createParentSpan( () -> { @@ -755,7 +755,7 @@ public void flowableChainHasSubscriptionContext() { } @Test - public void singleChainHasSubscriptionContext() { + void singleChainHasSubscriptionContext() { Integer result = createParentSpan( () -> { @@ -785,7 +785,7 @@ public void singleChainHasSubscriptionContext() { } @Test - public void observableChainHasSubscriptionContext() { + void observableChainHasSubscriptionContext() { List result = createParentSpan( () -> { @@ -817,7 +817,7 @@ public void observableChainHasSubscriptionContext() { @ParameterizedTest @MethodSource("schedulers") - public void flowableMultiResults(Scheduler scheduler) { + void flowableMultiResults(Scheduler scheduler) { List result = testing() .runWithSpan( @@ -857,7 +857,7 @@ public void flowableMultiResults(Scheduler scheduler) { @ParameterizedTest @MethodSource("schedulers") - public void maybeMultipleTraceChains(Scheduler scheduler) { + void maybeMultipleTraceChains(Scheduler scheduler) { int iterations = 100; RxJava3ConcurrencyTestHelper.launchAndWait(scheduler, iterations, 60000, testing()); @SuppressWarnings("unchecked") From d4d2a40c003fa4e194372fc3b48339ec9bef326f Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 11 Aug 2025 11:04:47 -0400 Subject: [PATCH 09/12] remove unneeded var and fix disable method --- .../instrumentation/rxjava/v2_0/TracingAssembly.java | 8 +------- .../instrumentation/rxjava/v3_0/TracingAssembly.java | 8 +------- .../instrumentation/rxjava/v3_1_1/TracingAssembly.java | 8 +------- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 9befbdcd1c85..40663b456633 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -93,11 +93,6 @@ public final class TracingAssembly { private static Function oldOnParallelAssembly; - @SuppressWarnings("rawtypes") - @GuardedBy("TracingAssembly.class") - @Nullable - private static Function oldOnObservableAssembly; - @GuardedBy("TracingAssembly.class") private static boolean enabled; @@ -298,8 +293,7 @@ private static void disableObservable() { @GuardedBy("TracingAssembly.class") private static void disableObservableAssembly() { - RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); - oldOnObservableAssembly = null; + RxJavaPlugins.setScheduleHandler(null); } @GuardedBy("TracingAssembly.class") diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java index ac98ebdf72ba..4092781cb67e 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java @@ -66,11 +66,6 @@ public final class TracingAssembly { private static BiFunction oldOnObservableSubscribe; - @SuppressWarnings("rawtypes") - @GuardedBy("TracingAssembly.class") - @Nullable - private static Function oldOnObservableAssembly; - @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable @@ -300,8 +295,7 @@ private static void disableObservable() { @GuardedBy("TracingAssembly.class") private static void disableObservableAssembly() { - RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); - oldOnObservableAssembly = null; + RxJavaPlugins.setScheduleHandler(null); } @GuardedBy("TracingAssembly.class") diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java index 14aec1f9656f..590a7b4b23bc 100644 --- a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java @@ -66,11 +66,6 @@ public final class TracingAssembly { private static BiFunction oldOnObservableSubscribe; - @SuppressWarnings("rawtypes") - @GuardedBy("TracingAssembly.class") - @Nullable - private static Function oldOnObservableAssembly; - @SuppressWarnings("rawtypes") @GuardedBy("TracingAssembly.class") @Nullable @@ -300,8 +295,7 @@ private static void disableObservable() { @GuardedBy("TracingAssembly.class") private static void disableObservableAssembly() { - RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly); - oldOnObservableAssembly = null; + RxJavaPlugins.setScheduleHandler(null); } @GuardedBy("TracingAssembly.class") From 71ff0ff34c0f4bff4c9d3b8cdb91a5332478c6cb Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 11 Aug 2025 11:23:55 -0400 Subject: [PATCH 10/12] support wrapper chaining --- .../rxjava/v2_0/TracingAssembly.java | 23 +++++--- .../rxjava/v2_0/AbstractRxJava2Test.java | 53 +++++++++++++++++++ .../rxjava/v3/common/AbstractRxJava3Test.java | 53 +++++++++++++++++++ .../rxjava/v3_0/TracingAssembly.java | 23 +++++--- .../rxjava/v3_1_1/TracingAssembly.java | 23 +++++--- 5 files changed, 157 insertions(+), 18 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 40663b456633..a6c62112665a 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -93,6 +93,10 @@ public final class TracingAssembly { private static Function oldOnParallelAssembly; + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function oldScheduleHandler; + @GuardedBy("TracingAssembly.class") private static boolean enabled; @@ -225,14 +229,20 @@ private static void enableObservable() { @GuardedBy("TracingAssembly.class") private static void enableObservableAssembly() { + oldScheduleHandler = RxJavaPlugins.getScheduleHandler(); RxJavaPlugins.setScheduleHandler( runnable -> { Context context = Context.current(); - return () -> { - try (Scope ignored = context.makeCurrent()) { - runnable.run(); - } - }; + Runnable wrappedRunnable = + () -> { + try (Scope ignored = context.makeCurrent()) { + runnable.run(); + } + }; + // If there was a previous handler, apply it to our wrapped runnable + return oldScheduleHandler != null + ? oldScheduleHandler.apply(wrappedRunnable) + : wrappedRunnable; }); } @@ -293,7 +303,8 @@ private static void disableObservable() { @GuardedBy("TracingAssembly.class") private static void disableObservableAssembly() { - RxJavaPlugins.setScheduleHandler(null); + RxJavaPlugins.setScheduleHandler(oldScheduleHandler); + oldScheduleHandler = null; } @GuardedBy("TracingAssembly.class") diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java index 2c2cc13474fd..4aaf66115dd9 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/AbstractRxJava2Test.java @@ -25,13 +25,17 @@ import io.reactivex.Scheduler; import io.reactivex.Single; import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Function; import io.reactivex.internal.operators.flowable.FlowablePublish; import io.reactivex.internal.operators.observable.ObservablePublish; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; @@ -886,4 +890,53 @@ void maybeMultipleTraceChains(Scheduler scheduler) { assertions); testing().clearData(); } + + @Test + void scheduleHandlerChainingPreservesExistingHandler() throws InterruptedException { + AtomicInteger customHandlerCallCount = new AtomicInteger(0); + AtomicBoolean customHandlerExecuted = new AtomicBoolean(false); + + Function originalHandler = + RxJavaPlugins.getScheduleHandler(); + Function customHandler = + runnable -> + () -> { + customHandlerCallCount.incrementAndGet(); + customHandlerExecuted.set(true); + runnable.run(); + }; + + try { + RxJavaPlugins.setScheduleHandler(customHandler); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean observableExecuted = new AtomicBoolean(false); + AtomicReference traceId = new AtomicReference<>(); + + createParentSpan( + () -> { + traceId.set(Span.current().getSpanContext().getTraceId()); + Disposable unused = + Observable.fromCallable( + () -> { + observableExecuted.set(true); + return "test"; + }) + .subscribeOn(Schedulers.io()) + .subscribe(result -> latch.countDown()); + }); + + latch.await(); + assertThat(observableExecuted.get()).isTrue(); + assertThat(customHandlerExecuted.get()).isTrue(); + assertThat(customHandlerCallCount.get()).isGreaterThan(0); + + assertThat(traceId.get()).isNotNull(); + assertThat(traceId.get()).isNotEqualTo("00000000000000000000000000000000"); + + } finally { + // Restore original handler to avoid affecting other tests + RxJavaPlugins.setScheduleHandler(originalHandler); + } + } } diff --git a/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java index 78426dd42929..d801ca473301 100644 --- a/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java +++ b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava/v3/common/AbstractRxJava3Test.java @@ -25,13 +25,17 @@ import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish; import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; @@ -887,4 +891,53 @@ void maybeMultipleTraceChains(Scheduler scheduler) { assertions); testing().clearData(); } + + @Test + void scheduleHandlerChainingPreservesExistingHandler() throws InterruptedException { + AtomicInteger customHandlerCallCount = new AtomicInteger(0); + AtomicBoolean customHandlerExecuted = new AtomicBoolean(false); + + Function originalHandler = + RxJavaPlugins.getScheduleHandler(); + Function customHandler = + runnable -> + () -> { + customHandlerCallCount.incrementAndGet(); + customHandlerExecuted.set(true); + runnable.run(); + }; + + try { + RxJavaPlugins.setScheduleHandler(customHandler); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean observableExecuted = new AtomicBoolean(false); + AtomicReference traceId = new AtomicReference<>(); + + createParentSpan( + () -> { + traceId.set(Span.current().getSpanContext().getTraceId()); + Disposable unused = + Observable.fromCallable( + () -> { + observableExecuted.set(true); + return "test"; + }) + .subscribeOn(Schedulers.io()) + .subscribe(result -> latch.countDown()); + }); + + latch.await(); + assertThat(observableExecuted.get()).isTrue(); + assertThat(customHandlerExecuted.get()).isTrue(); + assertThat(customHandlerCallCount.get()).isGreaterThan(0); + + assertThat(traceId.get()).isNotNull(); + assertThat(traceId.get()).isNotEqualTo("00000000000000000000000000000000"); + + } finally { + // Restore original handler to avoid affecting other tests + RxJavaPlugins.setScheduleHandler(originalHandler); + } + } } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java index 4092781cb67e..b66c1e0d5717 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java @@ -97,6 +97,10 @@ public final class TracingAssembly { private static Function oldOnParallelAssembly; + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function oldScheduleHandler; + @GuardedBy("TracingAssembly.class") private static boolean enabled; @@ -227,14 +231,20 @@ private static void enableObservable() { @GuardedBy("TracingAssembly.class") private static void enableObservableAssembly() { + oldScheduleHandler = RxJavaPlugins.getScheduleHandler(); RxJavaPlugins.setScheduleHandler( runnable -> { Context context = Context.current(); - return () -> { - try (Scope ignored = context.makeCurrent()) { - runnable.run(); - } - }; + Runnable wrappedRunnable = + () -> { + try (Scope ignored = context.makeCurrent()) { + runnable.run(); + } + }; + // If there was a previous handler, apply it to our wrapped runnable + return oldScheduleHandler != null + ? oldScheduleHandler.apply(wrappedRunnable) + : wrappedRunnable; }); } @@ -295,7 +305,8 @@ private static void disableObservable() { @GuardedBy("TracingAssembly.class") private static void disableObservableAssembly() { - RxJavaPlugins.setScheduleHandler(null); + RxJavaPlugins.setScheduleHandler(oldScheduleHandler); + oldScheduleHandler = null; } @GuardedBy("TracingAssembly.class") diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java index 590a7b4b23bc..734bdfe2db18 100644 --- a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java @@ -97,6 +97,10 @@ public final class TracingAssembly { private static Function oldOnParallelAssembly; + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function oldScheduleHandler; + @GuardedBy("TracingAssembly.class") private static boolean enabled; @@ -192,14 +196,20 @@ private static void enableCompletable() { @GuardedBy("TracingAssembly.class") private static void enableObservableAssembly() { + oldScheduleHandler = RxJavaPlugins.getScheduleHandler(); RxJavaPlugins.setScheduleHandler( runnable -> { Context context = Context.current(); - return () -> { - try (Scope ignored = context.makeCurrent()) { - runnable.run(); - } - }; + Runnable wrappedRunnable = + () -> { + try (Scope ignored = context.makeCurrent()) { + runnable.run(); + } + }; + // If there was a previous handler, apply it to our wrapped runnable + return oldScheduleHandler != null + ? oldScheduleHandler.apply(wrappedRunnable) + : wrappedRunnable; }); } @@ -295,7 +305,8 @@ private static void disableObservable() { @GuardedBy("TracingAssembly.class") private static void disableObservableAssembly() { - RxJavaPlugins.setScheduleHandler(null); + RxJavaPlugins.setScheduleHandler(oldScheduleHandler); + oldScheduleHandler = null; } @GuardedBy("TracingAssembly.class") From c81dee0ef5010547053dc236ff86a071435ef44d Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Tue, 12 Aug 2025 06:35:39 -0400 Subject: [PATCH 11/12] rename method --- .../instrumentation/rxjava/v2_0/TracingAssembly.java | 4 ++-- .../instrumentation/rxjava/v3_0/TracingAssembly.java | 4 ++-- .../instrumentation/rxjava/v3_1_1/TracingAssembly.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index a6c62112665a..410c1538b15b 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -122,7 +122,7 @@ public void enable() { enableObservable(); - enableObservableAssembly(); + enableWrappedScheduleHandler(); enableCompletable(); @@ -228,7 +228,7 @@ private static void enableObservable() { } @GuardedBy("TracingAssembly.class") - private static void enableObservableAssembly() { + private static void enableWrappedScheduleHandler() { oldScheduleHandler = RxJavaPlugins.getScheduleHandler(); RxJavaPlugins.setScheduleHandler( runnable -> { diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java index b66c1e0d5717..79dbf7f5991b 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java @@ -126,7 +126,7 @@ public void enable() { enableObservable(); - enableObservableAssembly(); + enableWrappedScheduleHandler(); enableCompletable(); @@ -230,7 +230,7 @@ private static void enableObservable() { } @GuardedBy("TracingAssembly.class") - private static void enableObservableAssembly() { + private static void enableWrappedScheduleHandler() { oldScheduleHandler = RxJavaPlugins.getScheduleHandler(); RxJavaPlugins.setScheduleHandler( runnable -> { diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java index 734bdfe2db18..0e710f25e289 100644 --- a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java @@ -126,7 +126,7 @@ public void enable() { enableObservable(); - enableObservableAssembly(); + enableWrappedScheduleHandler(); enableCompletable(); @@ -195,7 +195,7 @@ private static void enableCompletable() { } @GuardedBy("TracingAssembly.class") - private static void enableObservableAssembly() { + private static void enableWrappedScheduleHandler() { oldScheduleHandler = RxJavaPlugins.getScheduleHandler(); RxJavaPlugins.setScheduleHandler( runnable -> { From bb66f4b748f3da25d5e91ecc3b8ce6632d180e5b Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Tue, 12 Aug 2025 07:16:57 -0400 Subject: [PATCH 12/12] rename other method --- .../instrumentation/rxjava/v2_0/TracingAssembly.java | 4 ++-- .../instrumentation/rxjava/v3_0/TracingAssembly.java | 4 ++-- .../instrumentation/rxjava/v3_1_1/TracingAssembly.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java index 410c1538b15b..71596414b08d 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v2_0/TracingAssembly.java @@ -148,7 +148,7 @@ public void disable() { disableObservable(); - disableObservableAssembly(); + disableWrappedScheduleHandler(); disableCompletable(); @@ -302,7 +302,7 @@ private static void disableObservable() { } @GuardedBy("TracingAssembly.class") - private static void disableObservableAssembly() { + private static void disableWrappedScheduleHandler() { RxJavaPlugins.setScheduleHandler(oldScheduleHandler); oldScheduleHandler = null; } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java index 79dbf7f5991b..b9c056a2c3b9 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_0/TracingAssembly.java @@ -152,7 +152,7 @@ public void disable() { disableObservable(); - disableObservableAssembly(); + disableWrappedScheduleHandler(); disableCompletable(); @@ -304,7 +304,7 @@ private static void disableObservable() { } @GuardedBy("TracingAssembly.class") - private static void disableObservableAssembly() { + private static void disableWrappedScheduleHandler() { RxJavaPlugins.setScheduleHandler(oldScheduleHandler); oldScheduleHandler = null; } diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java index 0e710f25e289..0b4c60938213 100644 --- a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava/v3_1_1/TracingAssembly.java @@ -152,7 +152,7 @@ public void disable() { disableObservable(); - disableObservableAssembly(); + disableWrappedScheduleHandler(); disableCompletable(); @@ -304,7 +304,7 @@ private static void disableObservable() { } @GuardedBy("TracingAssembly.class") - private static void disableObservableAssembly() { + private static void disableWrappedScheduleHandler() { RxJavaPlugins.setScheduleHandler(oldScheduleHandler); oldScheduleHandler = null; }