Skip to content

Commit 56ffab8

Browse files
committed
fix formatting, enable http-pipelining test + fixes
1 parent 1bc0337 commit 56ffab8

File tree

6 files changed

+48
-24
lines changed

6 files changed

+48
-24
lines changed

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/PekkoFlowWrapper.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static Context getContext(OutHandler outHandler) {
4343
// be started before the first one has returned a response, because of this the first request
4444
// in the queue is always the one that is currently being processed.
4545
PekkoTracingRequest request =
46-
((TracingLogic.ApplicationOutHandler) outHandler).getRequests().peek();
46+
((TracingLogic.ApplicationOutHandler) outHandler).getRequests().poll();
4747
if (request != null) {
4848
return request.context;
4949
}
@@ -110,7 +110,8 @@ public void onPush() {
110110
request
111111
.getAttribute(PekkoTracingRequest.ATTR_KEY)
112112
.orElse(PekkoTracingRequest.EMPTY);
113-
if (tracingRequest == PekkoTracingRequest.EMPTY) {
113+
if (tracingRequest != PekkoTracingRequest.EMPTY) {
114+
// Remove HttpRequest attribute before passing it to user code
114115
request = (HttpRequest) request.removeAttribute(PekkoTracingRequest.ATTR_KEY);
115116
}
116117
// event if span wasn't started we need to push TracingRequest to match response
@@ -138,19 +139,16 @@ public void onUpstreamFailure(Throwable exception) {
138139
@Override
139140
public void onPush() {
140141
HttpResponse response = grab(responseIn);
141-
requests.poll();
142142
push(responseOut, response);
143143
}
144144

145145
@Override
146146
public void onUpstreamFailure(Throwable exception) {
147-
requests.clear();
148147
fail(responseOut, exception);
149148
}
150149

151150
@Override
152151
public void onUpstreamFinish() {
153-
requests.clear();
154152
completeStage();
155153
}
156154
});

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/PekkoHttpServerTracer.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onPush() {
101101
if (instrumenter().shouldStart(parentContext, request)) {
102102
Context context = instrumenter().start(parentContext, request);
103103
context = PekkoRouteHolder.init(context);
104-
tracingRequest = new PekkoTracingRequest(context, request);
104+
tracingRequest = new PekkoTracingRequest(context, parentContext, request);
105105
request =
106106
(HttpRequest)
107107
request.addAttribute(PekkoTracingRequest.ATTR_KEY, tracingRequest);
@@ -144,24 +144,41 @@ public void onPush() {
144144
response = (HttpResponse) response.addHeaders(headers);
145145
}
146146

147+
// When GraphInterpreterInstrumentation sets `tracingRequest.context` as the current
148+
// context, the akka Envelope + Actor instrumentation propagates it all the way
149+
// back to here and follows the HttpResponse up the stack of stages.
150+
// If http-pipelining is enabled, it will also propagate this context to the handling
151+
// of the next request, leading to context-leaking errors.
152+
// To prevent this, we reset the context to what it was before creating it.
153+
tracingRequest.parentContext.makeCurrent();
154+
147155
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
148156
}
149157
push(responseOut, response);
150158
}
151159

152160
@Override
153161
public void onUpstreamFailure(Throwable exception) {
154-
endOngoingSpans(exception);
162+
// End the span for the request that failed
163+
PekkoTracingRequest tracingRequest = requests.poll();
164+
if (tracingRequest != null && tracingRequest != PekkoTracingRequest.EMPTY) {
165+
// see comment above
166+
tracingRequest.parentContext.makeCurrent();
167+
instrumenter()
168+
.end(
169+
tracingRequest.context,
170+
tracingRequest.request,
171+
PekkoHttpServerSingletons.errorResponse(),
172+
exception);
173+
}
174+
155175
fail(responseOut, exception);
156176
}
157177

178+
158179
@Override
159180
public void onUpstreamFinish() {
160-
endOngoingSpans(null);
161-
completeStage();
162-
}
163-
164-
private void endOngoingSpans(Throwable exception) {
181+
// End any ongoing spans, though there should be none.
165182
PekkoTracingRequest tracingRequest;
166183
while ((tracingRequest = requests.poll()) != null) {
167184
if (tracingRequest == PekkoTracingRequest.EMPTY) {
@@ -172,8 +189,9 @@ private void endOngoingSpans(Throwable exception) {
172189
tracingRequest.context,
173190
tracingRequest.request,
174191
PekkoHttpServerSingletons.errorResponse(),
175-
exception);
192+
null);
176193
}
194+
completeStage();
177195
}
178196
});
179197
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/PekkoTracingRequest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,31 @@
55

66
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
77

8+
import static io.opentelemetry.context.ContextKey.named;
9+
810
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.context.ContextKey;
12+
import io.opentelemetry.context.ImplicitContextKeyed;
913
import org.apache.pekko.http.scaladsl.model.AttributeKey;
1014
import org.apache.pekko.http.scaladsl.model.HttpRequest;
1115

12-
public class PekkoTracingRequest {
16+
public class PekkoTracingRequest implements ImplicitContextKeyed {
17+
private static final ContextKey<PekkoTracingRequest> CONTEXT_KEY = named("opentelemetry-pekko-tracing-request");
1318
static final AttributeKey<PekkoTracingRequest> ATTR_KEY =
1419
new AttributeKey<>("_otel_ctx", PekkoTracingRequest.class);
15-
static final PekkoTracingRequest EMPTY = new PekkoTracingRequest(null, null);
20+
static final PekkoTracingRequest EMPTY = new PekkoTracingRequest(null, null, null);
1621
final Context context;
22+
final Context parentContext;
1723
final HttpRequest request;
1824

19-
PekkoTracingRequest(Context context, HttpRequest request) {
25+
PekkoTracingRequest(Context context, Context parentContext, HttpRequest request) {
2026
this.context = context;
27+
this.parentContext = parentContext;
2128
this.request = request;
2229
}
30+
31+
@Override
32+
public Context storeInContext(Context context) {
33+
return context.with(CONTEXT_KEY, this);
34+
}
2335
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/test/resources/application.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ pekko.http {
88
connecting-timeout = 5s
99
}
1010
}
11+
server {
12+
pipelining-limit = 4 # default is 1
13+
}
1114
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerInstrumentationTestAsync.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,4 @@ class PekkoHttpServerInstrumentationTestAsync
2525

2626
override protected def stopServer(server: Object): Unit =
2727
PekkoHttpTestAsyncWebServer.stop()
28-
29-
override protected def configure(
30-
options: HttpServerTestOptions
31-
): Unit = {
32-
super.configure(options)
33-
options.setTestHttpPipelining(false)
34-
}
3528
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestAsyncWebServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ object PekkoHttpTestAsyncWebServer {
6464
if (null == binding) {
6565
import scala.concurrent.duration._
6666
binding = Await.result(
67-
Http().bindAndHandleAsync(asyncHandler, "localhost", port),
67+
Http().bindAndHandleAsync(asyncHandler, "localhost", port, parallelism = 2),
6868
10.seconds
6969
)
7070
}

0 commit comments

Comments
 (0)