Skip to content

Commit 24b65ab

Browse files
authored
Clear context before flux retry (#8456)
1 parent 178b285 commit 24b65ab

File tree

2 files changed

+166
-3
lines changed

2 files changed

+166
-3
lines changed

instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/TracingSubscriber.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
* https://github.com/opentracing-contrib/java-reactor/blob/master/src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java
3333
*/
3434
public class TracingSubscriber<T> implements CoreSubscriber<T> {
35+
private static final Class<?> fluxRetrySubscriberClass = getFluxRetrySubscriberClass();
36+
private static final Class<?> fluxRetryWhenSubscriberClass = getFluxRetryWhenSubscriberClass();
3537
private final io.opentelemetry.context.Context traceContext;
3638
private final Subscriber<? super T> subscriber;
3739
private final Context context;
@@ -64,7 +66,15 @@ public void onNext(T o) {
6466

6567
@Override
6668
public void onError(Throwable throwable) {
67-
withActiveSpan(() -> subscriber.onError(throwable));
69+
if (!hasContextToPropagate
70+
&& (fluxRetrySubscriberClass == subscriber.getClass()
71+
|| fluxRetryWhenSubscriberClass == subscriber.getClass())) {
72+
// clear context for retry to avoid having retried operations run with currently active
73+
// context as parent context
74+
withActiveSpan(io.opentelemetry.context.Context.root(), () -> subscriber.onError(throwable));
75+
} else {
76+
withActiveSpan(() -> subscriber.onError(throwable));
77+
}
6878
}
6979

7080
@Override
@@ -78,12 +88,32 @@ public Context currentContext() {
7888
}
7989

8090
private void withActiveSpan(Runnable runnable) {
81-
if (hasContextToPropagate) {
82-
try (Scope ignored = traceContext.makeCurrent()) {
91+
withActiveSpan(hasContextToPropagate ? traceContext : null, runnable);
92+
}
93+
94+
private static void withActiveSpan(io.opentelemetry.context.Context context, Runnable runnable) {
95+
if (context != null) {
96+
try (Scope ignored = context.makeCurrent()) {
8397
runnable.run();
8498
}
8599
} else {
86100
runnable.run();
87101
}
88102
}
103+
104+
private static Class<?> getFluxRetrySubscriberClass() {
105+
try {
106+
return Class.forName("reactor.core.publisher.FluxRetry$RetrySubscriber");
107+
} catch (ClassNotFoundException exception) {
108+
return null;
109+
}
110+
}
111+
112+
private static Class<?> getFluxRetryWhenSubscriberClass() {
113+
try {
114+
return Class.forName("reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber");
115+
} catch (ClassNotFoundException exception) {
116+
return null;
117+
}
118+
}
89119
}

instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/ReactorCoreTest.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.instrumentation.reactor.v3_1;
77

8+
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
89
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
910
import static java.lang.invoke.MethodType.methodType;
1011
import static org.assertj.core.api.Assertions.assertThat;
@@ -20,12 +21,17 @@
2021
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
2122
import java.lang.invoke.MethodHandle;
2223
import java.lang.invoke.MethodHandles;
24+
import java.lang.reflect.Method;
2325
import java.time.Duration;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2427
import java.util.function.Function;
2528
import org.junit.jupiter.api.AfterAll;
2629
import org.junit.jupiter.api.BeforeAll;
2730
import org.junit.jupiter.api.Test;
2831
import org.junit.jupiter.api.extension.RegisterExtension;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.ValueSource;
34+
import org.reactivestreams.Publisher;
2935
import reactor.core.Scannable;
3036
import reactor.core.publisher.Flux;
3137
import reactor.core.publisher.Mono;
@@ -406,6 +412,133 @@ void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() {
406412
.hasAttributes(attributeEntry("onNext", true))));
407413
}
408414

415+
@ParameterizedTest
416+
@ValueSource(strings = {"retry", "retryWhen"})
417+
void doesNotLeakContextOnRetry(String retryKind) {
418+
// retry calls subscribe again from onError where we have active context, check that this
419+
// context is not used as parent for retried operations
420+
AtomicBoolean beforeRetry = new AtomicBoolean(true);
421+
Flux<Integer> publish =
422+
Flux.create(
423+
sink -> {
424+
for (int i = 0; i < 2; i++) {
425+
int index = i;
426+
testing.runWithSpan(
427+
"produce " + (beforeRetry.get() ? "before" : "after") + " retry " + i,
428+
() -> sink.next(index));
429+
}
430+
});
431+
432+
Flux<Integer> flux =
433+
Flux.defer(() -> publish.delaySubscription(Duration.ofMillis(1)))
434+
.doOnNext(message -> testing.runWithSpan("process " + message, () -> {}))
435+
.handle(
436+
(message, sink) -> {
437+
if (message == 1 && beforeRetry.compareAndSet(true, false)) {
438+
sink.error(new RuntimeException("Message has error"));
439+
} else {
440+
sink.next(message);
441+
}
442+
});
443+
444+
switch (retryKind) {
445+
case "retry":
446+
flux = flux.retry();
447+
break;
448+
case "retryWhen":
449+
flux = retryWhen(flux);
450+
break;
451+
default:
452+
throw new IllegalStateException("Unsupported retry kind " + retryKind);
453+
}
454+
455+
flux.subscribe();
456+
457+
testing.waitAndAssertSortedTraces(
458+
orderByRootSpanName(
459+
"produce before retry 0",
460+
"produce before retry 1",
461+
"produce after retry 0",
462+
"produce after retry 1"),
463+
trace ->
464+
trace.hasSpansSatisfyingExactly(
465+
span -> span.hasName("produce before retry 0").hasNoParent(),
466+
span -> span.hasName("process 0").hasParent(trace.getSpan(0))),
467+
trace ->
468+
trace.hasSpansSatisfyingExactly(
469+
span -> span.hasName("produce before retry 1").hasNoParent(),
470+
span -> span.hasName("process 1").hasParent(trace.getSpan(0))),
471+
trace ->
472+
trace.hasSpansSatisfyingExactly(
473+
span -> span.hasName("produce after retry 0").hasNoParent(),
474+
span -> span.hasName("process 0").hasParent(trace.getSpan(0))),
475+
trace ->
476+
trace.hasSpansSatisfyingExactly(
477+
span -> span.hasName("produce after retry 1").hasNoParent(),
478+
span -> span.hasName("process 1").hasParent(trace.getSpan(0))));
479+
}
480+
481+
@Test
482+
void retryWithParentSpan() {
483+
AtomicBoolean beforeRetry = new AtomicBoolean(true);
484+
Flux<Integer> publish =
485+
Flux.create(
486+
sink ->
487+
testing.runWithSpan(
488+
"produce " + (beforeRetry.get() ? "before" : "after") + " retry",
489+
() -> sink.next(0)));
490+
491+
Flux<Object> flux =
492+
Flux.defer(() -> publish.delaySubscription(Duration.ofMillis(1)))
493+
.doOnNext(message -> testing.runWithSpan("process", () -> {}))
494+
.handle(
495+
(message, sink) -> {
496+
if (beforeRetry.compareAndSet(true, false)) {
497+
sink.error(new RuntimeException("Message has error"));
498+
} else {
499+
sink.next(message);
500+
}
501+
})
502+
.retry();
503+
504+
testing.runWithSpan("parent", () -> flux.subscribe());
505+
506+
testing.waitAndAssertTraces(
507+
trace ->
508+
trace.hasSpansSatisfyingExactly(
509+
span -> span.hasName("parent").hasNoParent(),
510+
span -> span.hasName("produce before retry").hasParent(trace.getSpan(0)),
511+
span -> span.hasName("process").hasParent(trace.getSpan(0)),
512+
span -> span.hasName("produce after retry").hasParent(trace.getSpan(0)),
513+
span -> span.hasName("process").hasParent(trace.getSpan(0))));
514+
}
515+
516+
@SuppressWarnings("unchecked")
517+
private static <T> Flux<T> retryWhen(Flux<T> flux) {
518+
try {
519+
Method method = Flux.class.getMethod("retryWhen", Function.class);
520+
Function<Flux<Throwable>, ? extends Publisher<?>> function =
521+
err -> Flux.create(sink -> sink.next(-1));
522+
return (Flux<T>) method.invoke(flux, function);
523+
} catch (NoSuchMethodException exception) {
524+
// ignore
525+
} catch (Exception exception) {
526+
throw new IllegalStateException(exception);
527+
}
528+
529+
try {
530+
Class<?> retryClass = Class.forName("reactor.util.retry.Retry");
531+
Method retryWhenMethod = Flux.class.getMethod("retryWhen", retryClass);
532+
Method retrySpecMethod = retryClass.getMethod("indefinitely");
533+
return (Flux<T>) retryWhenMethod.invoke(flux, retrySpecMethod.invoke(retryClass));
534+
} catch (ClassNotFoundException | NoSuchMethodException exception) {
535+
// ignore
536+
} catch (Exception exception) {
537+
throw new IllegalStateException(exception);
538+
}
539+
throw new IllegalStateException("Could not find retryWhen method");
540+
}
541+
409542
@SuppressWarnings("unchecked")
410543
private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
411544

0 commit comments

Comments
 (0)