Skip to content

Commit d1e4580

Browse files
authored
Add body capturing for reactive spring webclient (#3754)
1 parent 0d27398 commit d1e4580

File tree

17 files changed

+508
-40
lines changed

17 files changed

+508
-40
lines changed

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ public void recycle(ByteBuffer object) {
4646
});
4747

4848
private enum CaptureState {
49-
NOT_ELIGIBLE,
50-
ELIGIBLE,
51-
STARTED
49+
NOT_ELIGIBLE, // initial state
50+
ELIGIBLE, // eligible but before preconditions evaluation
51+
PRECONDITIONS_PASSED, // post preconditions (passed), can start capture
52+
PRECONDITIONS_FAILED, // post preconditions (failed), no body will be captured
53+
STARTED // the body capturing has been started, a buffer was acquired
5254
}
5355

5456
private volatile CaptureState state;
@@ -95,17 +97,42 @@ public boolean isEligibleForCapturing() {
9597
}
9698

9799
@Override
98-
public boolean startCapture(@Nullable String requestCharset, int numBytesToCapture) {
100+
public boolean havePreconditionsBeenChecked() {
101+
return state == CaptureState.PRECONDITIONS_PASSED
102+
|| state == CaptureState.PRECONDITIONS_FAILED
103+
|| state == CaptureState.STARTED;
104+
}
105+
106+
@Override
107+
public void markPreconditionsFailed() {
108+
synchronized (this) {
109+
if (state == CaptureState.ELIGIBLE) {
110+
state = CaptureState.PRECONDITIONS_FAILED;
111+
}
112+
}
113+
}
114+
115+
@Override
116+
public void markPreconditionsPassed(@Nullable String requestCharset, int numBytesToCapture) {
99117
if (numBytesToCapture > WebConfiguration.MAX_BODY_CAPTURE_BYTES) {
100118
throw new IllegalArgumentException("Capturing " + numBytesToCapture + " bytes is not supported, maximum is " + WebConfiguration.MAX_BODY_CAPTURE_BYTES + " bytes");
101119
}
102-
if (state == CaptureState.ELIGIBLE) {
120+
synchronized (this) {
121+
if (state == CaptureState.ELIGIBLE) {
122+
if (requestCharset != null) {
123+
this.charset.append(requestCharset);
124+
}
125+
this.numBytesToCapture = numBytesToCapture;
126+
state = CaptureState.PRECONDITIONS_PASSED;
127+
}
128+
}
129+
}
130+
131+
@Override
132+
public boolean startCapture() {
133+
if (state == CaptureState.PRECONDITIONS_PASSED) {
103134
synchronized (this) {
104-
if (state == CaptureState.ELIGIBLE) {
105-
if (requestCharset != null) {
106-
this.charset.append(requestCharset);
107-
}
108-
this.numBytesToCapture = numBytesToCapture;
135+
if (state == CaptureState.PRECONDITIONS_PASSED) {
109136
state = CaptureState.STARTED;
110137
return true;
111138
}

apm-agent-core/src/test/java/co/elastic/apm/agent/impl/context/BodyCaptureImplTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class BodyCaptureImplTest {
3232
public void testAppendTruncation() {
3333
BodyCaptureImpl capture = new BodyCaptureImpl();
3434
capture.markEligibleForCapturing();
35-
capture.startCapture("foobar", 10);
35+
capture.markPreconditionsPassed("foobar", 10);
36+
capture.startCapture();
3637
assertThat(capture.isFull()).isFalse();
3738

3839
capture.append("123Hello World!".getBytes(StandardCharsets.UTF_8), 3, 5);
@@ -55,21 +56,28 @@ public void testLifecycle() {
5556
BodyCaptureImpl capture = new BodyCaptureImpl();
5657

5758
assertThat(capture.isEligibleForCapturing()).isFalse();
58-
assertThat(capture.startCapture("foobar", 42))
59+
assertThat(capture.havePreconditionsBeenChecked()).isFalse();
60+
assertThat(capture.startCapture())
5961
.isFalse();
6062
assertThatThrownBy(() -> capture.append((byte) 42)).isInstanceOf(IllegalStateException.class);
6163

6264
capture.markEligibleForCapturing();
6365
assertThat(capture.isEligibleForCapturing()).isTrue();
66+
assertThat(capture.havePreconditionsBeenChecked()).isFalse();
6467
assertThatThrownBy(() -> capture.append((byte) 42)).isInstanceOf(IllegalStateException.class);
6568

66-
assertThat(capture.startCapture("foobar", 42))
67-
.isTrue();
69+
capture.markPreconditionsPassed("foobar", 42);
70+
assertThat(capture.isEligibleForCapturing()).isTrue();
71+
assertThat(capture.havePreconditionsBeenChecked()).isTrue();
72+
73+
74+
assertThat(capture.startCapture()).isTrue();
6875
capture.append((byte) 42); //ensure no exception thrown
6976

7077
// startCapture should return true only once
71-
assertThat(capture.startCapture("foobar", 42))
72-
.isFalse();
78+
assertThat(capture.havePreconditionsBeenChecked()).isTrue();
79+
assertThat(capture.startCapture()).isFalse();
80+
assertThat(capture.havePreconditionsBeenChecked()).isTrue();
7381

7482
capture.resetState();
7583
assertThat(capture.getCharset()).isNull();

apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/DslJsonSerializerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ private SpanImpl createSpanWithRequestBody(@Nullable byte[] bodyBytes, @Nullable
475475
SpanImpl span = new SpanImpl(tracer);
476476
BodyCaptureImpl bodyCapture = span.getContext().getHttp().getRequestBody();
477477
bodyCapture.markEligibleForCapturing();
478-
bodyCapture.startCapture(charset, WebConfiguration.MAX_BODY_CAPTURE_BYTES);
478+
bodyCapture.markPreconditionsPassed(charset, WebConfiguration.MAX_BODY_CAPTURE_BYTES);
479+
bodyCapture.startCapture();
479480

480481
if (bodyBytes != null) {
481482
bodyCapture.append(bodyBytes, 0, bodyBytes.length);

apm-agent-plugins/apm-apache-httpclient/apm-apache-httpclient-common/src/main/java/co/elastic/apm/agent/httpclient/common/RequestBodyCaptureRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static <REQUEST, HTTPENTITY> void potentiallyCaptureRequestBody(
5656
ApacheHttpClientEntityAccessor<REQUEST, HTTPENTITY> adapter,
5757
TextHeaderGetter<REQUEST> headerGetter
5858
) {
59-
if (HttpClientHelper.startRequestBodyCapture(abstractSpan, request, headerGetter)) {
59+
if (HttpClientHelper.checkAndStartRequestBodyCapture(abstractSpan, request, headerGetter)) {
6060
Span<?> span = (Span<?>) abstractSpan;
6161
byte[] simpleBytes = adapter.getSimpleBodyBytes(request);
6262
if (simpleBytes != null) {

apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import co.elastic.apm.agent.tracer.TraceState;
2828
import co.elastic.apm.agent.tracer.configuration.WebConfiguration;
2929
import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter;
30+
import co.elastic.apm.agent.tracer.metadata.BodyCapture;
3031

3132
import javax.annotation.Nullable;
3233
import java.net.URI;
@@ -75,29 +76,43 @@ public static Span<?> startHttpClientSpan(TraceState<?> activeContext, String me
7576
return span;
7677
}
7778

78-
public static <R> boolean startRequestBodyCapture(@Nullable AbstractSpan<?> abstractSpan, R request, TextHeaderGetter<R> headerGetter) {
79+
public static <R> void checkBodyCapturePreconditions(@Nullable AbstractSpan<?> abstractSpan, R request, TextHeaderGetter<R> headerGetter) {
7980
if (!(abstractSpan instanceof Span<?>)) {
80-
return false;
81+
return;
8182
}
8283
Span<?> span = (Span<?>) abstractSpan;
83-
if (!span.getContext().getHttp().getRequestBody().isEligibleForCapturing()) {
84-
return false;
84+
BodyCapture bodyCapture = span.getContext().getHttp().getRequestBody();
85+
if (!bodyCapture.isEligibleForCapturing()) {
86+
return;
87+
}
88+
if (bodyCapture.havePreconditionsBeenChecked()) {
89+
return;
8590
}
8691
WebConfiguration webConfig = GlobalTracer.get().getConfig(WebConfiguration.class);
8792
int byteCount = webConfig.getCaptureClientRequestBytes();
8893
if (byteCount == 0) {
89-
return false;
94+
bodyCapture.markPreconditionsFailed();
95+
return;
9096
}
9197
List<WildcardMatcher> contentTypes = webConfig.getCaptureContentTypes();
9298
String contentTypeHeader = headerGetter.getFirstHeader("Content-Type", request);
9399
if (contentTypeHeader == null) {
94100
contentTypeHeader = "";
95101
}
96102
if (WildcardMatcher.anyMatch(contentTypes, contentTypeHeader) == null) {
103+
bodyCapture.markPreconditionsFailed();
104+
return;
105+
}
106+
bodyCapture.markPreconditionsPassed(extractCharsetFromContentType(contentTypeHeader), byteCount);
107+
}
108+
109+
public static <R> boolean checkAndStartRequestBodyCapture(@Nullable AbstractSpan<?> abstractSpan, R request, TextHeaderGetter<R> headerGetter) {
110+
if (!(abstractSpan instanceof Span<?>)) {
97111
return false;
98112
}
99-
String charset = extractCharsetFromContentType(contentTypeHeader);
100-
return span.getContext().getHttp().getRequestBody().startCapture(charset, byteCount);
113+
checkBodyCapturePreconditions(abstractSpan, request, headerGetter);
114+
Span<?> span = (Span<?>) abstractSpan;
115+
return span.getContext().getHttp().getRequestBody().startCapture();
101116
}
102117

103118
//Visible for testing

apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import co.elastic.apm.agent.tracer.SpanEndListener;
55
import co.elastic.apm.agent.tracer.metadata.BodyCapture;
66

7-
class RequestBodyRecordingHelper implements SpanEndListener<Span<?>> {
7+
public class RequestBodyRecordingHelper implements SpanEndListener<Span<?>> {
88

99
/**
1010
* We do not need to participate in span reference counting here.
@@ -21,17 +21,25 @@ public RequestBodyRecordingHelper(Span<?> clientSpan) {
2121
}
2222
}
2323

24-
void appendToBody(byte b) {
24+
25+
/**
26+
* @param b the byte to append
27+
* @return false, if the body buffer is full and future calls would be no-op. True otherwise.
28+
*/
29+
public boolean appendToBody(byte b) {
2530
if (clientSpan != null) {
2631
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
2732
requestBody.append(b);
2833
if (requestBody.isFull()) {
2934
releaseSpan();
35+
} else {
36+
return true;
3037
}
3138
}
39+
return false;
3240
}
3341

34-
void appendToBody(byte[] b, int off, int len) {
42+
public void appendToBody(byte[] b, int off, int len) {
3543
if (clientSpan != null) {
3644
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
3745
requestBody.append(b, off, len);

apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void ensureNoModificationAfterSpanEnd() {
4040
SpanImpl span = rootTx.createSpan();
4141
BodyCaptureImpl spanBody = span.getContext().getHttp().getRequestBody();
4242
spanBody.markEligibleForCapturing();
43-
spanBody.startCapture(null, 100);
43+
spanBody.markPreconditionsPassed(null, 100);
44+
spanBody.startCapture();
4445

4546
RequestBodyRecordingHelper helper = new RequestBodyRecordingHelper(span);
4647
helper.appendToBody(new byte[]{1, 2, 3, 4}, 1, 2);
@@ -66,7 +67,8 @@ public void ensureLimitRespected() {
6667
SpanImpl span = rootTx.createSpan();
6768
BodyCaptureImpl spanBody = span.getContext().getHttp().getRequestBody();
6869
spanBody.markEligibleForCapturing();
69-
spanBody.startCapture(null, 3);
70+
spanBody.markPreconditionsPassed(null, 3);
71+
spanBody.startCapture();
7072

7173
RequestBodyRecordingHelper helper = new RequestBodyRecordingHelper(span);
7274
helper.appendToBody((byte) 1);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package co.elastic.apm.agent.springwebclient;
2+
3+
import co.elastic.apm.agent.httpclient.RequestBodyRecordingHelper;
4+
import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
5+
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
6+
import co.elastic.apm.agent.tracer.AbstractSpan;
7+
import co.elastic.apm.agent.tracer.Span;
8+
import org.springframework.http.client.reactive.ClientHttpRequest;
9+
10+
import javax.annotation.Nullable;
11+
12+
public class BodyCaptureRegistry {
13+
14+
private static final WeakMap<ClientHttpRequest, RequestBodyRecordingHelper> PENDING_RECORDINGS = WeakConcurrent.buildMap();
15+
16+
public static void maybeCaptureBodyFor(AbstractSpan<?> abstractSpan, ClientHttpRequest request) {
17+
if (!(abstractSpan instanceof Span<?>)) {
18+
return;
19+
}
20+
Span<?> span = (Span<?>) abstractSpan;
21+
if (span.getContext().getHttp().getRequestBody().startCapture()) {
22+
PENDING_RECORDINGS.put(request, new RequestBodyRecordingHelper(span));
23+
}
24+
}
25+
26+
@Nullable
27+
public static RequestBodyRecordingHelper activateRecording(ClientHttpRequest request) {
28+
return PENDING_RECORDINGS.remove(request);
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package co.elastic.apm.agent.springwebclient;
20+
21+
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
22+
import co.elastic.apm.agent.tracer.GlobalTracer;
23+
import co.elastic.apm.agent.tracer.TraceState;
24+
import co.elastic.apm.agent.tracer.Tracer;
25+
import net.bytebuddy.asm.Advice;
26+
import net.bytebuddy.description.NamedElement;
27+
import net.bytebuddy.description.method.MethodDescription;
28+
import net.bytebuddy.description.type.TypeDescription;
29+
import net.bytebuddy.matcher.ElementMatcher;
30+
import org.springframework.http.HttpMethod;
31+
import org.springframework.http.client.reactive.ClientHttpRequest;
32+
import reactor.core.publisher.Mono;
33+
34+
import javax.annotation.Nullable;
35+
import java.net.URI;
36+
import java.util.Arrays;
37+
import java.util.Collection;
38+
import java.util.function.Function;
39+
40+
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
41+
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
42+
import static net.bytebuddy.matcher.ElementMatchers.nameContains;
43+
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
44+
import static net.bytebuddy.matcher.ElementMatchers.named;
45+
import static net.bytebuddy.matcher.ElementMatchers.not;
46+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
47+
48+
/**
49+
* Instruments all {@link org.springframework.http.client.reactive.ClientHttpConnector} types, to preserve the span context
50+
* within the callback passed to {@link org.springframework.http.client.reactive.ClientHttpConnector#connect(HttpMethod, URI, Function)}.
51+
* If the span is ready for it, this will cause the request body to be captured.
52+
*/
53+
public class ClientHttpConnectorInstrumentation extends ElasticApmInstrumentation {
54+
55+
private static final Tracer tracer = GlobalTracer.get();
56+
57+
@Override
58+
public ElementMatcher<? super NamedElement> getTypeMatcherPreFilter() {
59+
return nameStartsWith("org.springframework.http.")
60+
.and(nameContains("Connector"));
61+
}
62+
63+
@Override
64+
public ElementMatcher<? super TypeDescription> getTypeMatcher() {
65+
return hasSuperType(named("org.springframework.http.client.reactive.ClientHttpConnector"))
66+
.and(not(isInterface()));
67+
}
68+
69+
@Override
70+
public ElementMatcher<? super MethodDescription> getMethodMatcher() {
71+
return named("connect")
72+
.and(takesArgument(2, named("java.util.function.Function")));
73+
}
74+
75+
@Override
76+
public Collection<String> getInstrumentationGroupNames() {
77+
return Arrays.asList("http-client", "spring-webclient");
78+
}
79+
80+
public static class AdviceClass {
81+
82+
@Nullable
83+
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
84+
@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(2))
85+
public static Function<? super ClientHttpRequest, Mono<Void>> onBefore(@Advice.Argument(2) Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
86+
TraceState<?> context = tracer.currentContext();
87+
if (context.isEmpty()) {
88+
return requestCallback;
89+
}
90+
return new Function<ClientHttpRequest, Mono<Void>>() {
91+
@Override
92+
public Mono<Void> apply(ClientHttpRequest clientHttpRequest) {
93+
// Note that even though ClientHttpRequest exposes headers via the interface, those are empty
94+
// therefore we check the span capturing pre-conditions in the WebClientExchangeFunctionInstrumentation instead
95+
BodyCaptureRegistry.maybeCaptureBodyFor(context.getSpan(), clientHttpRequest);
96+
return requestCallback.apply(clientHttpRequest);
97+
}
98+
};
99+
}
100+
101+
}
102+
}

0 commit comments

Comments
 (0)