diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientTelemetry.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientTelemetry.java index 75bb07ce6f6a..653199de340a 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientTelemetry.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientTelemetry.java @@ -8,6 +8,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator; import io.opentelemetry.instrumentation.spring.webflux.v5_3.internal.WebClientTracingFilter; import java.util.List; import org.springframework.web.reactive.function.client.ClientRequest; @@ -43,6 +44,12 @@ public static SpringWebfluxClientTelemetryBuilder builder(OpenTelemetry openTele this.propagators = propagators; } + /** + * Adds the OpenTelemetry telemetry producing {@link ExchangeFilterFunction} to the provided list + * of filter functions. + * + * @param exchangeFilterFunctions existing filter functions + */ public void addFilter(List exchangeFilterFunctions) { for (ExchangeFilterFunction filterFunction : exchangeFilterFunctions) { if (filterFunction instanceof WebClientTracingFilter) { @@ -51,4 +58,21 @@ public void addFilter(List exchangeFilterFunctions) { } exchangeFilterFunctions.add(new WebClientTracingFilter(clientInstrumenter, propagators)); } + + /** + * Adds the OpenTelemetry telemetry producing {@link ExchangeFilterFunction} to the provided list + * of filter functions. Also registers the Reactor context propagation hook {@link + * ContextPropagationOperator} for propagating OpenTelemetry context into reactive pipelines. + * + * @param exchangeFilterFunctions existing filter functions + */ + public void addFilterAndRegisterReactorHook( + List exchangeFilterFunctions) { + registerReactorHook(); + addFilter(exchangeFilterFunctions); + } + + private static void registerReactorHook() { + ContextPropagationOperator.builder().build().registerOnEachOperator(); + } } diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerTelemetry.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerTelemetry.java index f3c1dacca15f..e91414c04982 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerTelemetry.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerTelemetry.java @@ -39,10 +39,24 @@ public static SpringWebfluxServerTelemetryBuilder builder(OpenTelemetry openTele this.serverInstrumenter = serverInstrumenter; } + /** + * Returns an OpenTelemetry telemetry producing {@link WebFilter} that can be registered with + * Spring Webflux to instrument HTTP server requests. + * + * @return OpenTelemetry telemetry producing {@link WebFilter} + */ public WebFilter createWebFilter() { return new TelemetryProducingWebFilter(serverInstrumenter); } + /** + * Returns an OpenTelemetry telemetry producing {@link WebFilter} that can be registered with + * Spring Webflux to instrument HTTP server requests. Also registers the Reactor context + * propagation hook {@link ContextPropagationOperator} for propagating OpenTelemetry context into + * reactive pipelines. + * + * @return OpenTelemetry telemetry producing {@link WebFilter} + */ public WebFilter createWebFilterAndRegisterReactorHook() { registerReactorHook(); return this.createWebFilter(); diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientInstrumentationTest.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientInstrumentationTest.java index 723c4fe63505..dfa998e30322 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientInstrumentationTest.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientInstrumentationTest.java @@ -28,6 +28,6 @@ protected WebClient.Builder instrument(WebClient.Builder builder) { .setCapturedResponseHeaders( Collections.singletonList(AbstractHttpClientTest.TEST_RESPONSE_HEADER)) .build(); - return builder.filters(instrumentation::addFilter); + return builder.filters(instrumentation::addFilterAndRegisterReactorHook); } } diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java index 8889a3780fd3..1a867e368868 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java @@ -19,8 +19,6 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; @@ -77,16 +75,8 @@ public void sendRequestWithCallback( Map headers, HttpClientResult httpClientResult) { if (Webflux7Util.isWebflux7) { - // FIXME: context is not propagated to the callback, this needs to be fixed - Context context = Context.current(); Webflux7Util.sendRequestWithCallback( - request, - status -> { - try (Scope ignore = context.makeCurrent()) { - httpClientResult.complete(status); - } - }, - httpClientResult::complete); + request, httpClientResult::complete, httpClientResult::complete); } else { request .exchange()