Skip to content

Commit 1dfc154

Browse files
authored
Fix context propagation for ratpack request body stream (#12330)
1 parent 0b1cc2c commit 1dfc154

File tree

7 files changed

+191
-2
lines changed

7 files changed

+191
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.ratpack;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
10+
import static net.bytebuddy.matcher.ElementMatchers.named;
11+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
13+
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
16+
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.description.type.TypeDescription;
18+
import net.bytebuddy.matcher.ElementMatcher;
19+
import ratpack.func.Block;
20+
21+
public class ContinuationStreamInstrumentation implements TypeInstrumentation {
22+
23+
@Override
24+
public ElementMatcher<ClassLoader> classLoaderOptimization() {
25+
return hasClassesNamed("ratpack.exec.internal.ContinuationStream");
26+
}
27+
28+
@Override
29+
public ElementMatcher<TypeDescription> typeMatcher() {
30+
return implementsInterface(named("ratpack.exec.internal.ContinuationStream"));
31+
}
32+
33+
@Override
34+
public void transform(TypeTransformer transformer) {
35+
transformer.applyAdviceToMethod(
36+
namedOneOf("complete", "event").and(takesArgument(0, named("ratpack.func.Block"))),
37+
ContinuationStreamInstrumentation.class.getName() + "$WrapBlockAdvice");
38+
}
39+
40+
@SuppressWarnings("unused")
41+
public static class WrapBlockAdvice {
42+
43+
@Advice.OnMethodEnter(suppress = Throwable.class)
44+
public static void wrap(@Advice.Argument(value = 0, readOnly = false) Block block) {
45+
block = BlockWrapper.wrapIfNeeded(block);
46+
}
47+
}
48+
}

instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public String getModuleGroup() {
3030
public List<TypeInstrumentation> typeInstrumentations() {
3131
return asList(
3232
new ContinuationInstrumentation(),
33+
new ContinuationStreamInstrumentation(),
3334
new DefaultExecutionInstrumentation(),
3435
new DefaultExecStarterInstrumentation(),
3536
new ServerErrorHandlerInstrumentation(),

instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackSingletons.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1313
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute;
1414
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource;
15+
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
1516
import ratpack.handling.Context;
1617

1718
public final class RatpackSingletons {
1819

1920
private static final Instrumenter<String, Void> INSTRUMENTER =
2021
Instrumenter.<String, Void>builder(
2122
GlobalOpenTelemetry.get(), "io.opentelemetry.ratpack-1.4", s -> s)
23+
.setEnabled(ExperimentalConfig.get().controllerTelemetryEnabled())
2224
.buildInstrumenter();
2325

2426
public static Instrumenter<String, Void> instrumenter() {
@@ -28,7 +30,9 @@ public static Instrumenter<String, Void> instrumenter() {
2830
public static void updateSpanNames(io.opentelemetry.context.Context otelContext, Context ctx) {
2931
String matchedRoute = updateServerSpanName(otelContext, ctx);
3032
// update ratpack span name
31-
Span.fromContext(otelContext).updateName(matchedRoute);
33+
if (ExperimentalConfig.get().controllerTelemetryEnabled()) {
34+
Span.fromContext(otelContext).updateName(matchedRoute);
35+
}
3236
}
3337

3438
public static String updateServerSpanName(

instrumentation/ratpack/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,10 @@ class RatpackForkedHttpServerTest extends AbstractRatpackForkedHttpServerTest im
2424
boolean testHttpPipelining() {
2525
false
2626
}
27+
28+
@Override
29+
boolean testPostStream() {
30+
// controller span is parent of onNext span which is not expected
31+
Boolean.getBoolean("testLatestDeps")
32+
}
2733
}

instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackAsyncHttpServerTest.groovy

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,15 @@ abstract class AbstractRatpackAsyncHttpServerTest extends AbstractRatpackHttpSer
122122
}
123123
}
124124
}
125+
it.prefix(POST_STREAM.rawPath()) {
126+
it.all { context ->
127+
Promise.sync {
128+
POST_STREAM
129+
} then {
130+
handlePostStream(context)
131+
}
132+
}
133+
}
125134
}
126135
configure(it)
127136
}

instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackForkedHttpServerTest.groovy

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ abstract class AbstractRatpackForkedHttpServerTest extends AbstractRatpackHttpSe
130130
}
131131
}
132132
}
133+
it.prefix(POST_STREAM.rawPath()) {
134+
it.all { context ->
135+
Promise.sync {
136+
POST_STREAM
137+
}.fork().then {
138+
handlePostStream(context)
139+
}
140+
}
141+
}
133142
it.prefix("fork_and_yieldAll") {
134143
it.all { context ->
135144
def promise = Promise.async { upstream ->

instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackHttpServerTest.groovy

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,23 @@
55

66
package io.opentelemetry.instrumentation.ratpack.server
77

8-
8+
import io.netty.buffer.ByteBuf
9+
import io.opentelemetry.api.trace.Span
910
import io.opentelemetry.api.trace.StatusCode
1011
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
1112
import io.opentelemetry.instrumentation.test.base.HttpServerTest
1213
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
1314
import io.opentelemetry.sdk.trace.data.SpanData
15+
import org.junit.Assume
16+
import org.reactivestreams.Subscriber
17+
import org.reactivestreams.Subscription
1418
import ratpack.error.ServerErrorHandler
1519
import ratpack.handling.Context
1620
import ratpack.server.RatpackServer
1721
import ratpack.server.RatpackServerSpec
1822

1923
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
24+
import static io.opentelemetry.api.trace.SpanKind.SERVER
2025
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS
2126
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR
2227
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION
@@ -28,6 +33,14 @@ import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
2833

2934
abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServer> {
3035

36+
protected static final ServerEndpoint POST_STREAM =
37+
new ServerEndpoint(
38+
"POST_STREAM",
39+
"post-stream",
40+
SUCCESS.getStatus(),
41+
SUCCESS.getBody(),
42+
false)
43+
3144
abstract void configure(RatpackServerSpec serverSpec)
3245

3346
@Override
@@ -100,6 +113,11 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
100113
}
101114
}
102115
}
116+
it.prefix(POST_STREAM.rawPath()) {
117+
it.all { context ->
118+
handlePostStream(context)
119+
}
120+
}
103121
}
104122
configure(it)
105123
}
@@ -108,6 +126,49 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
108126
return ratpack
109127
}
110128

129+
def handlePostStream(context) {
130+
controller(POST_STREAM) {
131+
context.request.bodyStream.subscribe(new Subscriber<ByteBuf>() {
132+
private Subscription subscription
133+
private int count
134+
private String traceId
135+
136+
@Override
137+
void onSubscribe(Subscription subscription) {
138+
this.subscription = subscription
139+
traceId = Span.current().getSpanContext().getTraceId()
140+
subscription.request(1)
141+
}
142+
143+
@Override
144+
void onNext(ByteBuf byteBuf) {
145+
assert traceId == Span.current().getSpanContext().getTraceId()
146+
if (count < 2) {
147+
runWithSpan("onNext") {
148+
count++
149+
}
150+
}
151+
byteBuf.release()
152+
subscription.request(1)
153+
}
154+
155+
@Override
156+
void onError(Throwable throwable) {
157+
// prints the assertion error from onNext
158+
throwable.printStackTrace()
159+
context.response.status(500).send(throwable.message)
160+
}
161+
162+
@Override
163+
void onComplete() {
164+
runWithSpan("onComplete") {
165+
context.response.status(200).send(POST_STREAM.body)
166+
}
167+
}
168+
})
169+
}
170+
}
171+
111172
// TODO(anuraaga): The default Ratpack error handler also returns a 500 which is all we test, so
112173
// we don't actually have test coverage ensuring our instrumentation correctly delegates to this
113174
// user registered handler.
@@ -156,4 +217,55 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
156217
String expectedHttpRoute(ServerEndpoint endpoint, String method) {
157218
return endpoint.status == 404 ? "/" : endpoint == PATH_PARAM ? "/path/:id/param" : endpoint.path
158219
}
220+
221+
boolean testPostStream() {
222+
true
223+
}
224+
225+
def "test post stream"() {
226+
Assume.assumeTrue(testPostStream())
227+
228+
when:
229+
// body should be large enough to trigger multiple calls to onNext
230+
def body = "foobar" * 10000
231+
def response = client.post(resolveAddress(POST_STREAM), body).aggregate().join()
232+
233+
then:
234+
response.status().code() == POST_STREAM.status
235+
response.contentUtf8() == POST_STREAM.body
236+
237+
def hasHandlerSpan = hasHandlerSpan(POST_STREAM)
238+
assertTraces(1) {
239+
trace(0, 5 + (hasHandlerSpan ? 1 : 0)) {
240+
span(0) {
241+
name "POST /post-stream"
242+
kind SERVER
243+
hasNoParent()
244+
}
245+
if (hasHandlerSpan) {
246+
span(1) {
247+
name "/post-stream"
248+
childOf span(0)
249+
}
250+
}
251+
def offset = hasHandlerSpan ? 1 : 0
252+
span(1 + offset) {
253+
name "controller"
254+
childOf span(offset)
255+
}
256+
span(2 + offset) {
257+
name "onNext"
258+
childOf span(0)
259+
}
260+
span(3 + offset) {
261+
name "onNext"
262+
childOf span(0)
263+
}
264+
span(4 + offset) {
265+
name "onComplete"
266+
childOf span(0)
267+
}
268+
}
269+
}
270+
}
159271
}

0 commit comments

Comments
 (0)