Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
import io.opentelemetry.instrumentation.spring.webflux.v5_3.internal.WebClientTracingFilter;
import java.util.List;
import org.springframework.web.reactive.function.client.ClientRequest;
Expand Down Expand Up @@ -43,6 +44,12 @@ public static SpringWebfluxClientTelemetryBuilder builder(OpenTelemetry openTele
this.propagators = propagators;
}

/**
* Adds the OpenTelemetry telemetry producing {@link ExchangeFilterFunction} to the provided list
* of filter functions.
*
* @param exchangeFilterFunctions existing filter functions
*/
public void addFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
for (ExchangeFilterFunction filterFunction : exchangeFilterFunctions) {
if (filterFunction instanceof WebClientTracingFilter) {
Expand All @@ -51,4 +58,21 @@ public void addFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
}
exchangeFilterFunctions.add(new WebClientTracingFilter(clientInstrumenter, propagators));
}

/**
* Adds the OpenTelemetry telemetry producing {@link ExchangeFilterFunction} to the provided list
* of filter functions. Also registers the Reactor context propagation hook {@link
* ContextPropagationOperator} for propagating OpenTelemetry context into reactive pipelines.
*
* @param exchangeFilterFunctions existing filter functions
*/
public void addFilterAndRegisterReactorHook(
List<ExchangeFilterFunction> exchangeFilterFunctions) {
registerReactorHook();
addFilter(exchangeFilterFunctions);
}

private static void registerReactorHook() {
ContextPropagationOperator.builder().build().registerOnEachOperator();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any issue with calling this twice?

is the instrumentation broken in 7.0 if you don't call this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any issue with calling this twice?

There shouldn't be an issue, webflux server instrumentation is doing it exactly the same way. ContextPropagationOperator has a boolean that avoids doing anything after it has already been registered once.

is the instrumentation broken in 7.0 if you don't call this?

This is only needed for the library instrumentation. Instrumentation is mostly functional just that context isn't propagated into reactive callbacks. I believe that this is not a regression and the same issue was already present for older webflux versions, our tests just didn't catch it.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,24 @@ public static SpringWebfluxServerTelemetryBuilder builder(OpenTelemetry openTele
this.serverInstrumenter = serverInstrumenter;
}

/**
* Returns an OpenTelemetry telemetry producing {@link WebFilter} that can be registered with
* Spring Webflux to instrument HTTP server requests.
*
* @return OpenTelemetry telemetry producing {@link WebFilter}
*/
public WebFilter createWebFilter() {
return new TelemetryProducingWebFilter(serverInstrumenter);
}

/**
* Returns an OpenTelemetry telemetry producing {@link WebFilter} that can be registered with
* Spring Webflux to instrument HTTP server requests. Also registers the Reactor context
* propagation hook {@link ContextPropagationOperator} for propagating OpenTelemetry context into
* reactive pipelines.
*
* @return OpenTelemetry telemetry producing {@link WebFilter}
*/
public WebFilter createWebFilterAndRegisterReactorHook() {
registerReactorHook();
return this.createWebFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ protected WebClient.Builder instrument(WebClient.Builder builder) {
.setCapturedResponseHeaders(
Collections.singletonList(AbstractHttpClientTest.TEST_RESPONSE_HEADER))
.build();
return builder.filters(instrumentation::addFilter);
return builder.filters(instrumentation::addFilterAndRegisterReactorHook);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
Expand Down Expand Up @@ -77,16 +75,8 @@ public void sendRequestWithCallback(
Map<String, String> headers,
HttpClientResult httpClientResult) {
if (Webflux7Util.isWebflux7) {
// FIXME: context is not propagated to the callback, this needs to be fixed
Context context = Context.current();
Webflux7Util.sendRequestWithCallback(
request,
status -> {
try (Scope ignore = context.makeCurrent()) {
httpClientResult.complete(status);
}
},
httpClientResult::complete);
request, httpClientResult::complete, httpClientResult::complete);
} else {
request
.exchange()
Expand Down
Loading