diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/HeaderUtil.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/HeaderUtil.java new file mode 100644 index 000000000000..a1647d4f1526 --- /dev/null +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/HeaderUtil.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.webflux.v5_3; + +import static java.util.Collections.emptyList; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.List; +import javax.annotation.Nullable; +import org.springframework.http.HttpHeaders; + +class HeaderUtil { + @Nullable private static final MethodHandle GET_HEADERS; + + static { + // since spring web 7.0 + MethodHandle methodHandle = + findGetHeadersMethod(MethodType.methodType(List.class, String.class, List.class)); + if (methodHandle == null) { + // up to spring web 7.0 + methodHandle = + findGetHeadersMethod(MethodType.methodType(Object.class, Object.class, Object.class)); + } + GET_HEADERS = methodHandle; + } + + private static MethodHandle findGetHeadersMethod(MethodType methodType) { + try { + return MethodHandles.lookup().findVirtual(HttpHeaders.class, "getOrDefault", methodType); + } catch (Throwable t) { + return null; + } + } + + @SuppressWarnings("unchecked") // casting MethodHandle.invoke result + static List getHeader(HttpHeaders headers, String name) { + if (GET_HEADERS != null) { + try { + return (List) GET_HEADERS.invoke(headers, name, emptyList()); + } catch (Throwable t) { + // ignore + } + } + return emptyList(); + } + + private HeaderUtil() {} +} diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxServerHttpAttributesGetter.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxServerHttpAttributesGetter.java index 73a8089878d5..da12d0eb1b25 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxServerHttpAttributesGetter.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxServerHttpAttributesGetter.java @@ -6,10 +6,13 @@ package io.opentelemetry.instrumentation.spring.webflux.v5_3; import io.opentelemetry.instrumentation.api.semconv.http.HttpServerAttributesGetter; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.net.InetSocketAddress; -import java.util.Collections; import java.util.List; import javax.annotation.Nullable; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.util.pattern.PathPattern; @@ -18,6 +21,64 @@ enum WebfluxServerHttpAttributesGetter implements HttpServerAttributesGetter { INSTANCE; + private static final MethodHandle GET_RAW_STATUS_CODE; + private static final MethodHandle GET_STATUS_CODE; + private static final MethodHandle STATUS_CODE_VALUE; + + static { + MethodHandle getRawStatusCode = null; + MethodHandle getStatusCode = null; + MethodHandle statusCodeValue = null; + + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + + // up to webflux 7.0 + try { + getRawStatusCode = + lookup.findVirtual( + ServerHttpResponse.class, "getRawStatusCode", MethodType.methodType(Integer.class)); + } catch (Exception exception) { + // ignore + } + + // since webflux 7.0 + try { + Class httpStatusCodeClass = Class.forName("org.springframework.http.HttpStatusCode"); + getStatusCode = + lookup.findVirtual( + ServerHttpResponse.class, + "getStatusCode", + MethodType.methodType(httpStatusCodeClass)); + statusCodeValue = + lookup.findVirtual(httpStatusCodeClass, "value", MethodType.methodType(int.class)); + } catch (Exception exception) { + // ignore + } + + GET_RAW_STATUS_CODE = getRawStatusCode; + GET_STATUS_CODE = getStatusCode; + STATUS_CODE_VALUE = statusCodeValue; + } + + private static Integer getStatusCode(ServerHttpResponse response) { + if (GET_RAW_STATUS_CODE != null) { + try { + return (Integer) GET_RAW_STATUS_CODE.invoke(response); + } catch (Throwable e) { + // ignore + } + } + if (GET_STATUS_CODE != null && STATUS_CODE_VALUE != null) { + try { + Object statusCode = GET_STATUS_CODE.invoke(response); + return (Integer) STATUS_CODE_VALUE.invoke(statusCode); + } catch (Throwable e) { + // ignore + } + } + return null; + } + @Override public String getHttpRequestMethod(ServerWebExchange request) { return request.getRequest().getMethod().name(); @@ -25,20 +86,20 @@ public String getHttpRequestMethod(ServerWebExchange request) { @Override public List getHttpRequestHeader(ServerWebExchange request, String name) { - return request.getRequest().getHeaders().getOrDefault(name, Collections.emptyList()); + return HeaderUtil.getHeader(request.getRequest().getHeaders(), name); } @Nullable @Override public Integer getHttpResponseStatusCode( ServerWebExchange request, ServerWebExchange response, @Nullable Throwable error) { - return response.getResponse().getRawStatusCode(); + return getStatusCode(response.getResponse()); } @Override public List getHttpResponseHeader( ServerWebExchange request, ServerWebExchange response, String name) { - return response.getResponse().getHeaders().getOrDefault(name, Collections.emptyList()); + return HeaderUtil.getHeader(response.getResponse().getHeaders(), name); } @Nullable diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxTextMapGetter.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxTextMapGetter.java index 7bf8370c8451..e1f0f88c8155 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxTextMapGetter.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxTextMapGetter.java @@ -9,7 +9,6 @@ import io.opentelemetry.context.propagation.TextMapGetter; import java.util.Iterator; -import java.util.List; import javax.annotation.Nullable; import org.springframework.web.server.ServerWebExchange; @@ -35,7 +34,6 @@ public Iterator getAll(@Nullable ServerWebExchange exchange, String key) if (exchange == null) { return emptyIterator(); } - List list = exchange.getRequest().getHeaders().get(key); - return list != null ? list.iterator() : emptyIterator(); + return HeaderUtil.getHeader(exchange.getRequest().getHeaders(), key).iterator(); } } diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing-webflux7/build.gradle.kts b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing-webflux7/build.gradle.kts new file mode 100644 index 000000000000..93650b685b6e --- /dev/null +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing-webflux7/build.gradle.kts @@ -0,0 +1,8 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + implementation("io.opentelemetry.javaagent:opentelemetry-testing-common") + compileOnly("org.springframework:spring-webflux:7.0.0") +} diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing-webflux7/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/Webflux7Util.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing-webflux7/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/Webflux7Util.java new file mode 100644 index 000000000000..78715f655435 --- /dev/null +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing-webflux7/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/Webflux7Util.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.webflux.client; + +import java.util.function.Consumer; +import java.util.function.Function; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +public class Webflux7Util { + static final boolean isWebflux7 = detectWebflux7(); + + private static boolean detectWebflux7() { + try { + WebClient.RequestBodySpec.class.getMethod("exchange"); + return false; + } catch (NoSuchMethodException e) { + return true; + } + } + + static Mono exchangeToMono(WebClient.RequestBodySpec request) { + return request.exchangeToMono(Mono::just); + } + + static T doRequest( + WebClient.RequestBodySpec request, Function> handler) { + return request.exchangeToMono(handler).block(); + } + + static int doRequest(WebClient.RequestBodySpec request) { + return doRequest(request, response -> Mono.just(response.statusCode().value())); + } + + static int getStatusCode(ClientResponse response) { + return response.statusCode().value(); + } + + static void sendRequestWithCallback( + WebClient.RequestBodySpec request, + Consumer callback, + Consumer errorCallback) { + request + .exchangeToMono(response -> Mono.just(response.statusCode().value())) + .subscribe(callback, errorCallback); + } + + private Webflux7Util() {} +} diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/build.gradle.kts b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/build.gradle.kts index ff988484a79f..0fc385e348b2 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/build.gradle.kts +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/build.gradle.kts @@ -4,6 +4,7 @@ plugins { dependencies { implementation("io.opentelemetry.javaagent:opentelemetry-testing-common") + implementation(project(":instrumentation:spring:spring-webflux:spring-webflux-5.3:testing-webflux7")) compileOnly("org.springframework:spring-webflux:5.0.0.RELEASE") diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java index 2b99c0b37dac..8889a3780fd3 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java @@ -19,6 +19,8 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; @@ -35,6 +37,7 @@ import org.springframework.http.HttpMethod; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; public abstract class AbstractSpringWebfluxClientInstrumentationTest extends AbstractHttpClientTest { @@ -58,6 +61,10 @@ public WebClient.RequestBodySpec buildRequest( @Override public int sendRequest( WebClient.RequestBodySpec request, String method, URI uri, Map headers) { + if (Webflux7Util.isWebflux7) { + return Webflux7Util.doRequest(request); + } + ClientResponse response = requireNonNull(request.exchange().block()); return getStatusCode(response); } @@ -69,11 +76,24 @@ public void sendRequestWithCallback( URI uri, Map headers, HttpClientResult httpClientResult) { - request - .exchange() - .subscribe( - response -> httpClientResult.complete(getStatusCode(response)), - httpClientResult::complete); + if (Webflux7Util.isWebflux7) { + // FIXME: context is not propagated to the callback, this needs to be fixed + Context context = Context.current(); + Webflux7Util.sendRequestWithCallback( + request, + status -> { + try (Scope ignore = context.makeCurrent()) { + httpClientResult.complete(status); + } + }, + httpClientResult::complete); + } else { + request + .exchange() + .subscribe( + response -> httpClientResult.complete(getStatusCode(response)), + httpClientResult::complete); + } } @Override @@ -164,12 +184,17 @@ void shouldEndSpanOnMonoTimeout() { () -> testing.runWithSpan( "parent", - () -> - buildRequest("GET", uri, emptyMap()) - .exchange() - // apply Mono timeout that is way shorter than HTTP request timeout - .timeout(Duration.ofSeconds(1)) - .block())); + () -> { + WebClient.RequestBodySpec request = buildRequest("GET", uri, emptyMap()); + Mono mono; + if (Webflux7Util.isWebflux7) { + mono = Webflux7Util.exchangeToMono(request); + } else { + mono = request.exchange(); + } + // apply Mono timeout that is way shorter than HTTP request timeout + return mono.timeout(Duration.ofSeconds(1)).block(); + })); testing.waitAndAssertTraces( trace -> diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxSingleConnection.java b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxSingleConnection.java index 0b109f40a3ae..f7665ee6bd12 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxSingleConnection.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxSingleConnection.java @@ -16,6 +16,7 @@ import org.springframework.http.HttpMethod; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; final class SpringWebfluxSingleConnection implements SingleConnection { @@ -49,16 +50,31 @@ public int doRequest(String path, Map headers) throws Exception WebClient.RequestBodySpec request = webClient.method(HttpMethod.GET).uri(uri).headers(h -> headers.forEach(h::add)); - ClientResponse response = request.exchange().block(); - // read response body, this seems to be needed to ensure that the connection can be reused - response.bodyToMono(String.class).block(); + if (Webflux7Util.isWebflux7) { + return Webflux7Util.doRequest( + request, + response -> { + String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER); + if (!requestId.equals(responseId)) { + return Mono.error( + new IllegalStateException( + String.format( + "Received response with id %s, expected %s", responseId, requestId))); + } + return Mono.just(Webflux7Util.getStatusCode(response)); + }); + } else { + ClientResponse response = request.exchange().block(); + // read response body, this seems to be needed to ensure that the connection can be reused + response.bodyToMono(String.class).block(); - String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER); - if (!requestId.equals(responseId)) { - throw new IllegalStateException( - String.format("Received response with id %s, expected %s", responseId, requestId)); - } + String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER); + if (!requestId.equals(responseId)) { + throw new IllegalStateException( + String.format("Received response with id %s, expected %s", responseId, requestId)); + } - return response.statusCode().value(); + return response.statusCode().value(); + } } } diff --git a/settings.gradle.kts b/settings.gradle.kts index a4bf32bfa40d..c70af1f4cfc4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -641,6 +641,7 @@ include(":instrumentation:spring:spring-webflux:spring-webflux-5.0:javaagent") include(":instrumentation:spring:spring-webflux:spring-webflux-5.0:testing") include(":instrumentation:spring:spring-webflux:spring-webflux-5.3:library") include(":instrumentation:spring:spring-webflux:spring-webflux-5.3:testing") +include(":instrumentation:spring:spring-webflux:spring-webflux-5.3:testing-webflux7") include(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:javaagent") include(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:wildfly-testing") include(":instrumentation:spring:spring-webmvc:spring-webmvc-5.3:library")