diff --git a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/ContinuationStreamInstrumentation.java b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/ContinuationStreamInstrumentation.java new file mode 100644 index 000000000000..11ebae1c892e --- /dev/null +++ b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/ContinuationStreamInstrumentation.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.ratpack; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import ratpack.func.Block; + +public class ContinuationStreamInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("ratpack.exec.internal.ContinuationStream"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("ratpack.exec.internal.ContinuationStream")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + namedOneOf("complete", "event").and(takesArgument(0, named("ratpack.func.Block"))), + ContinuationStreamInstrumentation.class.getName() + "$WrapBlockAdvice"); + } + + @SuppressWarnings("unused") + public static class WrapBlockAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrap(@Advice.Argument(value = 0, readOnly = false) Block block) { + block = BlockWrapper.wrapIfNeeded(block); + } + } +} diff --git a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java index ea02d096b3db..1ba8efc192bc 100644 --- a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java +++ b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java @@ -30,6 +30,7 @@ public String getModuleGroup() { public List typeInstrumentations() { return asList( new ContinuationInstrumentation(), + new ContinuationStreamInstrumentation(), new DefaultExecutionInstrumentation(), new DefaultExecStarterInstrumentation(), new ServerErrorHandlerInstrumentation(), diff --git a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackSingletons.java b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackSingletons.java index dca161ebffef..6796026d6dfd 100644 --- a/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackSingletons.java +++ b/instrumentation/ratpack/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackSingletons.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute; import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import ratpack.handling.Context; public final class RatpackSingletons { @@ -19,6 +20,7 @@ public final class RatpackSingletons { private static final Instrumenter INSTRUMENTER = Instrumenter.builder( GlobalOpenTelemetry.get(), "io.opentelemetry.ratpack-1.4", s -> s) + .setEnabled(ExperimentalConfig.get().controllerTelemetryEnabled()) .buildInstrumenter(); public static Instrumenter instrumenter() { @@ -28,7 +30,9 @@ public static Instrumenter instrumenter() { public static void updateSpanNames(io.opentelemetry.context.Context otelContext, Context ctx) { String matchedRoute = updateServerSpanName(otelContext, ctx); // update ratpack span name - Span.fromContext(otelContext).updateName(matchedRoute); + if (ExperimentalConfig.get().controllerTelemetryEnabled()) { + Span.fromContext(otelContext).updateName(matchedRoute); + } } public static String updateServerSpanName( diff --git a/instrumentation/ratpack/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy b/instrumentation/ratpack/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy index 87dc2848b1ff..40ae557a9400 100644 --- a/instrumentation/ratpack/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy +++ b/instrumentation/ratpack/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy @@ -24,4 +24,10 @@ class RatpackForkedHttpServerTest extends AbstractRatpackForkedHttpServerTest im boolean testHttpPipelining() { false } + + @Override + boolean testPostStream() { + // controller span is parent of onNext span which is not expected + Boolean.getBoolean("testLatestDeps") + } } diff --git a/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackAsyncHttpServerTest.groovy b/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackAsyncHttpServerTest.groovy index f8cda9195c4f..4a207b4c3cf7 100644 --- a/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackAsyncHttpServerTest.groovy +++ b/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackAsyncHttpServerTest.groovy @@ -122,6 +122,15 @@ abstract class AbstractRatpackAsyncHttpServerTest extends AbstractRatpackHttpSer } } } + it.prefix(POST_STREAM.rawPath()) { + it.all { context -> + Promise.sync { + POST_STREAM + } then { + handlePostStream(context) + } + } + } } configure(it) } diff --git a/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackForkedHttpServerTest.groovy b/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackForkedHttpServerTest.groovy index cd4e3dc1fd38..925e409769e1 100644 --- a/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackForkedHttpServerTest.groovy +++ b/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackForkedHttpServerTest.groovy @@ -130,6 +130,15 @@ abstract class AbstractRatpackForkedHttpServerTest extends AbstractRatpackHttpSe } } } + it.prefix(POST_STREAM.rawPath()) { + it.all { context -> + Promise.sync { + POST_STREAM + }.fork().then { + handlePostStream(context) + } + } + } it.prefix("fork_and_yieldAll") { it.all { context -> def promise = Promise.async { upstream -> diff --git a/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackHttpServerTest.groovy b/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackHttpServerTest.groovy index 61735d098984..3b81fe793697 100644 --- a/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackHttpServerTest.groovy +++ b/instrumentation/ratpack/ratpack-1.4/testing/src/main/groovy/io/opentelemetry/instrumentation/ratpack/server/AbstractRatpackHttpServerTest.groovy @@ -5,18 +5,23 @@ package io.opentelemetry.instrumentation.ratpack.server - +import io.netty.buffer.ByteBuf +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.instrumentation.test.asserts.TraceAssert import io.opentelemetry.instrumentation.test.base.HttpServerTest import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint import io.opentelemetry.sdk.trace.data.SpanData +import org.junit.Assume +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription import ratpack.error.ServerErrorHandler import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.RatpackServerSpec import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.SERVER import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION @@ -28,6 +33,14 @@ import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint abstract class AbstractRatpackHttpServerTest extends HttpServerTest { + protected static final ServerEndpoint POST_STREAM = + new ServerEndpoint( + "POST_STREAM", + "post-stream", + SUCCESS.getStatus(), + SUCCESS.getBody(), + false) + abstract void configure(RatpackServerSpec serverSpec) @Override @@ -100,6 +113,11 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest + handlePostStream(context) + } + } } configure(it) } @@ -108,6 +126,49 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest() { + private Subscription subscription + private int count + private String traceId + + @Override + void onSubscribe(Subscription subscription) { + this.subscription = subscription + traceId = Span.current().getSpanContext().getTraceId() + subscription.request(1) + } + + @Override + void onNext(ByteBuf byteBuf) { + assert traceId == Span.current().getSpanContext().getTraceId() + if (count < 2) { + runWithSpan("onNext") { + count++ + } + } + byteBuf.release() + subscription.request(1) + } + + @Override + void onError(Throwable throwable) { + // prints the assertion error from onNext + throwable.printStackTrace() + context.response.status(500).send(throwable.message) + } + + @Override + void onComplete() { + runWithSpan("onComplete") { + context.response.status(200).send(POST_STREAM.body) + } + } + }) + } + } + // TODO(anuraaga): The default Ratpack error handler also returns a 500 which is all we test, so // we don't actually have test coverage ensuring our instrumentation correctly delegates to this // user registered handler. @@ -156,4 +217,55 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest