Skip to content

Commit 5267462

Browse files
laurittrask
andauthored
Fix http concurrency test with large responses (#5648)
* Debug http client concurrency test failures * debugging * context porpagation to callbacks isn't really implemented * verify that request succeeds in single connection concurrency test * spotless * verify request status in http client concurrency test * update comment * remove large response * Trigger Build * Update instrumentation/netty/netty-3.8/javaagent/src/test/groovy/Netty38ClientTest.groovy Co-authored-by: Trask Stalnaker <[email protected]> Co-authored-by: Trask Stalnaker <[email protected]>
1 parent b668e73 commit 5267462

File tree

4 files changed

+50
-30
lines changed

4 files changed

+50
-30
lines changed

instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorServiceTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ protected final void executeAndCancelTasks(Function<U, Future<?>> task) {
161161
});
162162

163163
// Just check there is a single trace, this test is primarily to make sure that scopes aren't
164-
// leak on
165-
// cancellation.
164+
// leaked on cancellation.
166165
testing.waitAndAssertTraces(trace -> {});
167166
}
168167
}

instrumentation/netty/netty-3.8/javaagent/src/test/groovy/Netty38ClientTest.groovy

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import com.ning.http.client.Request
1010
import com.ning.http.client.RequestBuilder
1111
import com.ning.http.client.Response
1212
import io.opentelemetry.api.common.AttributeKey
13+
import io.opentelemetry.context.Context
14+
import io.opentelemetry.context.Scope
1315
import io.opentelemetry.instrumentation.test.AgentTestTrait
1416
import io.opentelemetry.instrumentation.test.base.HttpClientTest
1517
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
@@ -70,17 +72,23 @@ class Netty38ClientTest extends HttpClientTest<Request> implements AgentTestTrai
7072

7173
@Override
7274
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
75+
// TODO: context is not automatically propagated into callbacks
76+
Context context = Context.current()
7377
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
7478
client.executeRequest(request, new AsyncCompletionHandler<Void>() {
7579
@Override
7680
Void onCompleted(Response response) throws Exception {
77-
requestResult.complete(response.statusCode)
81+
try (Scope scope = context.makeCurrent()) {
82+
requestResult.complete(response.statusCode)
83+
}
7884
return null
7985
}
8086

8187
@Override
8288
void onThrowable(Throwable throwable) {
83-
requestResult.complete(throwable)
89+
try (Scope scope = context.makeCurrent()) {
90+
requestResult.complete(throwable)
91+
}
8492
}
8593
})
8694
}

instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebFluxSingleConnection.groovy

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import java.util.concurrent.TimeoutException
2121

2222
class SpringWebFluxSingleConnection implements SingleConnection {
2323
private final ReactorClientHttpConnector connector
24+
private final WebClient webClient
2425
private final String host
2526
private final int port
2627

2728
SpringWebFluxSingleConnection(boolean isOldVersion, String host, int port) {
2829
if (isOldVersion) {
2930
connector = new ReactorClientHttpConnector({ HttpClientOptions.Builder clientOptions ->
3031
clientOptions.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, HttpClientTest.CONNECT_TIMEOUT_MS)
31-
clientOptions.poolResources(PoolResources.fixed("pool", 1, HttpClientTest.CONNECT_TIMEOUT_MS))
32+
clientOptions.poolResources(PoolResources.fixed("pool", 1))
3233
})
3334
} else {
3435
def httpClient = HttpClient.create().tcpConfiguration({ tcpClient ->
@@ -40,6 +41,7 @@ class SpringWebFluxSingleConnection implements SingleConnection {
4041

4142
this.host = host
4243
this.port = port
44+
this.webClient = WebClient.builder().clientConnector(connector).build()
4345
}
4446

4547
@Override
@@ -53,11 +55,13 @@ class SpringWebFluxSingleConnection implements SingleConnection {
5355
throw new ExecutionException(e)
5456
}
5557

56-
def request = WebClient.builder().clientConnector(connector).build().method(HttpMethod.GET)
58+
def request = webClient.method(HttpMethod.GET)
5759
.uri(uri)
5860
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
5961

6062
def response = request.exchange().block()
63+
// read response body, this seems to be needed to ensure that the connection can be reused
64+
response.bodyToMono(String).block()
6165

6266
String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER)
6367
if (requestId != responseId) {

testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -680,17 +680,22 @@ void highConcurrency() {
680680
throw new AssertionError(e);
681681
}
682682
try {
683-
testing.runWithSpan(
684-
"Parent span " + index,
685-
() -> {
686-
Span.current().setAttribute("test.request.id", index);
687-
doRequest(
688-
method,
689-
uri,
690-
Collections.singletonMap("test-request-id", String.valueOf(index)));
691-
});
692-
} catch (Exception e) {
693-
throw new AssertionError(e);
683+
Integer result =
684+
testing.runWithSpan(
685+
"Parent span " + index,
686+
() -> {
687+
Span.current().setAttribute("test.request.id", index);
688+
return doRequest(
689+
method,
690+
uri,
691+
Collections.singletonMap("test-request-id", String.valueOf(index)));
692+
});
693+
assertThat(result).isEqualTo(200);
694+
} catch (Throwable throwable) {
695+
if (throwable instanceof AssertionError) {
696+
throw (AssertionError) throwable;
697+
}
698+
throw new AssertionError(throwable);
694699
}
695700
};
696701
pool.submit(job);
@@ -832,19 +837,23 @@ void highConcurrencyOnSingleConnection() {
832837
} catch (InterruptedException e) {
833838
throw new AssertionError(e);
834839
}
835-
testing.runWithSpan(
836-
"Parent span " + index,
837-
() -> {
838-
Span.current().setAttribute("test.request.id", index);
839-
try {
840-
singleConnection.doRequest(
841-
path, Collections.singletonMap("test-request-id", String.valueOf(index)));
842-
} catch (InterruptedException e) {
843-
throw new AssertionError(e);
844-
} catch (Exception e) {
845-
throw new AssertionError(e);
846-
}
847-
});
840+
try {
841+
Integer result =
842+
testing.runWithSpan(
843+
"Parent span " + index,
844+
() -> {
845+
Span.current().setAttribute("test.request.id", index);
846+
return singleConnection.doRequest(
847+
path,
848+
Collections.singletonMap("test-request-id", String.valueOf(index)));
849+
});
850+
assertThat(result).isEqualTo(200);
851+
} catch (Throwable throwable) {
852+
if (throwable instanceof AssertionError) {
853+
throw (AssertionError) throwable;
854+
}
855+
throw new AssertionError(throwable);
856+
}
848857
};
849858
pool.submit(job);
850859
}

0 commit comments

Comments
 (0)