Skip to content

Commit d8a201a

Browse files
jetty-httpclient-12: send method must pass along implemented response listeners
The super HttpRequest.send call says: > The listener passed to this method may implement not only Response.CompleteListener > but also other response listener interfaces, and all the events implemented will be notified. The current implementation passes through a lambda that implements CompleteListener only, and does not delegate-in-scope to any other callback methods you might provide
1 parent 54693b6 commit d8a201a

File tree

3 files changed

+113
-7
lines changed

3 files changed

+113
-7
lines changed

instrumentation/jetty-httpclient/jetty-httpclient-12.0/library/src/main/java/io/opentelemetry/instrumentation/jetty/httpclient/v12_0/TracingHttpRequest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.opentelemetry.context.Scope;
1010
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1111
import io.opentelemetry.instrumentation.jetty.httpclient.v12_0.internal.JettyClientTracingListener;
12+
import io.opentelemetry.instrumentation.jetty.httpclient.v12_0.internal.JettyClientWrapUtil;
1213
import java.net.URI;
1314
import java.nio.ByteBuffer;
1415
import org.eclipse.jetty.client.HttpClient;
@@ -34,14 +35,9 @@ public TracingHttpRequest(
3435
@Override
3536
public void send(Response.CompleteListener listener) {
3637
parentContext = Context.current();
37-
// start span and attach listeners.
38+
// start span and attach listeners - must handle all listeners, not just CompleteListener.
3839
JettyClientTracingListener.handleRequest(parentContext, this, instrumenter);
39-
super.send(
40-
result -> {
41-
try (Scope scope = openScope()) {
42-
listener.onComplete(result);
43-
}
44-
});
40+
super.send(JettyClientWrapUtil.wrapTheListener(listener, parentContext));
4541
}
4642

4743
private Scope openScope() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.jetty.httpclient.v12_0.internal;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.lang.reflect.InvocationTargetException;
11+
import java.lang.reflect.Proxy;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import org.eclipse.jetty.client.Response;
15+
16+
/**
17+
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
18+
* any time.
19+
*/
20+
public final class JettyClientWrapUtil {
21+
private static final Class<?>[] LISTENER_INTERFACES = {
22+
Response.CompleteListener.class,
23+
Response.FailureListener.class,
24+
Response.SuccessListener.class,
25+
Response.AsyncContentListener.class,
26+
Response.ContentSourceListener.class,
27+
Response.ContentListener.class,
28+
Response.HeadersListener.class,
29+
Response.HeaderListener.class,
30+
Response.BeginListener.class
31+
};
32+
33+
private JettyClientWrapUtil() {}
34+
35+
/**
36+
* Utility to wrap the response listeners only, this includes the important CompleteListener.
37+
*
38+
* @param context top level context that is above the Jetty client span context
39+
* @param listener listener passed to Jetty client send() method
40+
* @return wrapped listener
41+
*/
42+
public static Response.CompleteListener wrapTheListener(
43+
Response.CompleteListener listener, Context context) {
44+
if (listener == null) {
45+
return listener;
46+
}
47+
48+
Class<?> listenerClass = listener.getClass();
49+
List<Class<?>> interfaces = new ArrayList<>();
50+
for (Class<?> type : LISTENER_INTERFACES) {
51+
if (type.isInstance(listener)) {
52+
interfaces.add(type);
53+
}
54+
}
55+
if (interfaces.isEmpty()) {
56+
return listener;
57+
}
58+
59+
return (Response.CompleteListener)
60+
Proxy.newProxyInstance(
61+
listenerClass.getClassLoader(),
62+
interfaces.toArray(new Class<?>[0]),
63+
(proxy, method, args) -> {
64+
try (Scope ignored = context.makeCurrent()) {
65+
return method.invoke(listener, args);
66+
} catch (InvocationTargetException exception) {
67+
throw exception.getCause();
68+
}
69+
});
70+
}
71+
}

instrumentation/jetty-httpclient/jetty-httpclient-12.0/testing/src/main/java/io/opentelemetry/instrumentation/jetty/httpclient/v12_0/AbstractJettyClient12Test.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,30 @@
55

66
package io.opentelemetry.instrumentation.jetty.httpclient.v12_0;
77

8+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
9+
810
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
911
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
1012
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
1113
import java.net.URI;
1214
import java.util.Map;
1315
import java.util.Objects;
16+
import java.util.Set;
17+
import java.util.concurrent.ConcurrentHashMap;
1418
import java.util.concurrent.ExecutionException;
1519
import java.util.concurrent.TimeUnit;
1620
import java.util.concurrent.TimeoutException;
1721
import org.eclipse.jetty.client.ContentResponse;
22+
import org.eclipse.jetty.client.FutureResponseListener;
1823
import org.eclipse.jetty.client.HttpClient;
1924
import org.eclipse.jetty.client.Request;
2025
import org.eclipse.jetty.client.Response;
26+
import org.eclipse.jetty.client.Result;
2127
import org.eclipse.jetty.http.HttpField;
2228
import org.eclipse.jetty.util.ssl.SslContextFactory;
2329
import org.junit.jupiter.api.AfterEach;
2430
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.api.Test;
2532

2633
public abstract class AbstractJettyClient12Test extends AbstractHttpClientTest<Request> {
2734

@@ -108,6 +115,38 @@ public void sendRequestWithCallback(
108115
});
109116
}
110117

118+
@Test
119+
void callbacksCalled() throws InterruptedException, ExecutionException {
120+
URI uri = resolveAddress("/success");
121+
Set<String> callbacks = ConcurrentHashMap.newKeySet();
122+
Request request = client.newRequest(uri).method("GET");
123+
FutureResponseListener responseListener =
124+
new FutureResponseListener(request) {
125+
@Override
126+
public void onHeaders(Response response) {
127+
callbacks.add("headers");
128+
super.onHeaders(response);
129+
}
130+
131+
@Override
132+
public void onSuccess(Response response) {
133+
callbacks.add("success");
134+
super.onSuccess(response);
135+
}
136+
137+
@Override
138+
public void onComplete(Result result) {
139+
callbacks.add("complete");
140+
super.onComplete(result);
141+
}
142+
};
143+
request.send(responseListener);
144+
Response response = responseListener.get();
145+
146+
assertThat(response.getStatus()).isEqualTo(200);
147+
assertThat(callbacks).containsExactlyInAnyOrder("headers", "success", "complete");
148+
}
149+
111150
private static class JettyClientListener
112151
implements Request.FailureListener, Response.FailureListener {
113152
volatile Throwable failure;

0 commit comments

Comments
 (0)