Skip to content

Commit c03ccb2

Browse files
marcingrzejszczakbclozel
authored andcommitted
Propagate context in reactive HTTP server and client
Prior to this commit, the ServerHttpObservationFilter would not add the current observation as a key in the Reactor context, preventing from being used or propagated during the HTTP exchange handling. Also, the client instrumentation in `DefaultWebClient` would start the observation once the request is fully formed and immutable, preventing the context from being propagated through HTTP request headers. This commit fixes both uses cases now by: * adding the current observation as a key in the reactor context on the server side * using the `ClientRequest.Builder` as a Carrier on the client side Closes gh-29388
1 parent 396336f commit c03ccb2

File tree

7 files changed

+111
-25
lines changed

7 files changed

+111
-25
lines changed

spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public class ServerHttpObservationFilter implements WebFilter {
5959
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException",
6060
"ClientAbortException", "EOFException", "EofException");
6161

62+
/**
63+
* Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core.
64+
*/
65+
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
66+
6267
private final ObservationRegistry observationRegistry;
6368

6469
private final ServerRequestObservationConvention observationConvention;
@@ -117,7 +122,8 @@ private Publisher<Void> filter(ServerWebExchange exchange, ServerRequestObservat
117122
.doOnCancel(() -> {
118123
observationContext.setConnectionAborted(true);
119124
observation.stop();
120-
});
125+
})
126+
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
121127
}
122128

123129
private void onTerminalSignal(Observation observation, ServerWebExchange exchange) {

spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.Optional;
2121

22+
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2223
import io.micrometer.observation.tck.TestObservationRegistry;
2324
import io.micrometer.observation.tck.TestObservationRegistryAssert;
2425
import org.assertj.core.api.ThrowingConsumer;
@@ -59,6 +60,18 @@ void filterShouldFillObservationContext() {
5960
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
6061
}
6162

63+
@Test
64+
void filterShouldAddNewObservationToReactorContext() {
65+
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
66+
exchange.getResponse().setRawStatusCode(200);
67+
WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> {
68+
assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
69+
return Mono.empty();
70+
});
71+
this.filter.filter(exchange, filterChain).block();
72+
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
73+
}
74+
6275
@Test
6376
void filterShouldUseThrownException() {
6477
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,24 @@
2727
* @author Brian Clozel
2828
* @since 6.0
2929
*/
30-
public class ClientRequestObservationContext extends RequestReplySenderContext<ClientRequest, ClientResponse> {
30+
public class ClientRequestObservationContext extends RequestReplySenderContext<ClientRequest.Builder, ClientResponse> {
3131

3232
@Nullable
3333
private String uriTemplate;
3434

3535
private boolean aborted;
3636

37+
@Nullable
38+
private ClientRequest builtRequest;
39+
3740

3841
public ClientRequestObservationContext() {
3942
super(ClientRequestObservationContext::setRequestHeader);
4043
}
4144

42-
private static void setRequestHeader(@Nullable ClientRequest request, String name, String value) {
45+
private static void setRequestHeader(@Nullable ClientRequest.Builder request, String name, String value) {
4346
if (request != null) {
44-
request.headers().set(name, value);
47+
request.header(name, value);
4548
}
4649
}
4750

@@ -75,4 +78,18 @@ public boolean isAborted() {
7578
void setAborted(boolean aborted) {
7679
this.aborted = aborted;
7780
}
81+
82+
/**
83+
* Return the built request.
84+
*/
85+
public ClientRequest getBuiltRequest() {
86+
return this.builtRequest;
87+
}
88+
89+
/**
90+
* Set the built request.
91+
*/
92+
public void setBuiltRequest(ClientRequest builtRequest) {
93+
this.builtRequest = builtRequest;
94+
}
7895
}

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConvention.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public String getName() {
7979

8080
@Override
8181
public String getContextualName(ClientRequestObservationContext context) {
82-
return "http " + context.getCarrier().method().name().toLowerCase();
82+
return "http " + context.getBuiltRequest().method().name().toLowerCase();
8383
}
8484

8585
@Override
@@ -95,8 +95,8 @@ protected KeyValue uri(ClientRequestObservationContext context) {
9595
}
9696

9797
protected KeyValue method(ClientRequestObservationContext context) {
98-
if (context.getCarrier() != null) {
99-
return KeyValue.of(ClientHttpObservationDocumentation.LowCardinalityKeyNames.METHOD, context.getCarrier().method().name());
98+
if (context.getBuiltRequest() != null) {
99+
return KeyValue.of(ClientHttpObservationDocumentation.LowCardinalityKeyNames.METHOD, context.getBuiltRequest().method().name());
100100
}
101101
else {
102102
return METHOD_NONE;
@@ -143,15 +143,15 @@ public KeyValues getHighCardinalityKeyValues(ClientRequestObservationContext con
143143
}
144144

145145
protected KeyValue httpUrl(ClientRequestObservationContext context) {
146-
if (context.getCarrier() != null) {
147-
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.HTTP_URL, context.getCarrier().url().toASCIIString());
146+
if (context.getBuiltRequest() != null) {
147+
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.HTTP_URL, context.getBuiltRequest().url().toASCIIString());
148148
}
149149
return HTTP_URL_NONE;
150150
}
151151

152152
protected KeyValue clientName(ClientRequestObservationContext context) {
153-
if (context.getCarrier() != null && context.getCarrier().url().getHost() != null) {
154-
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.CLIENT_NAME, context.getCarrier().url().getHost());
153+
if (context.getBuiltRequest() != null && context.getBuiltRequest().url().getHost() != null) {
154+
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.CLIENT_NAME, context.getBuiltRequest().url().getHost());
155155
}
156156
return CLIENT_NAME_NONE;
157157
}

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ class DefaultWebClient implements WebClient {
7575

7676
private static final DefaultClientRequestObservationConvention DEFAULT_OBSERVATION_CONVENTION = new DefaultClientRequestObservationConvention();
7777

78+
/**
79+
* Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core.
80+
*/
81+
private static final String MICROMETER_OBSERVATION = "micrometer.observation";
82+
7883
private final ExchangeFunction exchangeFunction;
7984

8085
private final UriBuilderFactory uriBuilderFactory;
@@ -450,14 +455,19 @@ public <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> re
450455
@SuppressWarnings("deprecation")
451456
public Mono<ClientResponse> exchange() {
452457
ClientRequestObservationContext observationContext = new ClientRequestObservationContext();
453-
ClientRequest request = (this.inserter != null ?
454-
initRequestBuilder().body(this.inserter).build() :
455-
initRequestBuilder().build());
456-
return Mono.defer(() -> {
458+
ClientRequest.Builder requestBuilder = this.inserter != null ?
459+
initRequestBuilder().body(this.inserter) :
460+
initRequestBuilder();
461+
return Mono.deferContextual(contextView -> {
457462
Observation observation = ClientHttpObservationDocumentation.HTTP_REQUEST.observation(observationConvention,
458-
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry).start();
459-
observationContext.setCarrier(request);
463+
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
464+
observationContext.setCarrier(requestBuilder);
465+
observation
466+
.parentObservation(contextView.getOrDefault(MICROMETER_OBSERVATION, null))
467+
.start();
468+
ClientRequest request = requestBuilder.build();
460469
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
470+
observationContext.setBuiltRequest(request);
461471
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
462472
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
463473
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ void shouldHaveName() {
4444
@Test
4545
void shouldHaveContextualName() {
4646
ClientRequestObservationContext context = new ClientRequestObservationContext();
47-
context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")).build());
47+
context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")));
48+
context.setBuiltRequest(context.getCarrier().build());
4849
assertThat(this.observationConvention.getContextualName(context)).isEqualTo("http get");
4950
}
5051

@@ -77,10 +78,11 @@ void shouldAddKeyValuesForExchangeWithException() {
7778

7879
@Test
7980
void shouldAddKeyValuesForRequestWithUriTemplate() {
80-
ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))
81-
.attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}").build();
81+
ClientRequest.Builder request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))
82+
.attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}");
8283
ClientRequestObservationContext context = createContext(request);
8384
context.setUriTemplate("/resource/{id}");
85+
context.setBuiltRequest(context.getCarrier().build());
8486
assertThat(this.observationConvention.getLowCardinalityKeyValues(context))
8587
.contains(KeyValue.of("exception", "none"), KeyValue.of("method", "GET"), KeyValue.of("uri", "/resource/{id}"),
8688
KeyValue.of("status", "200"), KeyValue.of("outcome", "SUCCESS"));
@@ -90,19 +92,21 @@ void shouldAddKeyValuesForRequestWithUriTemplate() {
9092

9193
@Test
9294
void shouldAddKeyValuesForRequestWithoutUriTemplate() {
93-
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")).build());
95+
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")));
96+
context.setBuiltRequest(context.getCarrier().build());
9497
assertThat(this.observationConvention.getLowCardinalityKeyValues(context))
9598
.contains(KeyValue.of("method", "GET"), KeyValue.of("uri", "none"));
9699
assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).hasSize(2).contains(KeyValue.of("http.url", "/resource/42"));
97100
}
98101

99102
@Test
100103
void shouldAddClientNameKeyValueForRequestWithHost() {
101-
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")).build());
104+
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")));
105+
context.setBuiltRequest(context.getCarrier().build());
102106
assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).contains(KeyValue.of("client.name", "localhost"));
103107
}
104108

105-
private ClientRequestObservationContext createContext(ClientRequest request) {
109+
private ClientRequestObservationContext createContext(ClientRequest.Builder request) {
106110
ClientRequestObservationContext context = new ClientRequestObservationContext();
107111
context.setCarrier(request);
108112
context.setResponse(ClientResponse.create(HttpStatus.OK).build());

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
package org.springframework.web.reactive.function.client;
1818

1919
import java.time.Duration;
20+
import java.util.Collections;
2021

22+
import io.micrometer.observation.Observation;
23+
import io.micrometer.observation.ObservationHandler;
24+
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2125
import io.micrometer.observation.tck.TestObservationRegistry;
2226
import io.micrometer.observation.tck.TestObservationRegistryAssert;
2327
import org.junit.jupiter.api.BeforeEach;
@@ -28,6 +32,7 @@
2832

2933
import org.springframework.http.HttpStatus;
3034

35+
import static org.assertj.core.api.Assertions.assertThat;
3136
import static org.mockito.ArgumentMatchers.any;
3237
import static org.mockito.BDDMockito.given;
3338
import static org.mockito.BDDMockito.when;
@@ -57,17 +62,35 @@ void setup() {
5762
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
5863
given(this.exchangeFunction.exchange(this.request.capture())).willReturn(Mono.just(mockResponse));
5964
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction).observationRegistry(this.observationRegistry);
65+
this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler());
6066
}
6167

62-
6368
@Test
6469
void recordsObservationForSuccessfulExchange() {
6570
this.builder.build().get().uri("/resource/{id}", 42)
6671
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
67-
verifyAndGetRequest();
72+
73+
ClientRequest clientRequest = verifyAndGetRequest();
6874

6975
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
7076
.hasLowCardinalityKeyValue("uri", "/resource/{id}");
77+
assertThat(clientRequest.headers()).containsEntry("foo", Collections.singletonList("bar"));
78+
}
79+
80+
@Test
81+
void recordsObservationForSuccessfulExchangeWithParentObservationInReactorContext() {
82+
Observation parent = Observation.start("parent", observationRegistry);
83+
try {
84+
this.builder.build().get().uri("/resource/{id}", 42)
85+
.retrieve().bodyToMono(Void.class).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, parent)).block(Duration.ofSeconds(10));
86+
verifyAndGetRequest();
87+
88+
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
89+
.hasParentObservationEqualTo(parent);
90+
}
91+
finally {
92+
parent.stop();
93+
}
7194
}
7295

7396
@Test
@@ -102,4 +125,17 @@ private ClientRequest verifyAndGetRequest() {
102125
return request.getValue();
103126
}
104127

128+
static class HeaderInjectingHandler implements ObservationHandler<ClientRequestObservationContext> {
129+
130+
@Override
131+
public void onStart(ClientRequestObservationContext context) {
132+
context.getSetter().set(context.getCarrier(), "foo", "bar");
133+
}
134+
135+
@Override
136+
public boolean supportsContext(Observation.Context context) {
137+
return context instanceof ClientRequestObservationContext;
138+
}
139+
}
140+
105141
}

0 commit comments

Comments
 (0)