Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.ratpack;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Subscriber;

public class ExecutionBoundPublisherInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"ratpack.exec.internal.ExecutionBoundPublisher",
"ratpack.exec.internal.DefaultExecution$ExecutionBoundPublisher");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("subscribe").and(takesArgument(0, named("org.reactivestreams.Subscriber"))),
this.getClass().getName() + "$SubscribeAdvice");
}

@SuppressWarnings("unused")
public static class SubscribeAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static <T> void wrap(
@Advice.Argument(value = 0, readOnly = false) Subscriber<T> subscriber) {
subscriber = new TracingSubscriber<>(subscriber, Java8BytecodeBridge.currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
new ContinuationStreamInstrumentation(),
new DefaultExecutionInstrumentation(),
new DefaultExecStarterInstrumentation(),
new ExecutionBoundPublisherInstrumentation(),
new ServerErrorHandlerInstrumentation(),
new ServerRegistryInstrumentation());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.ratpack;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public final class TracingSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> delegate;
private final Context context;

public TracingSubscriber(Subscriber<T> delegate, Context context) {
this.delegate = delegate;
this.context = context;
}

@Override
public void onSubscribe(Subscription subscription) {
try (Scope ignore = context.makeCurrent()) {
delegate.onSubscribe(subscription);
}
}

@Override
public void onNext(T t) {
try (Scope ignore = context.makeCurrent()) {
delegate.onNext(t);
}
}

@Override
public void onError(Throwable throwable) {
try (Scope ignore = context.makeCurrent()) {
delegate.onError(throwable);
}
}

@Override
public void onComplete() {
try (Scope ignore = context.makeCurrent()) {
delegate.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,4 @@ class RatpackForkedHttpServerTest extends AbstractRatpackForkedHttpServerTest im
boolean testHttpPipelining() {
false
}

@Override
boolean testPostStream() {
// controller span is parent of onNext span which is not expected
Boolean.getBoolean("testLatestDeps")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
response.contentUtf8() == POST_STREAM.body

def hasHandlerSpan = hasHandlerSpan(POST_STREAM)
// when using javaagent instrumentation the parent of reactive callbacks is the controller span
// where subscribe was called, for library instrumentation server span is the parent
def reactiveCallbackParent = hasHandlerSpan ? 2 : 0
assertTraces(1) {
trace(0, 5 + (hasHandlerSpan ? 1 : 0)) {
span(0) {
Expand All @@ -255,15 +258,15 @@ abstract class AbstractRatpackHttpServerTest extends HttpServerTest<RatpackServe
}
span(2 + offset) {
name "onNext"
childOf span(0)
childOf span(reactiveCallbackParent)
}
span(3 + offset) {
name "onNext"
childOf span(0)
childOf span(reactiveCallbackParent)
}
span(4 + offset) {
name "onComplete"
childOf span(0)
childOf span(reactiveCallbackParent)
}
}
}
Expand Down
Loading