|
10 | 10 | import static org.assertj.core.api.Assertions.assertThat; |
11 | 11 | import static org.assertj.core.api.Assertions.assertThatThrownBy; |
12 | 12 |
|
13 | | -import com.google.errorprone.annotations.CanIgnoreReturnValue; |
14 | 13 | import io.opentelemetry.api.common.AttributeKey; |
| 14 | +import io.opentelemetry.api.trace.Span; |
15 | 15 | import io.opentelemetry.api.trace.SpanKind; |
16 | 16 | import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; |
17 | 17 | import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable; |
|
30 | 30 | import io.reactivex.schedulers.Schedulers; |
31 | 31 | import java.util.Comparator; |
32 | 32 | import java.util.List; |
| 33 | +import java.util.concurrent.CountDownLatch; |
33 | 34 | import java.util.concurrent.TimeUnit; |
| 35 | +import java.util.concurrent.atomic.AtomicReference; |
34 | 36 | import java.util.function.Consumer; |
35 | 37 | import java.util.stream.Stream; |
36 | 38 | import org.junit.jupiter.api.Test; |
@@ -340,29 +342,44 @@ public void basicObservable() { |
340 | 342 | .hasParent(trace.getSpan(0)))); |
341 | 343 | } |
342 | 344 |
|
343 | | - |
344 | 345 | @Test |
345 | | - @SuppressWarnings("CanIgnoreReturnValue") |
346 | | - public void basicObservableFromCallable() { |
347 | | - Observable<String> test = createParentSpan(() -> Observable.fromCallable(() -> "success")); |
348 | | - assertThat(test).isNotNull(); |
349 | | - |
350 | | - // Actually subscribe to trigger the Observable execution |
351 | | - String result = test.blockingFirst(); |
352 | | - assertThat(result).isEqualTo("success"); |
353 | | - |
| 346 | + public void basicObservableFromCallable() throws InterruptedException { |
| 347 | + CountDownLatch countDownLatch = new CountDownLatch(1); |
| 348 | + AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
354 | 349 | testing() |
355 | | - .waitAndAssertTraces( |
356 | | - trace -> |
357 | | - trace.hasSpansSatisfyingExactly( |
358 | | - span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(), |
359 | | - span -> |
360 | | - span.hasName(ADD_ONE) |
361 | | - .hasKind(SpanKind.INTERNAL) |
362 | | - .hasParent(trace.getSpan(0)))); |
| 350 | + .runWithSpan( |
| 351 | + "parent", |
| 352 | + () -> { |
| 353 | + String traceId = Span.current().getSpanContext().getTraceId(); |
| 354 | + |
| 355 | + Disposable result = |
| 356 | + Observable.fromCallable( |
| 357 | + () -> { |
| 358 | + assertThat(traceId) |
| 359 | + .isEqualTo(Span.current().getSpanContext().getTraceId()); |
| 360 | + return "success"; |
| 361 | + }) |
| 362 | + .subscribeOn(Schedulers.io()) |
| 363 | + .observeOn(Schedulers.single()) |
| 364 | + .doOnNext( |
| 365 | + data -> |
| 366 | + assertThat(traceId) |
| 367 | + .isEqualTo(Span.current().getSpanContext().getTraceId())) |
| 368 | + .subscribe( |
| 369 | + data -> countDownLatch.countDown(), |
| 370 | + error -> { |
| 371 | + errorRef.set(error); |
| 372 | + countDownLatch.countDown(); |
| 373 | + }); |
| 374 | + assertThat(result).isNotNull(); |
| 375 | + }); |
| 376 | + countDownLatch.await(); |
| 377 | + if (errorRef.get() != null) { |
| 378 | + throw new AssertionError("Assertion failed in observable thread", errorRef.get()); |
| 379 | + } |
| 380 | + testing().waitForTraces(1); |
363 | 381 | } |
364 | 382 |
|
365 | | - |
366 | 383 | @Test |
367 | 384 | public void connectableFlowable() { |
368 | 385 | List<Integer> result = |
@@ -804,27 +821,6 @@ public void observableChainHasSubscriptionContext() { |
804 | 821 | .hasParent(trace.getSpan(0)))); |
805 | 822 | } |
806 | 823 |
|
807 | | - @Test |
808 | | - public void basicObservableFromCallableTest() { |
809 | | - Disposable disposable = Observable.fromCallable(() -> "success") |
810 | | - .onErrorReturnItem("") |
811 | | - .subscribeOn(Schedulers.io()) |
812 | | - .observeOn(Schedulers.single()) |
813 | | - .subscribe(data -> System.out.print("done")); |
814 | | - |
815 | | - assertThat(disposable).isNotNull(); |
816 | | - |
817 | | - testing() |
818 | | - .waitAndAssertTraces( |
819 | | - trace -> |
820 | | - trace.hasSpansSatisfyingExactly( |
821 | | - span -> span.hasName(PARENT).hasKind(SpanKind.INTERNAL).hasNoParent(), |
822 | | - span -> |
823 | | - span.hasName(ADD_ONE) |
824 | | - .hasKind(SpanKind.INTERNAL) |
825 | | - .hasParent(trace.getSpan(0)))); |
826 | | - } |
827 | | - |
828 | 824 | @MethodSource("schedulers") |
829 | 825 | @ParameterizedTest |
830 | 826 | public void flowableMultiResults(Scheduler scheduler) { |
|
0 commit comments