Skip to content

Commit fb2acf9

Browse files
authored
Fix spring webflux latest dep tests (#15304)
1 parent 187c585 commit fb2acf9

File tree

9 files changed

+243
-27
lines changed

9 files changed

+243
-27
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.webflux.v5_3;
7+
8+
import static java.util.Collections.emptyList;
9+
10+
import java.lang.invoke.MethodHandle;
11+
import java.lang.invoke.MethodHandles;
12+
import java.lang.invoke.MethodType;
13+
import java.util.List;
14+
import javax.annotation.Nullable;
15+
import org.springframework.http.HttpHeaders;
16+
17+
class HeaderUtil {
18+
@Nullable private static final MethodHandle GET_HEADERS;
19+
20+
static {
21+
// since spring web 7.0
22+
MethodHandle methodHandle =
23+
findGetHeadersMethod(MethodType.methodType(List.class, String.class, List.class));
24+
if (methodHandle == null) {
25+
// up to spring web 7.0
26+
methodHandle =
27+
findGetHeadersMethod(MethodType.methodType(Object.class, Object.class, Object.class));
28+
}
29+
GET_HEADERS = methodHandle;
30+
}
31+
32+
private static MethodHandle findGetHeadersMethod(MethodType methodType) {
33+
try {
34+
return MethodHandles.lookup().findVirtual(HttpHeaders.class, "getOrDefault", methodType);
35+
} catch (Throwable t) {
36+
return null;
37+
}
38+
}
39+
40+
@SuppressWarnings("unchecked") // casting MethodHandle.invoke result
41+
static List<String> getHeader(HttpHeaders headers, String name) {
42+
if (GET_HEADERS != null) {
43+
try {
44+
return (List<String>) GET_HEADERS.invoke(headers, name, emptyList());
45+
} catch (Throwable t) {
46+
// ignore
47+
}
48+
}
49+
return emptyList();
50+
}
51+
52+
private HeaderUtil() {}
53+
}

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxServerHttpAttributesGetter.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
package io.opentelemetry.instrumentation.spring.webflux.v5_3;
77

88
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerAttributesGetter;
9+
import java.lang.invoke.MethodHandle;
10+
import java.lang.invoke.MethodHandles;
11+
import java.lang.invoke.MethodType;
912
import java.net.InetSocketAddress;
10-
import java.util.Collections;
1113
import java.util.List;
1214
import javax.annotation.Nullable;
15+
import org.springframework.http.server.reactive.ServerHttpResponse;
1316
import org.springframework.web.reactive.HandlerMapping;
1417
import org.springframework.web.server.ServerWebExchange;
1518
import org.springframework.web.util.pattern.PathPattern;
@@ -18,27 +21,85 @@ enum WebfluxServerHttpAttributesGetter
1821
implements HttpServerAttributesGetter<ServerWebExchange, ServerWebExchange> {
1922
INSTANCE;
2023

24+
private static final MethodHandle GET_RAW_STATUS_CODE;
25+
private static final MethodHandle GET_STATUS_CODE;
26+
private static final MethodHandle STATUS_CODE_VALUE;
27+
28+
static {
29+
MethodHandle getRawStatusCode = null;
30+
MethodHandle getStatusCode = null;
31+
MethodHandle statusCodeValue = null;
32+
33+
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
34+
35+
// up to webflux 7.0
36+
try {
37+
getRawStatusCode =
38+
lookup.findVirtual(
39+
ServerHttpResponse.class, "getRawStatusCode", MethodType.methodType(Integer.class));
40+
} catch (Exception exception) {
41+
// ignore
42+
}
43+
44+
// since webflux 7.0
45+
try {
46+
Class<?> httpStatusCodeClass = Class.forName("org.springframework.http.HttpStatusCode");
47+
getStatusCode =
48+
lookup.findVirtual(
49+
ServerHttpResponse.class,
50+
"getStatusCode",
51+
MethodType.methodType(httpStatusCodeClass));
52+
statusCodeValue =
53+
lookup.findVirtual(httpStatusCodeClass, "value", MethodType.methodType(int.class));
54+
} catch (Exception exception) {
55+
// ignore
56+
}
57+
58+
GET_RAW_STATUS_CODE = getRawStatusCode;
59+
GET_STATUS_CODE = getStatusCode;
60+
STATUS_CODE_VALUE = statusCodeValue;
61+
}
62+
63+
private static Integer getStatusCode(ServerHttpResponse response) {
64+
if (GET_RAW_STATUS_CODE != null) {
65+
try {
66+
return (Integer) GET_RAW_STATUS_CODE.invoke(response);
67+
} catch (Throwable e) {
68+
// ignore
69+
}
70+
}
71+
if (GET_STATUS_CODE != null && STATUS_CODE_VALUE != null) {
72+
try {
73+
Object statusCode = GET_STATUS_CODE.invoke(response);
74+
return (Integer) STATUS_CODE_VALUE.invoke(statusCode);
75+
} catch (Throwable e) {
76+
// ignore
77+
}
78+
}
79+
return null;
80+
}
81+
2182
@Override
2283
public String getHttpRequestMethod(ServerWebExchange request) {
2384
return request.getRequest().getMethod().name();
2485
}
2586

2687
@Override
2788
public List<String> getHttpRequestHeader(ServerWebExchange request, String name) {
28-
return request.getRequest().getHeaders().getOrDefault(name, Collections.emptyList());
89+
return HeaderUtil.getHeader(request.getRequest().getHeaders(), name);
2990
}
3091

3192
@Nullable
3293
@Override
3394
public Integer getHttpResponseStatusCode(
3495
ServerWebExchange request, ServerWebExchange response, @Nullable Throwable error) {
35-
return response.getResponse().getRawStatusCode();
96+
return getStatusCode(response.getResponse());
3697
}
3798

3899
@Override
39100
public List<String> getHttpResponseHeader(
40101
ServerWebExchange request, ServerWebExchange response, String name) {
41-
return response.getResponse().getHeaders().getOrDefault(name, Collections.emptyList());
102+
return HeaderUtil.getHeader(response.getResponse().getHeaders(), name);
42103
}
43104

44105
@Nullable

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/WebfluxTextMapGetter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import io.opentelemetry.context.propagation.TextMapGetter;
1111
import java.util.Iterator;
12-
import java.util.List;
1312
import javax.annotation.Nullable;
1413
import org.springframework.web.server.ServerWebExchange;
1514

@@ -35,7 +34,6 @@ public Iterator<String> getAll(@Nullable ServerWebExchange exchange, String key)
3534
if (exchange == null) {
3635
return emptyIterator();
3736
}
38-
List<String> list = exchange.getRequest().getHeaders().get(key);
39-
return list != null ? list.iterator() : emptyIterator();
37+
return HeaderUtil.getHeader(exchange.getRequest().getHeaders(), key).iterator();
4038
}
4139
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
plugins {
2+
id("otel.java-conventions")
3+
}
4+
5+
dependencies {
6+
implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
7+
compileOnly("org.springframework:spring-webflux:7.0.0")
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.webflux.client;
7+
8+
import java.util.function.Consumer;
9+
import java.util.function.Function;
10+
import org.springframework.web.reactive.function.client.ClientResponse;
11+
import org.springframework.web.reactive.function.client.WebClient;
12+
import reactor.core.publisher.Mono;
13+
14+
public class Webflux7Util {
15+
static final boolean isWebflux7 = detectWebflux7();
16+
17+
private static boolean detectWebflux7() {
18+
try {
19+
WebClient.RequestBodySpec.class.getMethod("exchange");
20+
return false;
21+
} catch (NoSuchMethodException e) {
22+
return true;
23+
}
24+
}
25+
26+
static Mono<ClientResponse> exchangeToMono(WebClient.RequestBodySpec request) {
27+
return request.exchangeToMono(Mono::just);
28+
}
29+
30+
static <T> T doRequest(
31+
WebClient.RequestBodySpec request, Function<ClientResponse, Mono<T>> handler) {
32+
return request.exchangeToMono(handler).block();
33+
}
34+
35+
static int doRequest(WebClient.RequestBodySpec request) {
36+
return doRequest(request, response -> Mono.just(response.statusCode().value()));
37+
}
38+
39+
static int getStatusCode(ClientResponse response) {
40+
return response.statusCode().value();
41+
}
42+
43+
static void sendRequestWithCallback(
44+
WebClient.RequestBodySpec request,
45+
Consumer<Integer> callback,
46+
Consumer<Throwable> errorCallback) {
47+
request
48+
.exchangeToMono(response -> Mono.just(response.statusCode().value()))
49+
.subscribe(callback, errorCallback);
50+
}
51+
52+
private Webflux7Util() {}
53+
}

instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ plugins {
44

55
dependencies {
66
implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
7+
implementation(project(":instrumentation:spring:spring-webflux:spring-webflux-5.3:testing-webflux7"))
78

89
compileOnly("org.springframework:spring-webflux:5.0.0.RELEASE")
910

instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import io.opentelemetry.api.common.AttributeKey;
2121
import io.opentelemetry.api.trace.SpanKind;
22+
import io.opentelemetry.context.Context;
23+
import io.opentelemetry.context.Scope;
2224
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
2325
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
2426
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
@@ -35,6 +37,7 @@
3537
import org.springframework.http.HttpMethod;
3638
import org.springframework.web.reactive.function.client.ClientResponse;
3739
import org.springframework.web.reactive.function.client.WebClient;
40+
import reactor.core.publisher.Mono;
3841

3942
public abstract class AbstractSpringWebfluxClientInstrumentationTest
4043
extends AbstractHttpClientTest<WebClient.RequestBodySpec> {
@@ -58,6 +61,10 @@ public WebClient.RequestBodySpec buildRequest(
5861
@Override
5962
public int sendRequest(
6063
WebClient.RequestBodySpec request, String method, URI uri, Map<String, String> headers) {
64+
if (Webflux7Util.isWebflux7) {
65+
return Webflux7Util.doRequest(request);
66+
}
67+
6168
ClientResponse response = requireNonNull(request.exchange().block());
6269
return getStatusCode(response);
6370
}
@@ -69,11 +76,24 @@ public void sendRequestWithCallback(
6976
URI uri,
7077
Map<String, String> headers,
7178
HttpClientResult httpClientResult) {
72-
request
73-
.exchange()
74-
.subscribe(
75-
response -> httpClientResult.complete(getStatusCode(response)),
76-
httpClientResult::complete);
79+
if (Webflux7Util.isWebflux7) {
80+
// FIXME: context is not propagated to the callback, this needs to be fixed
81+
Context context = Context.current();
82+
Webflux7Util.sendRequestWithCallback(
83+
request,
84+
status -> {
85+
try (Scope ignore = context.makeCurrent()) {
86+
httpClientResult.complete(status);
87+
}
88+
},
89+
httpClientResult::complete);
90+
} else {
91+
request
92+
.exchange()
93+
.subscribe(
94+
response -> httpClientResult.complete(getStatusCode(response)),
95+
httpClientResult::complete);
96+
}
7797
}
7898

7999
@Override
@@ -164,12 +184,17 @@ void shouldEndSpanOnMonoTimeout() {
164184
() ->
165185
testing.runWithSpan(
166186
"parent",
167-
() ->
168-
buildRequest("GET", uri, emptyMap())
169-
.exchange()
170-
// apply Mono timeout that is way shorter than HTTP request timeout
171-
.timeout(Duration.ofSeconds(1))
172-
.block()));
187+
() -> {
188+
WebClient.RequestBodySpec request = buildRequest("GET", uri, emptyMap());
189+
Mono<ClientResponse> mono;
190+
if (Webflux7Util.isWebflux7) {
191+
mono = Webflux7Util.exchangeToMono(request);
192+
} else {
193+
mono = request.exchange();
194+
}
195+
// apply Mono timeout that is way shorter than HTTP request timeout
196+
return mono.timeout(Duration.ofSeconds(1)).block();
197+
}));
173198

174199
testing.waitAndAssertTraces(
175200
trace ->

instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxSingleConnection.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.springframework.http.HttpMethod;
1717
import org.springframework.web.reactive.function.client.ClientResponse;
1818
import org.springframework.web.reactive.function.client.WebClient;
19+
import reactor.core.publisher.Mono;
1920

2021
final class SpringWebfluxSingleConnection implements SingleConnection {
2122

@@ -49,16 +50,31 @@ public int doRequest(String path, Map<String, String> headers) throws Exception
4950
WebClient.RequestBodySpec request =
5051
webClient.method(HttpMethod.GET).uri(uri).headers(h -> headers.forEach(h::add));
5152

52-
ClientResponse response = request.exchange().block();
53-
// read response body, this seems to be needed to ensure that the connection can be reused
54-
response.bodyToMono(String.class).block();
53+
if (Webflux7Util.isWebflux7) {
54+
return Webflux7Util.doRequest(
55+
request,
56+
response -> {
57+
String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER);
58+
if (!requestId.equals(responseId)) {
59+
return Mono.error(
60+
new IllegalStateException(
61+
String.format(
62+
"Received response with id %s, expected %s", responseId, requestId)));
63+
}
64+
return Mono.just(Webflux7Util.getStatusCode(response));
65+
});
66+
} else {
67+
ClientResponse response = request.exchange().block();
68+
// read response body, this seems to be needed to ensure that the connection can be reused
69+
response.bodyToMono(String.class).block();
5570

56-
String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER);
57-
if (!requestId.equals(responseId)) {
58-
throw new IllegalStateException(
59-
String.format("Received response with id %s, expected %s", responseId, requestId));
60-
}
71+
String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER);
72+
if (!requestId.equals(responseId)) {
73+
throw new IllegalStateException(
74+
String.format("Received response with id %s, expected %s", responseId, requestId));
75+
}
6176

62-
return response.statusCode().value();
77+
return response.statusCode().value();
78+
}
6379
}
6480
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,7 @@ include(":instrumentation:spring:spring-webflux:spring-webflux-5.0:javaagent")
641641
include(":instrumentation:spring:spring-webflux:spring-webflux-5.0:testing")
642642
include(":instrumentation:spring:spring-webflux:spring-webflux-5.3:library")
643643
include(":instrumentation:spring:spring-webflux:spring-webflux-5.3:testing")
644+
include(":instrumentation:spring:spring-webflux:spring-webflux-5.3:testing-webflux7")
644645
include(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:javaagent")
645646
include(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:wildfly-testing")
646647
include(":instrumentation:spring:spring-webmvc:spring-webmvc-5.3:library")

0 commit comments

Comments
 (0)