Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.ratpack;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import ratpack.func.Block;

public class ContinuationStreamInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("ratpack.exec.internal.ContinuationStream");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("ratpack.exec.internal.ContinuationStream"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("complete", "event").and(takesArgument(0, named("ratpack.func.Block"))),
ContinuationStreamInstrumentation.class.getName() + "$WrapBlockAdvice");
}

@SuppressWarnings("unused")
public static class WrapBlockAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrap(@Advice.Argument(value = 0, readOnly = false) Block block) {
block = BlockWrapper.wrapIfNeeded(block);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public String getModuleGroup() {
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new ContinuationInstrumentation(),
new ContinuationStreamInstrumentation(),
new DefaultExecutionInstrumentation(),
new DefaultExecStarterInstrumentation(),
new ServerErrorHandlerInstrumentation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import ratpack.handling.Context;

public final class RatpackSingletons {

private static final Instrumenter<String, Void> INSTRUMENTER =
Instrumenter.<String, Void>builder(
GlobalOpenTelemetry.get(), "io.opentelemetry.ratpack-1.4", s -> s)
.setEnabled(ExperimentalConfig.get().controllerTelemetryEnabled())
.buildInstrumenter();

public static Instrumenter<String, Void> instrumenter() {
Expand All @@ -28,7 +30,9 @@ public static Instrumenter<String, Void> instrumenter() {
public static void updateSpanNames(io.opentelemetry.context.Context otelContext, Context ctx) {
String matchedRoute = updateServerSpanName(otelContext, ctx);
// update ratpack span name
Span.fromContext(otelContext).updateName(matchedRoute);
if (ExperimentalConfig.get().controllerTelemetryEnabled()) {
Span.fromContext(otelContext).updateName(matchedRoute);
}
}

public static String updateServerSpanName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ class RatpackForkedHttpServerTest extends AbstractRatpackForkedHttpServerTest im
boolean testHttpPipelining() {
false
}

@Override
boolean testPostStream() {
// controller span is parent of onNext span which is not expected
Boolean.getBoolean("testLatestDeps")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ abstract class AbstractRatpackAsyncHttpServerTest extends AbstractRatpackHttpSer
}
}
}
it.prefix(POST_STREAM.rawPath()) {
it.all { context ->
Promise.sync {
POST_STREAM
} then {
handlePostStream(context)
}
}
}
}
configure(it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ abstract class AbstractRatpackForkedHttpServerTest extends AbstractRatpackHttpSe
}
}
}
it.prefix(POST_STREAM.rawPath()) {
it.all { context ->
Promise.sync {
POST_STREAM
}.fork().then {
handlePostStream(context)
}
}
}
it.prefix("fork_and_yieldAll") {
it.all { context ->
def promise = Promise.async { upstream ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@

package io.opentelemetry.instrumentation.ratpack.server


import io.netty.buffer.ByteBuf
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
import io.opentelemetry.sdk.trace.data.SpanData
import org.junit.Assume
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import ratpack.error.ServerErrorHandler
import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.RatpackServerSpec

import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION
Expand All @@ -28,6 +33,14 @@ import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint

abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServer> {

protected static final ServerEndpoint POST_STREAM =
new ServerEndpoint(
"POST_STREAM",
"post-stream",
SUCCESS.getStatus(),
SUCCESS.getBody(),
false)

abstract void configure(RatpackServerSpec serverSpec)

@Override
Expand Down Expand Up @@ -100,6 +113,11 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
}
}
}
it.prefix(POST_STREAM.rawPath()) {
it.all { context ->
handlePostStream(context)
}
}
}
configure(it)
}
Expand All @@ -108,6 +126,49 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
return ratpack
}

def handlePostStream(context) {
controller(POST_STREAM) {
context.request.bodyStream.subscribe(new Subscriber<ByteBuf>() {
private Subscription subscription
private int count
private String traceId

@Override
void onSubscribe(Subscription subscription) {
this.subscription = subscription
traceId = Span.current().getSpanContext().getTraceId()
subscription.request(1)
}

@Override
void onNext(ByteBuf byteBuf) {
assert traceId == Span.current().getSpanContext().getTraceId()
if (count < 2) {
runWithSpan("onNext") {
count++
}
}
byteBuf.release()
subscription.request(1)
}

@Override
void onError(Throwable throwable) {
// prints the assertion error from onNext
throwable.printStackTrace()
context.response.status(500).send(throwable.message)
}

@Override
void onComplete() {
runWithSpan("onComplete") {
context.response.status(200).send(POST_STREAM.body)
}
}
})
}
}

// TODO(anuraaga): The default Ratpack error handler also returns a 500 which is all we test, so
// we don't actually have test coverage ensuring our instrumentation correctly delegates to this
// user registered handler.
Expand Down Expand Up @@ -156,4 +217,55 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
String expectedHttpRoute(ServerEndpoint endpoint, String method) {
return endpoint.status == 404 ? "/" : endpoint == PATH_PARAM ? "/path/:id/param" : endpoint.path
}

boolean testPostStream() {
true
}

def "test post stream"() {
Assume.assumeTrue(testPostStream())

when:
// body should be large enough to trigger multiple calls to onNext
def body = "foobar" * 10000
def response = client.post(resolveAddress(POST_STREAM), body).aggregate().join()

then:
response.status().code() == POST_STREAM.status
response.contentUtf8() == POST_STREAM.body

def hasHandlerSpan = hasHandlerSpan(POST_STREAM)
assertTraces(1) {
trace(0, 5 + (hasHandlerSpan ? 1 : 0)) {
span(0) {
name "POST /post-stream"
kind SERVER
hasNoParent()
}
if (hasHandlerSpan) {
span(1) {
name "/post-stream"
childOf span(0)
}
}
def offset = hasHandlerSpan ? 1 : 0
span(1 + offset) {
name "controller"
childOf span(offset)
}
span(2 + offset) {
name "onNext"
childOf span(0)
}
span(3 + offset) {
name "onNext"
childOf span(0)
}
span(4 + offset) {
name "onComplete"
childOf span(0)
}
}
}
}
}
Loading