Skip to content

Commit da95542

Browse files
committed
Prevent duplicate HTTP server observations
Prior to this commit, HTTP server observations for Spring WebFlux could be recorded twice for a single request in some cases. The "COMPLETE" and "CANCEL" signals would race in the reactive pipeline and would trigger both the `doOnComplete()` and ` `doOnCancel()` operators, each calling `observation.stop()` on the current observation. This would in fact publish two different observations for the same request. This commit ensures that the instrumentation uses the `Mono#tap` operator to guard against this case and only call `Observation#stop` once for each request. Fixes gh-31417
1 parent 6669ab1 commit da95542

File tree

1 file changed

+63
-32
lines changed

1 file changed

+63
-32
lines changed

spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,12 +18,14 @@
1818

1919
import java.util.Optional;
2020
import java.util.Set;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import io.micrometer.observation.Observation;
2324
import io.micrometer.observation.ObservationRegistry;
2425
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
25-
import org.reactivestreams.Publisher;
26+
import reactor.core.observability.DefaultSignalListener;
2627
import reactor.core.publisher.Mono;
28+
import reactor.util.context.Context;
2729

2830
import org.springframework.http.server.reactive.ServerHttpResponse;
2931
import org.springframework.http.server.reactive.observation.DefaultServerRequestObservationConvention;
@@ -99,40 +101,69 @@ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
99101
ServerRequestObservationContext observationContext = new ServerRequestObservationContext(exchange.getRequest(),
100102
exchange.getResponse(), exchange.getAttributes());
101103
exchange.getAttributes().put(CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext);
102-
return chain.filter(exchange).transformDeferred(call -> filter(exchange, observationContext, call));
104+
return chain.filter(exchange).tap(() -> new ObservationSignalListener(observationContext));
103105
}
104106

105-
private Publisher<Void> filter(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Mono<Void> call) {
106-
Observation observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(this.observationConvention,
107-
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry);
108-
observation.start();
109-
return call.doOnEach(signal -> {
110-
Throwable throwable = signal.getThrowable();
111-
if (throwable != null) {
112-
if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(throwable.getClass().getSimpleName())) {
113-
observationContext.setConnectionAborted(true);
114-
}
115-
observationContext.setError(throwable);
116-
}
117-
onTerminalSignal(observation, exchange);
118-
})
119-
.doOnCancel(() -> {
120-
observationContext.setConnectionAborted(true);
121-
observation.stop();
122-
})
123-
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
124-
}
107+
private final class ObservationSignalListener extends DefaultSignalListener<Void> {
108+
109+
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException",
110+
"ClientAbortException", "EOFException", "EofException");
111+
112+
private final ServerRequestObservationContext observationContext;
113+
114+
private final Observation observation;
115+
116+
private AtomicBoolean observationRecorded = new AtomicBoolean();
117+
118+
public ObservationSignalListener(ServerRequestObservationContext observationContext) {
119+
this.observationContext = observationContext;
120+
this.observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(observationConvention,
121+
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
122+
}
125123

126-
private void onTerminalSignal(Observation observation, ServerWebExchange exchange) {
127-
ServerHttpResponse response = exchange.getResponse();
128-
if (response.isCommitted()) {
129-
observation.stop();
124+
@Override
125+
public void doOnSubscription() throws Throwable {
126+
this.observation.start();
130127
}
131-
else {
132-
response.beforeCommit(() -> {
133-
observation.stop();
134-
return Mono.empty();
135-
});
128+
129+
@Override
130+
public Context addToContext(Context originalContext) {
131+
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
132+
}
133+
134+
@Override
135+
public void doOnCancel() throws Throwable {
136+
if (this.observationRecorded.compareAndSet(false, true)) {
137+
this.observationContext.setConnectionAborted(true);
138+
this.observation.stop();
139+
}
140+
}
141+
142+
@Override
143+
public void doOnComplete() throws Throwable {
144+
if (this.observationRecorded.compareAndSet(false, true)) {
145+
ServerHttpResponse response = this.observationContext.getResponse();
146+
if (response.isCommitted()) {
147+
this.observation.stop();
148+
}
149+
else {
150+
response.beforeCommit(() -> {
151+
this.observation.stop();
152+
return Mono.empty();
153+
});
154+
}
155+
}
156+
}
157+
158+
@Override
159+
public void doOnError(Throwable error) throws Throwable {
160+
if (this.observationRecorded.compareAndSet(false, true)) {
161+
if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(error.getClass().getSimpleName())) {
162+
this.observationContext.setConnectionAborted(true);
163+
}
164+
this.observationContext.setError(error);
165+
this.observation.stop();
166+
}
136167
}
137168
}
138169

0 commit comments

Comments
 (0)