diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts index cd1cdaaa78d8..49dc47e87439 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/build.gradle.kts @@ -22,9 +22,13 @@ dependencies { testInstrumentation(project(":instrumentation:rxjava:rxjava-2.0:javaagent")) testInstrumentation(project(":instrumentation:vertx:vertx-http-client:vertx-http-client-3.0:javaagent")) testInstrumentation(project(":instrumentation:vertx:vertx-http-client:vertx-http-client-4.0:javaagent")) + testInstrumentation(project(":instrumentation:vertx:vertx-http-client:vertx-http-client-5.0:javaagent")) + testInstrumentation(project(":instrumentation:vertx:vertx-sql-client:vertx-sql-client-5.0:javaagent")) testInstrumentation(project(":instrumentation:vertx:vertx-web-3.0:javaagent")) } +val testLatestDeps = findProperty("testLatestDeps") as Boolean + testing { suites { val version35Test by registering(JvmTestSuite::class) { @@ -33,50 +37,66 @@ testing { // inclusion of this artifact inside :testing-common compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow")) + val version = if (testLatestDeps) "3.+" else "3.5.0" + implementation("org.hsqldb:hsqldb:2.3.4") + compileOnly("io.vertx:vertx-codegen:$version") + implementation("io.vertx:vertx-web:$version") + implementation("io.vertx:vertx-rx-java2:$version") + implementation("io.vertx:vertx-web-client:$version") + implementation("io.vertx:vertx-jdbc-client:$version") + implementation("io.vertx:vertx-circuit-breaker:$version") + } + } + + val version41Test by registering(JvmTestSuite::class) { + dependencies { + // this only exists to make Intellij happy since it doesn't (currently at least) understand our + // inclusion of this artifact inside :testing-common + compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow")) + + val version = if (testLatestDeps) "4.+" else "4.1.0" implementation("org.hsqldb:hsqldb:2.3.4") - compileOnly("io.vertx:vertx-codegen:$vertxVersion") - implementation("io.vertx:vertx-web:$vertxVersion") - implementation("io.vertx:vertx-rx-java2:$vertxVersion") - implementation("io.vertx:vertx-web-client:$vertxVersion") - implementation("io.vertx:vertx-jdbc-client:$vertxVersion") - implementation("io.vertx:vertx-circuit-breaker:$vertxVersion") + compileOnly("io.vertx:vertx-codegen:$version") + implementation("io.vertx:vertx-web:$version") + implementation("io.vertx:vertx-rx-java2:$version") + implementation("io.vertx:vertx-web-client:$version") + implementation("io.vertx:vertx-jdbc-client:$version") + implementation("io.vertx:vertx-circuit-breaker:$version") } } - val latestDepTest by registering(JvmTestSuite::class) { + val version5Test by registering(JvmTestSuite::class) { dependencies { // this only exists to make Intellij happy since it doesn't (currently at least) understand our // inclusion of this artifact inside :testing-common compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow")) + val version = if (testLatestDeps) "latest.release" else "5.0.0" implementation("org.hsqldb:hsqldb:2.3.4") - implementation("io.vertx:vertx-web:4.+") - implementation("io.vertx:vertx-rx-java2:4.+") - implementation("io.vertx:vertx-web-client:4.+") - implementation("io.vertx:vertx-jdbc-client:4.+") - implementation("io.vertx:vertx-circuit-breaker:4.+") + compileOnly("io.vertx:vertx-codegen:$version") + implementation("io.vertx:vertx-web:$version") + implementation("io.vertx:vertx-rx-java2:$version") + implementation("io.vertx:vertx-web-client:$version") + implementation("io.vertx:vertx-jdbc-client:$version") + implementation("io.vertx:vertx-circuit-breaker:$version") } } } } -val testLatestDeps = findProperty("testLatestDeps") as Boolean - tasks { - if (testLatestDeps) { - // disable regular test running and compiling tasks when latest dep test task is run - named("test") { - enabled = false - } - named("compileTestGroovy") { + named("compileVersion5TestJava", JavaCompile::class).configure { + options.release.set(11) + } + val testJavaVersion = + gradle.startParameter.projectProperties.get("testJavaVersion")?.let(JavaVersion::toVersion) + ?: JavaVersion.current() + if (!testJavaVersion.isCompatibleWith(JavaVersion.VERSION_11)) { + named("version5Test", Test::class).configure { enabled = false } } - named("latestDepTest") { - enabled = testLatestDeps - } - check { dependsOn(testing.suites) } diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyMessage.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/netty/handler/codec/haproxy/HAProxyMessage.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyMessage.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/netty/handler/codec/haproxy/HAProxyMessage.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java similarity index 100% rename from instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/latestDepTest/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java rename to instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/netty/handler/codec/haproxy/HAProxyMessage.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/netty/handler/codec/haproxy/HAProxyMessage.java new file mode 100644 index 000000000000..3dcf2cafedfe --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/netty/handler/codec/haproxy/HAProxyMessage.java @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.netty.handler.codec.haproxy; + +// instrumentation fails without this class +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") +public class HAProxyMessage {} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java new file mode 100644 index 000000000000..5659024f96a0 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.netty.handler.codec.haproxy; + +// instrumentation fails without this class +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") +public class HAProxyProxiedProtocol {} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java new file mode 100644 index 000000000000..deaddbe083a0 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerSingleConnection.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import io.vertx.core.AsyncResult; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +public class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection { + private final CircuitBreaker breaker; + + public VertxRxCircuitBreakerSingleConnection(String host, int port, CircuitBreaker breaker) { + super(host, port); + this.breaker = breaker; + } + + @Override + protected HttpResponse fetchResponse(HttpRequest request) { + CompletableFuture future = new CompletableFuture<>(); + + sendRequestWithCallback( + request, + it -> { + if (it.succeeded()) { + future.complete(it.result()); + } else { + future.completeExceptionally(it.cause()); + } + }); + + return (HttpResponse) future.join(); + } + + private void sendRequestWithCallback(HttpRequest request, Consumer> consumer) { + breaker + .execute( + command -> + request + .rxSend() + .doOnSuccess(command::complete) + .doOnError(command::fail) + .subscribe()) + .onComplete(consumer::accept); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java new file mode 100644 index 000000000000..df2b057e87ea --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxCircuitBreakerWebClientTest.java @@ -0,0 +1,143 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.Promise; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxCircuitBreakerWebClientTest extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final Vertx vertx = Vertx.vertx(new VertxOptions()); + private final WebClient httpClient = buildClient(vertx); + private final CircuitBreaker breaker = + CircuitBreaker.create( + "my-circuit-breaker", + vertx, + new CircuitBreakerOptions() + .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. + ); + + private static WebClient buildClient(Vertx vertx) { + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis())); + return WebClient.create(vertx, clientOptions); + } + + @Override + public HttpRequest buildRequest(String method, URI uri, Map headers) { + HttpRequest request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString()); + headers.forEach(request::putHeader); + return request; + } + + @Override + public int sendRequest( + HttpRequest request, String method, URI uri, Map headers) + throws ExecutionException, InterruptedException { + // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through + // a callback. + CompletableFuture future = new CompletableFuture<>(); + sendRequestWithCallback( + request, + result -> { + if (result.succeeded()) { + future.complete(result.result().statusCode()); + } else { + future.completeExceptionally(result.cause()); + } + }); + + return future.get(); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void sendRequestWithCallback( + HttpRequest request, Consumer>> consumer) { + breaker + .execute( + (Handler>>) + command -> request.rxSend().subscribe(command::complete, command::fail)) + .onComplete(consumer::accept); + } + + @Override + public void sendRequestWithCallback( + HttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult requestResult) { + sendRequestWithCallback( + request, + result -> { + if (result.succeeded()) { + requestResult.complete(result.result().statusCode()); + } else { + requestResult.complete(result.cause()); + } + }); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.setHttpAttributes(VertxRxCircuitBreakerWebClientTest::getHttpAttributes); + optionsBuilder.setExpectedClientSpanNameMapper( + VertxRxCircuitBreakerWebClientTest::expectedClientSpanName); + optionsBuilder.setSingleConnectionFactory( + (host, port) -> new VertxRxCircuitBreakerSingleConnection(host, port, breaker)); + } + + private static Set> getHttpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return Collections.emptySet(); + default: + return HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES; + } + } + + private static String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return "CONNECT"; + default: + return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java new file mode 100644 index 000000000000..cda07b5c89cc --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxSingleConnection.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection; +import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.PoolOptions; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.util.Map; +import java.util.Objects; + +public class VertxRxSingleConnection implements SingleConnection { + private final WebClient webClient; + private final String host; + private final int port; + + public VertxRxSingleConnection(String host, int port) { + this.host = host; + this.port = port; + + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(5000).setKeepAlive(true).setPipelining(true); + PoolOptions poolOptions = new PoolOptions().setHttp1MaxSize(1); + + Vertx vertx = Vertx.vertx(new VertxOptions()); + this.webClient = WebClient.create(vertx, clientOptions, poolOptions); + } + + @Override + public int doRequest(String path, Map headers) { + String requestId = Objects.requireNonNull(headers.get(REQUEST_ID_HEADER)); + + HttpRequest request = webClient.request(HttpMethod.GET, port, host, path); + headers.forEach(request::putHeader); + + HttpResponse response = fetchResponse(request); + + String responseId = response.getHeader(REQUEST_ID_HEADER); + if (!requestId.equals(responseId)) { + throw new IllegalStateException( + String.format("Received response with id %s, expected %s", responseId, requestId)); + } + + return response.statusCode(); + } + + protected HttpResponse fetchResponse(HttpRequest request) { + return request.rxSend().blockingGet(); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java new file mode 100644 index 000000000000..afb69b9f0db3 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/client/VertxRxWebClientTest.java @@ -0,0 +1,116 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.client; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.reactivex.functions.Consumer; +import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxWebClientTest extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final WebClient httpClient = buildClient(); + + private static WebClient buildClient() { + Vertx vertx = Vertx.vertx(new VertxOptions()); + WebClientOptions clientOptions = + new WebClientOptions().setConnectTimeout(Math.toIntExact(CONNECTION_TIMEOUT.toMillis())); + return WebClient.create(vertx, clientOptions); + } + + @Override + public HttpRequest buildRequest(String method, URI uri, Map headers) { + HttpRequest request = httpClient.requestAbs(HttpMethod.valueOf(method), uri.toString()); + headers.forEach(request::putHeader); + return request; + } + + @Override + public int sendRequest( + HttpRequest request, String method, URI uri, Map headers) { + return request.rxSend().blockingGet().statusCode(); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Override + public void sendRequestWithCallback( + HttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult requestResult) { + request + .rxSend() + .subscribe( + (Consumer>) + httpResponse -> requestResult.complete(httpResponse.statusCode()), + requestResult::complete); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.setHttpAttributes(VertxRxWebClientTest::getHttpAttributes); + optionsBuilder.setClientSpanErrorMapper(VertxRxWebClientTest::clientSpanError); + optionsBuilder.setExpectedClientSpanNameMapper(VertxRxWebClientTest::expectedClientSpanName); + optionsBuilder.setSingleConnectionFactory(VertxRxSingleConnection::new); + } + + private static Set> getHttpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return Collections.emptySet(); + default: + return HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES; + } + } + + private static Throwable clientSpanError(URI uri, Throwable exception) { + if (exception.getClass() == RuntimeException.class) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + exception = exception.getCause(); + break; + default: + break; + } + } + return exception; + } + + private static String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return "CONNECT"; + default: + return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java new file mode 100644 index 000000000000..fcd029eb195e --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxHttpServerTest.java @@ -0,0 +1,110 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.api.internal.HttpConstants; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonObject; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +abstract class AbstractVertxRxHttpServerTest extends AbstractHttpServerTest { + static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; + + @Override + protected Vertx setupServer() throws Exception { + Vertx server = + Vertx.vertx( + new VertxOptions() + // Useful for debugging: + // .setBlockedThreadCheckInterval(Integer.MAX_VALUE) + ); + CompletableFuture future = new CompletableFuture<>(); + server + .deployVerticle( + verticle().getName(), + new DeploymentOptions() + .setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)) + .setInstances(3)) + .onComplete( + result -> { + if (!result.succeeded()) { + throw new IllegalStateException("Cannot deploy server Verticle", result.cause()); + } + future.complete(null); + }); + + future.get(30, TimeUnit.SECONDS); + return server; + } + + @Override + protected void stopServer(Vertx vertx) { + vertx.close(); + } + + @Override + protected void configure(HttpServerTestOptions options) { + super.configure(options); + + options.setTestPathParam(true); + // server spans are ended inside the controller spans + options.setVerifyServerSpanEndTime(false); + options.setExpectedHttpRoute( + (endpoint, method) -> { + if (HttpConstants._OTHER.equals(method)) { + return getContextPath() + endpoint.getPath(); + } + return expectedHttpRoute(endpoint, method); + }); + } + + protected Class verticle() { + return VertxReactiveWebServer.class; + } + + public static class VertxReactiveWebServer extends AbstractVertxRxVerticle { + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + controller(endpoint, action::run); + } + + @Override + public void start(Promise startFuture) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + handle( + ctx, + EXCEPTION, + () -> { + throw new IllegalStateException(EXCEPTION.getBody()); + })); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port) + .onComplete(httpServerAsyncResult -> startFuture.complete()); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java new file mode 100644 index 000000000000..b3cd1ad73637 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/AbstractVertxRxVerticle.java @@ -0,0 +1,110 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.PATH_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; + +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; + +abstract class AbstractVertxRxVerticle extends AbstractVerticle { + + abstract void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action); + + void configure(Router router) { + router + .route(SUCCESS.getPath()) + .handler( + ctx -> + handle( + ctx, + SUCCESS, + () -> + ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody()))); + + router + .route(INDEXED_CHILD.getPath()) + .handler( + ctx -> + handle( + ctx, + INDEXED_CHILD, + () -> { + INDEXED_CHILD.collectSpanAttributes( + parameter -> ctx.request().params().get(parameter)); + ctx.response() + .setStatusCode(INDEXED_CHILD.getStatus()) + .end(INDEXED_CHILD.getBody()); + })); + + router + .route(QUERY_PARAM.getPath()) + .handler( + ctx -> + handle( + ctx, + QUERY_PARAM, + () -> + ctx.response() + .setStatusCode(QUERY_PARAM.getStatus()) + .end(ctx.request().query()))); + + router + .route(REDIRECT.getPath()) + .handler( + ctx -> + handle( + ctx, + REDIRECT, + () -> + ctx.response() + .setStatusCode(REDIRECT.getStatus()) + .putHeader("location", REDIRECT.getBody()) + .end())); + + router + .route(ERROR.getPath()) + .handler( + ctx -> + handle( + ctx, + ERROR, + () -> ctx.response().setStatusCode(ERROR.getStatus()).end(ERROR.getBody()))); + + router + .route("/path/:id/param") + .handler( + ctx -> + handle( + ctx, + PATH_PARAM, + () -> + ctx.response() + .setStatusCode(PATH_PARAM.getStatus()) + .end(ctx.request().getParam("id")))); + + router + .route(CAPTURE_HEADERS.getPath()) + .handler( + ctx -> + handle( + ctx, + CAPTURE_HEADERS, + () -> + ctx.response() + .setStatusCode(CAPTURE_HEADERS.getStatus()) + .putHeader("X-Test-Response", ctx.request().getHeader("X-Test-Request")) + .end(CAPTURE_HEADERS.getBody()))); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java new file mode 100644 index 000000000000..0d830e9c2a0a --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactivePropagationTest.java @@ -0,0 +1,229 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; +import static io.opentelemetry.javaagent.instrumentation.vertx.reactive.server.VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE; +import static io.opentelemetry.javaagent.instrumentation.vertx.reactive.server.VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ClientAttributes.CLIENT_ADDRESS; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_ROUTE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_ADDRESS; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_PORT; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_PATH; +import static io.opentelemetry.semconv.UrlAttributes.URL_QUERY; +import static io.opentelemetry.semconv.UrlAttributes.URL_SCHEME; +import static io.opentelemetry.semconv.UserAgentAttributes.USER_AGENT_ORIGINAL; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_OPERATION; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SQL_TABLE; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_STATEMENT; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import io.opentelemetry.testing.internal.armeria.client.WebClient; +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse; +import io.opentelemetry.testing.internal.armeria.common.HttpRequest; +import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder; +import io.vertx.reactivex.core.Vertx; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxReactivePropagationTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + private static WebClient client; + private static int port; + private static Vertx server; + + @BeforeAll + static void setUp() throws ExecutionException, InterruptedException, TimeoutException { + port = PortUtils.findOpenPort(); + server = VertxReactiveWebServer.start(port); + client = WebClient.of("h1c://localhost:" + port); + } + + @AfterAll + static void cleanUp() { + server.close(); + } + + // Verifies that context is correctly propagated and sql query span has correct parent. + // Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation + @SuppressWarnings("deprecation") // uses deprecated db semconv + @Test + void contextPropagation() { + AggregatedHttpResponse response = client.get("/listProducts").aggregate().join(); + assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET /listProducts") + .hasKind(SpanKind.SERVER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + satisfies(NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, port), + equalTo(CLIENT_ADDRESS, "127.0.0.1"), + equalTo(URL_PATH, "/listProducts"), + equalTo(HTTP_REQUEST_METHOD, "GET"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(URL_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/listProducts")), + span -> + span.hasName("handleListProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("listProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)), + span -> + span.hasName("SELECT products") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo( + maybeStable(DB_STATEMENT), + "SELECT id, name, price, weight FROM products"), + equalTo(maybeStable(DB_OPERATION), "SELECT"), + equalTo(maybeStable(DB_SQL_TABLE), "products")))); + } + + @SuppressWarnings("deprecation") // uses deprecated db semconv + @Test + void highConcurrency() { + int count = 100; + String baseUrl = "/listProducts"; + CountDownLatch latch = new CountDownLatch(1); + + ExecutorService pool = Executors.newFixedThreadPool(8); + cleanup.deferCleanup(pool::shutdownNow); + TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + TextMapSetter setter = + (carrier, name, value) -> carrier.header(name, value); + + for (int i = 0; i < count; i++) { + int index = i; + pool.submit( + () -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + testing.runWithSpan( + "client " + index, + () -> { + HttpRequestBuilder builder = + HttpRequest.builder() + .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); + propagator.inject(Context.current(), builder, setter); + client.execute(builder.build()).aggregate().join(); + }); + }); + } + + latch.countDown(); + + List> assertions = new ArrayList<>(); + for (int i = 0; i < count; i++) { + assertions.add( + trace -> { + long requestId = + Long.parseLong(trace.getSpan(0).getName().substring("client ".length())); + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("client " + requestId) + .hasKind(SpanKind.INTERNAL) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("GET /listProducts") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(NETWORK_PEER_ADDRESS, "127.0.0.1"), + satisfies(NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, port), + equalTo(CLIENT_ADDRESS, "127.0.0.1"), + equalTo(URL_PATH, baseUrl), + equalTo(URL_QUERY, TEST_REQUEST_ID_PARAMETER + "=" + requestId), + equalTo(HTTP_REQUEST_METHOD, "GET"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(URL_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/listProducts"), + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("handleListProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("listProducts") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(longKey(TEST_REQUEST_ID_ATTRIBUTE), requestId)), + span -> + span.hasName("SELECT products") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(3)) + .hasAttributesSatisfyingExactly( + equalTo( + maybeStable(DB_STATEMENT), + "SELECT id AS request" + + requestId + + ", name, price, weight FROM products"), + equalTo(maybeStable(DB_OPERATION), "SELECT"), + equalTo(maybeStable(DB_SQL_TABLE), "products"))); + }); + } + testing.waitAndAssertTraces(assertions); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java new file mode 100644 index 000000000000..b2e569281e9a --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxReactiveWebServer.java @@ -0,0 +1,197 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.Single; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.jdbcclient.JDBCConnectOptions; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.http.HttpServerResponse; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import io.vertx.reactivex.jdbcclient.JDBCPool; +import io.vertx.reactivex.sqlclient.SqlConnection; +import io.vertx.sqlclient.PoolOptions; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VertxReactiveWebServer extends AbstractVerticle { + + private static final Logger logger = LoggerFactory.getLogger(VertxReactiveWebServer.class); + + private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test"); + + public static final String TEST_REQUEST_ID_PARAMETER = "test-request-id"; + public static final String TEST_REQUEST_ID_ATTRIBUTE = "test.request.id"; + + private static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; + private static io.vertx.reactivex.sqlclient.Pool client; + + public static Vertx start(int port) + throws ExecutionException, InterruptedException, TimeoutException { + /* This is highly against Vertx ideas, but our tests are synchronous + so we have to make sure server is up and running */ + CompletableFuture future = new CompletableFuture<>(); + + Vertx server = Vertx.newInstance(io.vertx.core.Vertx.vertx(new VertxOptions())); + client = + JDBCPool.pool( + server, + new JDBCConnectOptions().setJdbcUrl("jdbc:hsqldb:mem:test?shutdown=true"), + new PoolOptions()); + + logger.info("Starting on port {}", port); + server + .deployVerticle( + VertxReactiveWebServer.class.getName(), + new DeploymentOptions().setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port))) + .onComplete( + res -> { + if (!res.succeeded()) { + RuntimeException exception = + new RuntimeException("Cannot deploy server Verticle", res.cause()); + future.completeExceptionally(exception); + } + future.complete(null); + }); + + // block until vertx server is up + future.get(30, TimeUnit.SECONDS); + + return server; + } + + @Override + public void start(Promise startPromise) { + setUpInitialData( + ready -> { + Router router = Router.router(vertx); + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + logger.info("Listening on port {}", port); + router + .route(SUCCESS.getPath()) + .handler( + ctx -> ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody())); + + router.route("/listProducts").handler(VertxReactiveWebServer::handleListProducts); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port) + .onComplete(it -> startPromise.complete()); + }); + } + + @SuppressWarnings("CheckReturnValue") + private static void handleListProducts(RoutingContext routingContext) { + Long requestId = extractRequestId(routingContext); + attachRequestIdToCurrentSpan(requestId); + + Span span = tracer.spanBuilder("handleListProducts").startSpan(); + try (Scope ignored = Context.current().with(span).makeCurrent()) { + attachRequestIdToCurrentSpan(requestId); + + HttpServerResponse response = routingContext.response(); + Single jsonArraySingle = listProducts(requestId); + + jsonArraySingle.subscribe( + arr -> response.putHeader("content-type", "application/json").end(arr.encode())); + } finally { + span.end(); + } + } + + private static Single listProducts(Long requestId) { + Span span = tracer.spanBuilder("listProducts").startSpan(); + try (Scope ignored = Context.current().with(span).makeCurrent()) { + attachRequestIdToCurrentSpan(requestId); + String queryInfix = requestId != null ? " AS request" + requestId : ""; + + return client + .query("SELECT id" + queryInfix + ", name, price, weight FROM products") + .rxExecute() + .flatMap( + result -> { + JsonArray arr = new JsonArray(); + result.forEach( + row -> { + JsonArray values = new JsonArray(); + for (int i = 0; i < 4; i++) { + values.add(row.getValue(i)); + } + arr.add(values); + }); + return Single.just(arr); + }); + } finally { + span.end(); + } + } + + private static Long extractRequestId(RoutingContext routingContext) { + String requestIdString = routingContext.request().params().get(TEST_REQUEST_ID_PARAMETER); + return requestIdString != null ? Long.valueOf(requestIdString) : null; + } + + private static void attachRequestIdToCurrentSpan(Long requestId) { + if (requestId != null) { + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, requestId); + } + } + + private static void setUpInitialData(Handler done) { + client + .getConnection() + .onComplete( + res -> { + if (res.failed()) { + throw new IllegalStateException(res.cause()); + } + + SqlConnection conn = res.result(); + + conn.query( + "CREATE TABLE IF NOT EXISTS products(id INT IDENTITY, name VARCHAR(255), price FLOAT, weight INT)") + .execute() + .onComplete( + ddl -> { + if (ddl.failed()) { + throw new IllegalStateException(ddl.cause()); + } + + conn.query( + "INSERT INTO products (name, price, weight) VALUES ('Egg Whisk', 3.99, 150), ('Tea Cosy', 5.99, 100), ('Spatula', 1.00, 80)") + .execute() + .onComplete( + fixtures -> { + if (fixtures.failed()) { + throw new IllegalStateException(fixtures.cause()); + } + + done.handle(null); + }); + }); + }); + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java new file mode 100644 index 000000000000..e1562b5b42b1 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxCircuitBreakerHttpServerTest.java @@ -0,0 +1,98 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.Promise; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxCircuitBreakerHttpServerTest extends AbstractVertxRxHttpServerTest { + + @RegisterExtension + static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent(); + + @Override + protected void configure(HttpServerTestOptions options) { + super.configure(options); + + options.setTestHttpPipelining(false); + options.setHasExceptionOnServerSpan(endpoint -> endpoint != EXCEPTION); + } + + @Override + protected Class verticle() { + return VertxRxCircuitBreakerWebTestServer.class; + } + + public static class VertxRxCircuitBreakerWebTestServer extends AbstractVertxRxVerticle { + CircuitBreaker breaker; + + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + breaker + .execute(future -> future.complete(endpoint)) + .onComplete( + result -> { + if (result.failed()) { + throw new IllegalStateException(result.cause()); + } + controller(endpoint, action::run); + }); + } + + @Override + public void start(Promise startPromise) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + breaker = + CircuitBreaker.create( + "my-circuit-breaker", + vertx, + // Disable the timeout otherwise it makes each test take this long. + new CircuitBreakerOptions().setTimeout(-1)); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + breaker + .execute( + future -> future.fail(new IllegalStateException(EXCEPTION.getBody()))) + .onComplete( + result -> { + try { + Throwable cause = result.cause(); + controller( + EXCEPTION, + () -> { + throw cause; + }); + } catch (Throwable throwable) { + ctx.response() + .setStatusCode(EXCEPTION.getStatus()) + .end(throwable.getMessage()); + } + })); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port) + .onComplete(httpServerAsyncResult -> startPromise.complete()); + } + } +} diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java new file mode 100644 index 000000000000..c3958e1285f8 --- /dev/null +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/reactive/server/VertxRxHttpServerTest.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.reactive.server; + +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.vertx.core.Promise; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import org.junit.jupiter.api.extension.RegisterExtension; + +class VertxRxHttpServerTest extends AbstractVertxRxHttpServerTest { + + @RegisterExtension + static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent(); + + @Override + protected Class verticle() { + return VertxReactiveWebServer.class; + } + + public static class VertxReactiveWebServer extends AbstractVertxRxVerticle { + @Override + void handle(RoutingContext ctx, ServerEndpoint endpoint, Runnable action) { + controller(endpoint, action::run); + } + + @Override + public void start(Promise startFuture) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + Router router = Router.router(vertx); + + configure(router); + router + .route(EXCEPTION.getPath()) + .handler( + ctx -> + handle( + ctx, + EXCEPTION, + () -> { + throw new IllegalStateException(EXCEPTION.getBody()); + })); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port) + .onComplete(httpServerAsyncResult -> startFuture.complete()); + } + } +}