Skip to content

Commit 8e016a7

Browse files
authored
Fix flaky http pipelining test on akka http (#8437)
1 parent e9a74bd commit 8e016a7

File tree

7 files changed

+113
-27
lines changed

7 files changed

+113
-27
lines changed

instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import akka.dispatch.sysmsg.SystemMessage;
1313
import io.opentelemetry.context.Scope;
1414
import io.opentelemetry.instrumentation.api.util.VirtualField;
15-
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
1615
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
1716
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
1817
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
@@ -53,11 +52,6 @@ public static void exit(@Advice.Enter Scope scope) {
5352
if (scope != null) {
5453
scope.close();
5554
}
56-
// akka-http instrumentation can leak scopes
57-
// reset the context to clear the leaked scopes
58-
if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) {
59-
Java8BytecodeBridge.rootContext().makeCurrent();
60-
}
6155
}
6256
}
6357

instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaDispatcherInstrumentation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public static class DispatchEnvelopeAdvice {
4242
@Advice.OnMethodEnter(suppress = Throwable.class)
4343
public static PropagatedContext enterDispatch(@Advice.Argument(1) Envelope envelope) {
4444
Context context = Java8BytecodeBridge.currentContext();
45-
if (ExecutorAdviceHelper.shouldPropagateContext(context, envelope)) {
45+
if (ExecutorAdviceHelper.shouldPropagateContext(context, envelope.message())) {
4646
VirtualField<Envelope, PropagatedContext> virtualField =
4747
VirtualField.find(Envelope.class, PropagatedContext.class);
4848
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, envelope);

instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import akka.stream.stage.AbstractOutHandler;
2222
import akka.stream.stage.GraphStage;
2323
import akka.stream.stage.GraphStageLogic;
24+
import akka.stream.stage.OutHandler;
2425
import io.opentelemetry.context.Context;
25-
import io.opentelemetry.context.Scope;
2626
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
2727
import java.util.ArrayDeque;
2828
import java.util.Deque;
@@ -43,6 +43,22 @@ public class AkkaFlowWrapper
4343
return handler.join(new AkkaFlowWrapper());
4444
}
4545

46+
public static Context getContext(OutHandler outHandler) {
47+
if (outHandler instanceof TracingLogic.ApplicationOutHandler) {
48+
// We have multiple requests here only when requests are pipelined on the same connection.
49+
// It appears that these requests are processed one by one so processing next request won't
50+
// be started before the first one has returned a response, because of this the first request
51+
// in the queue is always the one that is currently being processed.
52+
TracingRequest request =
53+
((TracingLogic.ApplicationOutHandler) outHandler).getRequests().peek();
54+
if (request != null) {
55+
return request.context;
56+
}
57+
}
58+
59+
return null;
60+
}
61+
4662
@Override
4763
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
4864
return shape;
@@ -77,7 +93,7 @@ public void onDownstreamFinish() {
7793
// user code pulls request, pass request from server to user code
7894
setHandler(
7995
requestOut,
80-
new AbstractOutHandler() {
96+
new ApplicationOutHandler() {
8197
@Override
8298
public void onPull() {
8399
pull(requestIn);
@@ -102,9 +118,7 @@ public void onPush() {
102118
Context parentContext = currentContext();
103119
if (instrumenter().shouldStart(parentContext, request)) {
104120
Context context = instrumenter().start(parentContext, request);
105-
// scope opened here may leak, actor instrumentation will close it
106-
Scope scope = context.makeCurrent();
107-
tracingRequest = new TracingRequest(context, scope, request);
121+
tracingRequest = new TracingRequest(context, request);
108122
}
109123
// event if span wasn't started we need to push TracingRequest to match response
110124
// with request
@@ -134,10 +148,6 @@ public void onPush() {
134148

135149
TracingRequest tracingRequest = requests.poll();
136150
if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
137-
// this may happen on a different thread from the one that opened the scope
138-
// actor instrumentation will take care of the leaked scopes
139-
tracingRequest.scope.close();
140-
141151
// akka response is immutable so the customizer just captures the added headers
142152
AkkaHttpResponseMutator responseMutator = new AkkaHttpResponseMutator();
143153
HttpServerResponseCustomizerHolder.getCustomizer()
@@ -160,7 +170,6 @@ public void onUpstreamFailure(Throwable exception) {
160170
if (tracingRequest == TracingRequest.EMPTY) {
161171
continue;
162172
}
163-
tracingRequest.scope.close();
164173
instrumenter()
165174
.end(
166175
tracingRequest.context, tracingRequest.request, errorResponse(), exception);
@@ -175,17 +184,21 @@ public void onUpstreamFinish() {
175184
}
176185
});
177186
}
187+
188+
abstract class ApplicationOutHandler extends AbstractOutHandler {
189+
Deque<TracingRequest> getRequests() {
190+
return requests;
191+
}
192+
}
178193
}
179194

180195
private static class TracingRequest {
181-
static final TracingRequest EMPTY = new TracingRequest(null, null, null);
196+
static final TracingRequest EMPTY = new TracingRequest(null, null);
182197
final Context context;
183-
final Scope scope;
184198
final HttpRequest request;
185199

186-
TracingRequest(Context context, Scope scope, HttpRequest request) {
200+
TracingRequest(Context context, HttpRequest request) {
187201
this.context = context;
188-
this.scope = scope;
189202
this.request = request;
190203
}
191204
}

instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,30 @@
55

66
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
77

8-
import static java.util.Collections.singletonList;
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9+
import static java.util.Arrays.asList;
910

1011
import com.google.auto.service.AutoService;
1112
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1213
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1314
import java.util.List;
15+
import net.bytebuddy.matcher.ElementMatcher;
1416

1517
@AutoService(InstrumentationModule.class)
1618
public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
1719
public AkkaHttpServerInstrumentationModule() {
1820
super("akka-http", "akka-http-10.0", "akka-http-server");
1921
}
2022

23+
@Override
24+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
25+
// in GraphInterpreterInstrumentation we instrument a class that belongs to akka-streams, make
26+
// sure this runs only when akka-http is present to avoid muzzle failures
27+
return hasClassesNamed("akka.http.scaladsl.HttpExt");
28+
}
29+
2130
@Override
2231
public List<TypeInstrumentation> typeInstrumentations() {
23-
return singletonList(new HttpExtServerInstrumentation());
32+
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
2433
}
2534
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
7+
8+
import com.google.auto.service.AutoService;
9+
import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesBuilder;
10+
import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesConfigurer;
11+
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
12+
13+
@AutoService(IgnoredTypesConfigurer.class)
14+
public class AkkaServerIgnoredTypesConfigurer implements IgnoredTypesConfigurer {
15+
16+
@Override
17+
public void configure(IgnoredTypesBuilder builder, ConfigProperties config) {
18+
// in AkkaHttpServerInstrumentationTestAsync http pipeline test sending this message trigger
19+
// processing next request, we don't want to propagate context there
20+
builder.ignoreTaskClass("akka.stream.impl.fusing.ActorGraphInterpreter$AsyncInput");
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
10+
import akka.stream.impl.fusing.GraphInterpreter;
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.context.Scope;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
19+
public class GraphInterpreterInstrumentation implements TypeInstrumentation {
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
return named("akka.stream.impl.fusing.GraphInterpreter");
23+
}
24+
25+
@Override
26+
public void transform(TypeTransformer transformer) {
27+
transformer.applyAdviceToMethod(
28+
named("processPush"), GraphInterpreterInstrumentation.class.getName() + "$PushAdvice");
29+
}
30+
31+
@SuppressWarnings("unused")
32+
public static class PushAdvice {
33+
34+
@Advice.OnMethodEnter(suppress = Throwable.class)
35+
public static Scope onEnter(@Advice.Argument(0) GraphInterpreter.Connection connection) {
36+
// processPush is called when execution passes to application or server. Here we propagate the
37+
// context to the application code.
38+
Context context = AkkaFlowWrapper.getContext(connection.outHandler());
39+
if (context != null) {
40+
return context.makeCurrent();
41+
}
42+
return null;
43+
}
44+
45+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
46+
public static void exit(@Advice.Enter Scope scope) {
47+
if (scope != null) {
48+
scope.close();
49+
}
50+
}
51+
}
52+
}

instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,6 @@ tasks {
7474
dependsOn(testing.suites)
7575
}
7676
}
77-
78-
withType<Test>().configureEach {
79-
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
80-
}
8177
}
8278

8379
// play-test depends on websocket-client

0 commit comments

Comments
 (0)