Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ dependencies {
testInstrumentation(project(":instrumentation:rxjava:rxjava-2.0:javaagent"))
testInstrumentation(project(":instrumentation:vertx:vertx-http-client:vertx-http-client-3.0:javaagent"))
testInstrumentation(project(":instrumentation:vertx:vertx-http-client:vertx-http-client-4.0:javaagent"))
testInstrumentation(project(":instrumentation:vertx:vertx-http-client:vertx-http-client-5.0:javaagent"))
testInstrumentation(project(":instrumentation:vertx:vertx-sql-client:vertx-sql-client-5.0:javaagent"))
testInstrumentation(project(":instrumentation:vertx:vertx-web-3.0:javaagent"))
}

val testLatestDeps = findProperty("testLatestDeps") as Boolean

testing {
suites {
val version35Test by registering(JvmTestSuite::class) {
Expand All @@ -33,50 +37,66 @@ testing {
// inclusion of this artifact inside :testing-common
compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow"))

val version = if (testLatestDeps) "3.+" else "3.5.0"
implementation("org.hsqldb:hsqldb:2.3.4")
compileOnly("io.vertx:vertx-codegen:$version")
implementation("io.vertx:vertx-web:$version")
implementation("io.vertx:vertx-rx-java2:$version")
implementation("io.vertx:vertx-web-client:$version")
implementation("io.vertx:vertx-jdbc-client:$version")
implementation("io.vertx:vertx-circuit-breaker:$version")
}
}

val version41Test by registering(JvmTestSuite::class) {
dependencies {
// this only exists to make Intellij happy since it doesn't (currently at least) understand our
// inclusion of this artifact inside :testing-common
compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow"))

val version = if (testLatestDeps) "4.+" else "4.1.0"
implementation("org.hsqldb:hsqldb:2.3.4")
compileOnly("io.vertx:vertx-codegen:$vertxVersion")
implementation("io.vertx:vertx-web:$vertxVersion")
implementation("io.vertx:vertx-rx-java2:$vertxVersion")
implementation("io.vertx:vertx-web-client:$vertxVersion")
implementation("io.vertx:vertx-jdbc-client:$vertxVersion")
implementation("io.vertx:vertx-circuit-breaker:$vertxVersion")
compileOnly("io.vertx:vertx-codegen:$version")
implementation("io.vertx:vertx-web:$version")
implementation("io.vertx:vertx-rx-java2:$version")
implementation("io.vertx:vertx-web-client:$version")
implementation("io.vertx:vertx-jdbc-client:$version")
implementation("io.vertx:vertx-circuit-breaker:$version")
}
}

val latestDepTest by registering(JvmTestSuite::class) {
val version5Test by registering(JvmTestSuite::class) {
dependencies {
// this only exists to make Intellij happy since it doesn't (currently at least) understand our
// inclusion of this artifact inside :testing-common
compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow"))

val version = if (testLatestDeps) "latest.release" else "5.0.0"
implementation("org.hsqldb:hsqldb:2.3.4")
implementation("io.vertx:vertx-web:4.+")
implementation("io.vertx:vertx-rx-java2:4.+")
implementation("io.vertx:vertx-web-client:4.+")
implementation("io.vertx:vertx-jdbc-client:4.+")
implementation("io.vertx:vertx-circuit-breaker:4.+")
compileOnly("io.vertx:vertx-codegen:$version")
implementation("io.vertx:vertx-web:$version")
implementation("io.vertx:vertx-rx-java2:$version")
implementation("io.vertx:vertx-web-client:$version")
implementation("io.vertx:vertx-jdbc-client:$version")
implementation("io.vertx:vertx-circuit-breaker:$version")
}
}
}
}

val testLatestDeps = findProperty("testLatestDeps") as Boolean

tasks {
if (testLatestDeps) {
// disable regular test running and compiling tasks when latest dep test task is run
named("test") {
enabled = false
}
named("compileTestGroovy") {
named("compileVersion5TestJava", JavaCompile::class).configure {
options.release.set(11)
}
val testJavaVersion =
gradle.startParameter.projectProperties.get("testJavaVersion")?.let(JavaVersion::toVersion)
?: JavaVersion.current()
if (!testJavaVersion.isCompatibleWith(JavaVersion.VERSION_11)) {
named("version5Test", Test::class).configure {
enabled = false
}
}

named("latestDepTest") {
enabled = testLatestDeps
}

check {
dependsOn(testing.suites)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.netty.handler.codec.haproxy;

// instrumentation fails without this class
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class HAProxyMessage {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.netty.handler.codec.haproxy;

// instrumentation fails without this class
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class HAProxyProxiedProtocol {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client;

import io.vertx.core.AsyncResult;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection {
private final CircuitBreaker breaker;

public VertxRxCircuitBreakerSingleConnection(String host, int port, CircuitBreaker breaker) {
super(host, port);
this.breaker = breaker;
}

@Override
protected HttpResponse<?> fetchResponse(HttpRequest<?> request) {
CompletableFuture<Object> future = new CompletableFuture<>();

sendRequestWithCallback(
request,
it -> {
if (it.succeeded()) {
future.complete(it.result());
} else {
future.completeExceptionally(it.cause());
}
});

return (HttpResponse<?>) future.join();
}

private void sendRequestWithCallback(HttpRequest<?> request, Consumer<AsyncResult<?>> consumer) {
breaker
.execute(
command ->
request
.rxSend()
.doOnSuccess(command::complete)
.doOnError(command::fail)
.subscribe())
.onComplete(consumer::accept);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.core.Promise;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.RegisterExtension;

class VertxRxCircuitBreakerWebClientTest extends AbstractHttpClientTest<HttpRequest<?>> {

@RegisterExtension
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();

private final Vertx vertx = Vertx.vertx(new VertxOptions());
private final WebClient httpClient = buildClient(vertx);
private final CircuitBreaker breaker =
CircuitBreaker.create(
"my-circuit-breaker",
vertx,
new CircuitBreakerOptions()
.setTimeout(-1) // Disable the timeout otherwise it makes each test take this long.
);

private static WebClient buildClient(Vertx vertx) {
WebClientOptions clientOptions =
new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis()));
return WebClient.create(vertx, clientOptions);
}

@Override
public HttpRequest<Buffer> buildRequest(String method, URI uri, Map<String, String> headers) {
HttpRequest<Buffer> request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString());
headers.forEach(request::putHeader);
return request;
}

@Override
public int sendRequest(
HttpRequest<?> request, String method, URI uri, Map<String, String> headers)
throws ExecutionException, InterruptedException {
// VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through
// a callback.
CompletableFuture<Integer> future = new CompletableFuture<>();
sendRequestWithCallback(
request,
result -> {
if (result.succeeded()) {
future.complete(result.result().statusCode());
} else {
future.completeExceptionally(result.cause());
}
});

return future.get();
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private void sendRequestWithCallback(
HttpRequest<?> request, Consumer<AsyncResult<HttpResponse<?>>> consumer) {
breaker
.execute(
(Handler<Promise<HttpResponse<?>>>)
command -> request.rxSend().subscribe(command::complete, command::fail))
.onComplete(consumer::accept);
}

@Override
public void sendRequestWithCallback(
HttpRequest<?> request,
String method,
URI uri,
Map<String, String> headers,
HttpClientResult requestResult) {
sendRequestWithCallback(
request,
result -> {
if (result.succeeded()) {
requestResult.complete(result.result().statusCode());
} else {
requestResult.complete(result.cause());
}
});
}

@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
optionsBuilder.disableTestRedirects();
optionsBuilder.disableTestHttps();
optionsBuilder.disableTestReadTimeout();
optionsBuilder.setHttpAttributes(VertxRxCircuitBreakerWebClientTest::getHttpAttributes);
optionsBuilder.setExpectedClientSpanNameMapper(
VertxRxCircuitBreakerWebClientTest::expectedClientSpanName);
optionsBuilder.setSingleConnectionFactory(
(host, port) -> new VertxRxCircuitBreakerSingleConnection(host, port, breaker));
}

private static Set<AttributeKey<?>> getHttpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return Collections.emptySet();
default:
return HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES;
}
}

private static String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return "CONNECT";
default:
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client;

import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.PoolOptions;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import java.util.Map;
import java.util.Objects;

public class VertxRxSingleConnection implements SingleConnection {
private final WebClient webClient;
private final String host;
private final int port;

public VertxRxSingleConnection(String host, int port) {
this.host = host;
this.port = port;

WebClientOptions clientOptions =
new WebClientOptions().setConnectTimeout(5000).setKeepAlive(true).setPipelining(true);
PoolOptions poolOptions = new PoolOptions().setHttp1MaxSize(1);

Vertx vertx = Vertx.vertx(new VertxOptions());
this.webClient = WebClient.create(vertx, clientOptions, poolOptions);
}

@Override
public int doRequest(String path, Map<String, String> headers) {
String requestId = Objects.requireNonNull(headers.get(REQUEST_ID_HEADER));

HttpRequest<Buffer> request = webClient.request(HttpMethod.GET, port, host, path);
headers.forEach(request::putHeader);

HttpResponse<?> response = fetchResponse(request);

String responseId = response.getHeader(REQUEST_ID_HEADER);
if (!requestId.equals(responseId)) {
throw new IllegalStateException(
String.format("Received response with id %s, expected %s", responseId, requestId));
}

return response.statusCode();
}

protected HttpResponse<?> fetchResponse(HttpRequest<?> request) {
return request.rxSend().blockingGet();
}
}
Loading
Loading