diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts
index 816085cecaf9..89d8837adaf5 100644
--- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts
+++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts
@@ -15,63 +15,7 @@ dependencies {
   bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
   implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))
 
-  library("io.vertx:vertx-kafka-client:3.6.0")
+  compileOnly("io.vertx:vertx-kafka-client:3.6.0")
   // vertx-codegen is needed for Xlint's annotation checking
-  library("io.vertx:vertx-codegen:3.6.0")
-
-  testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing"))
-
-  testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
-
-  latestDepTestLibrary("io.vertx:vertx-kafka-client:4.+") // documented limitation, 5.x not supported yet
-  latestDepTestLibrary("io.vertx:vertx-codegen:4.+") // documented limitation, 5.x not supported yet
-}
-
-val latestDepTest = findProperty("testLatestDeps") as Boolean
-
-testing {
-  suites {
-    val testNoReceiveTelemetry by registering(JvmTestSuite::class) {
-      dependencies {
-        implementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing"))
-
-        // the "library" configuration is not recognized by the test suite plugin
-        if (latestDepTest) {
-          implementation("io.vertx:vertx-kafka-client:4.+")
-          implementation("io.vertx:vertx-codegen:4.+")
-        } else {
-          implementation("io.vertx:vertx-kafka-client:3.6.0")
-          implementation("io.vertx:vertx-codegen:3.6.0")
-        }
-      }
-
-      targets {
-        all {
-          testTask.configure {
-            usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
-
-            systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
-
-            jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false")
-            jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
-          }
-        }
-      }
-    }
-  }
-}
-
-tasks {
-  test {
-    usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
-
-    systemProperty("testLatestDeps", latestDepTest)
-
-    jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
-    jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
-  }
-
-  check {
-    dependsOn(testing.suites)
-  }
+  compileOnly("io.vertx:vertx-codegen:3.6.0")
 }
diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafkaTest.java
deleted file mode 100644
index 8e0eda18a78f..000000000000
--- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafkaTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;
-
-import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.sdk.trace.data.LinkData; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.data.StatusData; -import io.vertx.kafka.client.producer.KafkaProducerRecord; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; -import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestMethodOrder; - -// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler -// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer -@TestMethodOrder(OrderAnnotation.class) -class BatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest { - - static final CountDownLatch consumerReady = new CountDownLatch(1); - - @BeforeAll - static void setUpTopicAndConsumer() { - // in Vertx, a batch handler is something that runs in addition to the regular single record - // handler -- the KafkaConsumer won't start polling unless you set the regular handler - kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE); - kafkaConsumer.handler(record -> testing.runWithSpan("process " + record.value(), () -> {})); - - kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); - kafkaConsumer.subscribe("testBatchTopic"); - } - - @Order(1) - @Test - void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record1 = - KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"); - KafkaProducerRecord record2 = - KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2"); - sendBatchMessages(record1, record2); - - AtomicReference producer1 = new AtomicReference<>(); - AtomicReference producer2 = new AtomicReference<>(); - - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactlyInAnyOrder( - span -> span.hasName("producer"), - span -> - span.hasName("testBatchTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record1)), - span -> - span.hasName("testBatchTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record2))); - - producer1.set(trace.getSpan(1)); - producer2.set(trace.getSpan(2)); - }, - trace -> - trace.hasSpansSatisfyingExactlyInAnyOrder( - span -> - span.hasName("testBatchTopic receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly(receiveAttributes("testBatchTopic")), - - // batch consumer - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks( - LinkData.create(producer1.get().getSpanContext()), - LinkData.create(producer2.get().getSpanContext())) - .hasAttributesSatisfyingExactly(batchProcessAttributes("testBatchTopic")), - span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)), - - // single consumer 1 - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer1.get().getSpanContext())) - .hasAttributesSatisfyingExactly(processAttributes(record1)), - span -> span.hasName("process 
testSpan1").hasParent(trace.getSpan(3)), - - // single consumer 2 - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer2.get().getSpanContext())) - .hasAttributesSatisfyingExactly(processAttributes(record2)), - span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5)))); - } - - @Order(2) - @Test - void shouldHandleFailureInKafkaBatchListener() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record = - KafkaProducerRecord.create("testBatchTopic", "10", "error"); - sendBatchMessages(record); - // make sure that the consumer eats up any leftover records - kafkaConsumer.resume(); - - AtomicReference producer = new AtomicReference<>(); - - // the regular handler is not being called if the batch one fails - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testBatchTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record))); - - producer.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testBatchTopic receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly(receiveAttributes("testBatchTopic")), - - // batch consumer - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(batchProcessAttributes("testBatchTopic")), - span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)), - - // single consumer - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("process error").hasParent(trace.getSpan(3)))); - } -} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java deleted file mode 100644 index 8cfd1043f422..000000000000 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; - -import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.sdk.trace.data.LinkData; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.data.StatusData; -import io.vertx.kafka.client.producer.KafkaProducerRecord; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.BeforeAll; 
-import org.junit.jupiter.api.Test; - -class SingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { - - static final CountDownLatch consumerReady = new CountDownLatch(1); - - @BeforeAll - static void setUpTopicAndConsumer() { - kafkaConsumer.handler( - record -> { - testing.runWithSpan("consumer", () -> {}); - if (record.value().equals("error")) { - throw new IllegalArgumentException("boom"); - } - }); - - kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); - kafkaConsumer.subscribe("testSingleTopic"); - } - - @Test - void shouldCreateSpansForSingleRecordProcess() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record = - KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"); - CountDownLatch sent = new CountDownLatch(1); - testing.runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); - assertTrue(sent.await(30, TimeUnit.SECONDS)); - - AtomicReference producer = new AtomicReference<>(); - - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testSingleTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record))); - - producer.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testSingleTopic receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly(receiveAttributes("testSingleTopic")), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); - } - - @Test - void shouldHandleFailureInSingleRecordHandler() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record = - KafkaProducerRecord.create("testSingleTopic", "10", "error"); - CountDownLatch sent = new CountDownLatch(1); - testing.runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); - assertTrue(sent.await(30, TimeUnit.SECONDS)); - - AtomicReference producer = new AtomicReference<>(); - - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testSingleTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record))); - - producer.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testSingleTopic receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly(receiveAttributes("testSingleTopic")), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); - } -} diff --git 
a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetryBatchRecordsVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetryBatchRecordsVertxKafkaTest.java deleted file mode 100644 index 99ff87a9a9dc..000000000000 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetryBatchRecordsVertxKafkaTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; - -import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.sdk.trace.data.LinkData; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.data.StatusData; -import io.vertx.kafka.client.producer.KafkaProducerRecord; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.MethodOrderer; -import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestMethodOrder; - -// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler -// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer -@TestMethodOrder(MethodOrderer.OrderAnnotation.class) -class NoReceiveTelemetryBatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest { - - static final CountDownLatch consumerReady = new CountDownLatch(1); - - @BeforeAll - static void setUpTopicAndConsumer() { - // in Vertx, a batch handler is something that runs in addition to the regular single record - // handler -- the KafkaConsumer won't start polling unless you set the regular handler - kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE); - kafkaConsumer.handler(record -> testing.runWithSpan("process " + record.value(), () -> {})); - - kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); - kafkaConsumer.subscribe("testBatchTopic"); - } - - @Order(1) - @Test - void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record1 = - KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"); - KafkaProducerRecord record2 = - KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2"); - sendBatchMessages(record1, record2); - - AtomicReference producer1 = new AtomicReference<>(); - AtomicReference producer2 = new AtomicReference<>(); - - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactlyInAnyOrder( - span -> span.hasName("producer"), - - // first record - span -> - span.hasName("testBatchTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record1)), - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - 
.hasParent(trace.getSpan(1)) - .hasAttributesSatisfyingExactly(processAttributes(record1)), - span -> span.hasName("process testSpan1").hasParent(trace.getSpan(2)), - - // second record - span -> - span.hasName("testBatchTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record2)), - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(4)) - .hasAttributesSatisfyingExactly(processAttributes(record2)), - span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5))); - - producer1.set(trace.getSpan(1)); - producer2.set(trace.getSpan(4)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - // batch consumer - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasLinks( - LinkData.create(producer1.get().getSpanContext()), - LinkData.create(producer2.get().getSpanContext())) - .hasAttributesSatisfyingExactly(batchProcessAttributes("testBatchTopic")), - span -> span.hasName("batch consumer").hasParent(trace.getSpan(0)))); - } - - @Order(2) - @Test - void shouldHandleFailureInKafkaBatchListener() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record = - KafkaProducerRecord.create("testBatchTopic", "10", "error"); - sendBatchMessages(record); - // make sure that the consumer eats up any leftover records - kafkaConsumer.resume(); - - AtomicReference producer = new AtomicReference<>(); - - // the regular handler is not being called if the batch one fails - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testBatchTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record)), - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(1)) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("process error").hasParent(trace.getSpan(2))); - - producer.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(batchProcessAttributes("testBatchTopic")), - span -> span.hasName("batch consumer").hasParent(trace.getSpan(0)))); - } -} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java deleted file mode 100644 index 446e7356d037..000000000000 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package 
io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.sdk.trace.data.StatusData; -import io.vertx.kafka.client.producer.KafkaProducerRecord; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -class NoReceiveTelemetrySingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { - - static final CountDownLatch consumerReady = new CountDownLatch(1); - - @BeforeAll - static void setUpTopicAndConsumer() { - kafkaConsumer.handler( - record -> { - testing.runWithSpan("consumer", () -> {}); - if (record.value().equals("error")) { - throw new IllegalArgumentException("boom"); - } - }); - - kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); - kafkaConsumer.subscribe("testSingleTopic"); - } - - @Test - void shouldCreateSpansForSingleRecordProcess() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record = - KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"); - CountDownLatch sent = new CountDownLatch(1); - testing.runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); - assertTrue(sent.await(30, TimeUnit.SECONDS)); - - testing.waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testSingleTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record)), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(1)) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); - } - - @Test - void shouldHandleFailureInSingleRecordHandler() throws InterruptedException { - assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); - - KafkaProducerRecord record = - KafkaProducerRecord.create("testSingleTopic", "10", "error"); - CountDownLatch sent = new CountDownLatch(1); - testing.runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); - assertTrue(sent.await(30, TimeUnit.SECONDS)); - - testing.waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testSingleTopic publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record)), - span -> - span.hasName("testSingleTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(1)) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); - } -} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest.java new file mode 100644 index 000000000000..6816c49595d1 --- /dev/null +++ 
b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest.java @@ -0,0 +1,156 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler +// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public abstract class AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest + extends AbstractVertxKafkaTest { + + final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + void setUpTopicAndConsumer() { + // in Vertx, a batch handler is something that runs in addition to the regular single record + // handler -- the KafkaConsumer won't start polling unless you set the regular handler + kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE); + kafkaConsumer.handler(record -> testing().runWithSpan("process " + record.value(), () -> {})); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + subscribe("testBatchTopic"); + } + + @Order(1) + @Test + void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record1 = + KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"); + KafkaProducerRecord record2 = + KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2"); + sendBatchMessages(record1, record2); + + AtomicReference producer1 = new AtomicReference<>(); + AtomicReference producer2 = new AtomicReference<>(); + + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> span.hasName("producer"), + + // first record + span -> + span.hasName("testBatchTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record1)), + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly(processAttributes(record1)), + span -> span.hasName("process testSpan1").hasParent(trace.getSpan(2)), + + // second record + span -> + span.hasName("testBatchTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record2)), + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(4)) + 
.hasAttributesSatisfyingExactly(processAttributes(record2)), + span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5))); + + producer1.set(trace.getSpan(1)); + producer2.set(trace.getSpan(4)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + // batch consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks( + LinkData.create(producer1.get().getSpanContext()), + LinkData.create(producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + batchProcessAttributes("testBatchTopic")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(0)))); + } + + @Order(2) + @Test + void shouldHandleFailureInKafkaBatchListener() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record = + KafkaProducerRecord.create("testBatchTopic", "10", "error"); + sendBatchMessages(record); + // make sure that the consumer eats up any leftover records + kafkaConsumer.resume(); + + AtomicReference producer = new AtomicReference<>(); + + // the regular handler is not being called if the batch one fails + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record)), + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("process error").hasParent(trace.getSpan(2))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + batchProcessAttributes("testBatchTopic")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(0)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractBatchRecordsVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractBatchRecordsVertxKafkaTest.java new file mode 100644 index 000000000000..59778c00df93 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractBatchRecordsVertxKafkaTest.java @@ -0,0 +1,172 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; 
+import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler +// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer +@TestMethodOrder(OrderAnnotation.class) +public abstract class AbstractBatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest { + + final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + void setUpTopicAndConsumer() { + // in Vertx, a batch handler is something that runs in addition to the regular single record + // handler -- the KafkaConsumer won't start polling unless you set the regular handler + kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE); + kafkaConsumer.handler(record -> testing().runWithSpan("process " + record.value(), () -> {})); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + subscribe("testBatchTopic"); + } + + @Order(1) + @Test + void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record1 = + KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"); + KafkaProducerRecord record2 = + KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2"); + sendBatchMessages(record1, record2); + + AtomicReference producer1 = new AtomicReference<>(); + AtomicReference producer2 = new AtomicReference<>(); + + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record1)), + span -> + span.hasName("testBatchTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record2))); + + producer1.set(trace.getSpan(1)); + producer2.set(trace.getSpan(2)); + }, + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> + span.hasName("testBatchTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes("testBatchTopic")), + + // batch consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks( + LinkData.create(producer1.get().getSpanContext()), + LinkData.create(producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + batchProcessAttributes("testBatchTopic")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)), + + // single consumer 1 + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer1.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes(record1)), + span -> span.hasName("process testSpan1").hasParent(trace.getSpan(3)), + + // single consumer 2 + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes(record2)), + span -> span.hasName("process 
testSpan2").hasParent(trace.getSpan(5)))); + } + + @Order(2) + @Test + void shouldHandleFailureInKafkaBatchListener() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record = + KafkaProducerRecord.create("testBatchTopic", "10", "error"); + sendBatchMessages(record); + // make sure that the consumer eats up any leftover records + kafkaConsumer.resume(); + + AtomicReference producer = new AtomicReference<>(); + + // the regular handler is not being called if the batch one fails + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes("testBatchTopic")), + + // batch consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + batchProcessAttributes("testBatchTopic")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)), + + // single consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("process error").hasParent(trace.getSpan(3)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest.java new file mode 100644 index 000000000000..b9412000a91a --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest.java @@ -0,0 +1,94 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest + extends AbstractVertxKafkaTest { + + final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + void setUpTopicAndConsumer() { + kafkaConsumer.handler( + record -> { + testing().runWithSpan("consumer", () -> {}); + if (record.value().equals("error")) { + throw new IllegalArgumentException("boom"); + } + }); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + 
subscribe("testSingleTopic"); + } + + @Test + void shouldCreateSpansForSingleRecordProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record = + KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"); + CountDownLatch sent = new CountDownLatch(1); + testing().runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record)), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @Test + void shouldHandleFailureInSingleRecordHandler() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record = + KafkaProducerRecord.create("testSingleTopic", "10", "error"); + CountDownLatch sent = new CountDownLatch(1); + testing().runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record)), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractSingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractSingleRecordVertxKafkaTest.java new file mode 100644 index 000000000000..066a8b1895e0 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractSingleRecordVertxKafkaTest.java @@ -0,0 +1,125 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class AbstractSingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { + + final 
CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + void setUpTopicAndConsumer() { + kafkaConsumer.handler( + record -> { + testing().runWithSpan("consumer", () -> {}); + if (record.value().equals("error")) { + throw new IllegalArgumentException("boom"); + } + }); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + subscribe("testSingleTopic"); + } + + @Test + void shouldCreateSpansForSingleRecordProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record = + KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"); + CountDownLatch sent = new CountDownLatch(1); + testing().runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + AtomicReference producer = new AtomicReference<>(); + + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testSingleTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes("testSingleTopic")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } + + @Test + void shouldHandleFailureInSingleRecordHandler() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + KafkaProducerRecord record = + KafkaProducerRecord.create("testSingleTopic", "10", "error"); + CountDownLatch sent = new CountDownLatch(1); + testing().runWithSpan("producer", () -> sendRecord(record, result -> sent.countDown())); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + AtomicReference producer = new AtomicReference<>(); + + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testSingleTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes("testSingleTopic")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java 
b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java similarity index 74% rename from instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java rename to instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java index c20ff15fab8e..b0cdd3070a82 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/AbstractVertxKafkaTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; @@ -20,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.vertx.core.AsyncResult; @@ -30,9 +29,6 @@ import io.vertx.kafka.client.producer.KafkaProducer; import io.vertx.kafka.client.producer.KafkaProducerRecord; import io.vertx.kafka.client.producer.RecordMetadata; -import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.MethodType; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -47,7 +43,7 @@ import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.TestInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -55,6 +51,7 @@ import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractVertxKafkaTest { private static final Logger logger = LoggerFactory.getLogger(AbstractVertxKafkaTest.class); @@ -62,16 +59,25 @@ public abstract class AbstractVertxKafkaTest { private static final AttributeKey MESSAGING_CLIENT_ID = AttributeKey.stringKey("messaging.client_id"); - @RegisterExtension - protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + KafkaContainer kafka; + Vertx vertx; + protected KafkaProducer kafkaProducer; + protected KafkaConsumer kafkaConsumer; - static KafkaContainer kafka; - static Vertx vertx; - static KafkaProducer kafkaProducer; - protected static KafkaConsumer kafkaConsumer; + protected abstract InstrumentationExtension testing(); + + protected abstract boolean hasConsumerGroup(); + + protected abstract void closeKafkaConsumer(KafkaConsumer consumer); + + protected abstract void closeKafkaProducer(KafkaProducer producer); + + protected abstract void closeVertx(Vertx vertx); + + protected 
abstract void subscribe(String topic); @BeforeAll - static void setUpAll() { + void setUpAll() { kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0")) .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m") @@ -86,20 +92,20 @@ static void setUpAll() { } @AfterAll - static void tearDownAll() { + void tearDownAll() { if (kafkaConsumer != null) { - kafkaConsumer.close(unused -> {}); + closeKafkaConsumer(kafkaConsumer); } if (kafkaProducer != null) { - kafkaProducer.close(unused -> {}); + closeKafkaProducer(kafkaProducer); } if (vertx != null) { - vertx.close(unused -> {}); + closeVertx(vertx); } kafka.stop(); } - private static Properties producerProps() { + private Properties producerProps() { Properties props = new Properties(); props.put("bootstrap.servers", kafka.getBootstrapServers()); props.put("retries", 0); @@ -108,7 +114,7 @@ private static Properties producerProps() { return props; } - private static Properties consumerProps() { + private Properties consumerProps() { Properties props = new Properties(); props.put("bootstrap.servers", kafka.getBootstrapServers()); props.put("group.id", "test"); @@ -137,13 +143,14 @@ protected final void sendBatchMessages(KafkaProducerRecord... re Thread.sleep(1000); CountDownLatch sent = new CountDownLatch(records.length); - testing.runWithSpan( - "producer", - () -> { - for (KafkaProducerRecord record : records) { - sendRecord(record, result -> sent.countDown()); - } - }); + testing() + .runWithSpan( + "producer", + () -> { + for (KafkaProducerRecord record : records) { + sendRecord(record, result -> sent.countDown()); + } + }); assertTrue(sent.await(30, TimeUnit.SECONDS)); kafkaConsumer.resume(); @@ -152,60 +159,16 @@ protected final void sendBatchMessages(KafkaProducerRecord... re if (BatchRecordsHandler.getLastBatchSize() == records.length) { break; } else if (i < maxAttempts) { - testing.waitForTraces(2); + testing().waitForTraces(2); Thread.sleep(1_000); // sleep a bit to give time for all the spans to arrive - testing.clearData(); + testing().clearData(); logger.info("Messages weren't received as batch, retrying"); } } } - private static final MethodHandle SEND_METHOD; - - static { - MethodHandles.Lookup lookup = MethodHandles.publicLookup(); - MethodHandle sendMethod = null; - - // versions 3.6+ - try { - sendMethod = - lookup.findVirtual( - KafkaProducer.class, - "write", - MethodType.methodType(KafkaProducer.class, KafkaProducerRecord.class, Handler.class)); - } catch (NoSuchMethodException | IllegalAccessException ignored) { - // ignore - } - - // versions 4+ - if (sendMethod == null) { - try { - sendMethod = - lookup.findVirtual( - KafkaProducer.class, - "send", - MethodType.methodType( - KafkaProducer.class, KafkaProducerRecord.class, Handler.class)); - } catch (NoSuchMethodException | IllegalAccessException ignored) { - // ignore - } - } - - if (sendMethod == null) { - throw new AssertionError("Could not find send/write method on KafkaProducer"); - } - SEND_METHOD = sendMethod; - } - - protected static void sendRecord( - KafkaProducerRecord record, Handler> handler) { - - try { - SEND_METHOD.invoke(kafkaProducer, record, handler); - } catch (Throwable e) { - throw new AssertionError("Failed producer send/write invocation", e); - } - } + protected abstract void sendRecord( + KafkaProducerRecord record, Handler> handler); @SuppressWarnings("deprecation") // using deprecated semconv protected static List sendAttributes( @@ -226,16 +189,16 @@ protected static List sendAttributes( return assertions; } - protected static List 
receiveAttributes(String topic) { + protected List receiveAttributes(String topic) { return batchConsumerAttributes(topic, "receive"); } - protected static List batchProcessAttributes(String topic) { + protected List batchProcessAttributes(String topic) { return batchConsumerAttributes(topic, "process"); } @SuppressWarnings("deprecation") // using deprecated semconv - private static List batchConsumerAttributes(String topic, String operation) { + private List batchConsumerAttributes(String topic, String operation) { List assertions = new ArrayList<>( Arrays.asList( @@ -244,16 +207,14 @@ private static List batchConsumerAttributes(String topic, St equalTo(MESSAGING_OPERATION, operation), satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive))); - // consumer group is not available in version 0.11 - if (Boolean.getBoolean("testLatestDeps")) { + if (hasConsumerGroup()) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test")); } return assertions; } @SuppressWarnings("deprecation") // using deprecated semconv - protected static List processAttributes( - KafkaProducerRecord record) { + protected List processAttributes(KafkaProducerRecord record) { List assertions = new ArrayList<>( Arrays.asList( @@ -270,7 +231,7 @@ protected static List processAttributes( AbstractLongAssert::isNotNegative)); } // consumer group is not available in version 0.11 - if (Boolean.getBoolean("testLatestDeps")) { + if (hasConsumerGroup()) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test")); } String messageKey = record.key(); diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/BatchRecordsHandler.java similarity index 95% rename from instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsHandler.java rename to instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/BatchRecordsHandler.java index 997498b4aff5..e5f3d377fe0e 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsHandler.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/BatchRecordsHandler.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka; import io.opentelemetry.instrumentation.testing.GlobalTraceUtil; import io.vertx.core.Handler; diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/build.gradle.kts new file mode 100644 index 000000000000..2c9dc8ef104b --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/build.gradle.kts @@ -0,0 +1,61 @@ +plugins { + id("otel.javaagent-testing") +} + +dependencies { + library("io.vertx:vertx-kafka-client:3.6.0") + // vertx-codegen is needed for Xlint's annotation checking + library("io.vertx:vertx-codegen:3.6.0") + + 
testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) + + testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) + testInstrumentation(project(":instrumentation:vertx:vertx-kafka-client-3.6:javaagent")) + + latestDepTestLibrary("io.vertx:vertx-kafka-client:3.+") // documented limitation + latestDepTestLibrary("io.vertx:vertx-codegen:3.+") // documented limitation +} + +val latestDepTest = findProperty("testLatestDeps") as Boolean + +testing { + suites { + val testNoReceiveTelemetry by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) + + val version = if (latestDepTest) "3.+" else "3.6.0" + implementation("io.vertx:vertx-kafka-client:$version") + implementation("io.vertx:vertx-codegen:$version") + } + + targets { + all { + testTask.configure { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false") + } + } + } + } + } +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + systemProperty("testLatestDeps", latestDepTest) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testing.suites) + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafka36Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafka36Test.java new file mode 100644 index 000000000000..76b0541a984f --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafka36Test.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractBatchRecordsVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +class BatchRecordsVertxKafka36Test extends AbstractBatchRecordsVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + // consumer group is not available in version 0.11 + return 
Boolean.getBoolean("testLatestDeps"); + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + // using raw type to avoid compilation failure in latest dep tests because the expected generic + // type has changed + kafkaProducer.write(record, (Handler) handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafka36Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafka36Test.java new file mode 100644 index 000000000000..9cd6e2f6e1e5 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafka36Test.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractSingleRecordVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SingleRecordVertxKafka36Test extends AbstractSingleRecordVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + // consumer group is not available in version 0.11 + return Boolean.getBoolean("testLatestDeps"); + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + // using raw type to avoid compilation failure in latest dep tests because the expected generic + // type has changed + kafkaProducer.write(record, (Handler) handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git 
a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsNoReceiveTelemetryVertxKafka36Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsNoReceiveTelemetryVertxKafka36Test.java new file mode 100644 index 000000000000..42c14bff787d --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsNoReceiveTelemetryVertxKafka36Test.java @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class BatchRecordsNoReceiveTelemetryVertxKafka36Test + extends AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + // consumer group is not available in version 0.11 + return Boolean.getBoolean("testLatestDeps"); + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + // using raw type to avoid compilation failure in latest dep tests because the expected generic + // type has changed + kafkaProducer.write(record, (Handler) handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordNoReceiveTelemetryVertxKafka36Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordNoReceiveTelemetryVertxKafka36Test.java new file mode 100644 index 000000000000..ea8cdac21429 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-3.6-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordNoReceiveTelemetryVertxKafka36Test.java @@ -0,0 +1,64 @@ +/* + * Copyright The 
OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class SingleRecordNoReceiveTelemetryVertxKafka36Test + extends AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + // consumer group is not available in version 0.11 + return Boolean.getBoolean("testLatestDeps"); + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + // using raw type to avoid compilation failure in latest dep tests because the expected generic + // type has changed + kafkaProducer.write(record, (Handler) handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/build.gradle.kts new file mode 100644 index 000000000000..47adfbc0e7a1 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/build.gradle.kts @@ -0,0 +1,61 @@ +plugins { + id("otel.javaagent-testing") +} + +dependencies { + library("io.vertx:vertx-kafka-client:4.0.0") + // vertx-codegen is needed for Xlint's annotation checking + library("io.vertx:vertx-codegen:4.0.0") + + testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) + + testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) + testInstrumentation(project(":instrumentation:vertx:vertx-kafka-client-3.6:javaagent")) + + latestDepTestLibrary("io.vertx:vertx-kafka-client:4.+") // documented limitation + latestDepTestLibrary("io.vertx:vertx-codegen:4.+") // documented limitation +} + +val latestDepTest = findProperty("testLatestDeps") as Boolean + +testing { + suites { + val testNoReceiveTelemetry by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) + + val version = if (latestDepTest) "4.+" else "4.0.0" + implementation("io.vertx:vertx-kafka-client:$version") + implementation("io.vertx:vertx-codegen:$version") + } + + targets { + all { + testTask.configure { + 
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false") + } + } + } + } + } +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + systemProperty("testLatestDeps", latestDepTest) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testing.suites) + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/BatchRecordsVertxKafka4Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/BatchRecordsVertxKafka4Test.java new file mode 100644 index 000000000000..dc5700d51fa0 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/BatchRecordsVertxKafka4Test.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v4_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractBatchRecordsVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +class BatchRecordsVertxKafka4Test extends AbstractBatchRecordsVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record, handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/SingleRecordVertxKafka4Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/SingleRecordVertxKafka4Test.java new file mode 100644 index 000000000000..0a3a889a7539 --- /dev/null +++ 
b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/SingleRecordVertxKafka4Test.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v4_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractSingleRecordVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SingleRecordVertxKafka4Test extends AbstractSingleRecordVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record, handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/BatchRecordsNoReceiveTelemetryVertxKafka4Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/BatchRecordsNoReceiveTelemetryVertxKafka4Test.java new file mode 100644 index 000000000000..ae9f151c4c8e --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/BatchRecordsNoReceiveTelemetryVertxKafka4Test.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v4_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class BatchRecordsNoReceiveTelemetryVertxKafka4Test + extends 
AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record, handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/SingleRecordNoReceiveTelemetryVertxKafka4Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/SingleRecordNoReceiveTelemetryVertxKafka4Test.java new file mode 100644 index 000000000000..cdff4561162c --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-4-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v4_0/SingleRecordNoReceiveTelemetryVertxKafka4Test.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v4_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class SingleRecordNoReceiveTelemetryVertxKafka4Test + extends AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record, handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/build.gradle.kts new file mode 
100644 index 000000000000..8993c7020ea4 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/build.gradle.kts @@ -0,0 +1,62 @@ +plugins { + id("otel.javaagent-testing") +} + +otelJava { + minJavaVersionSupported.set(JavaVersion.VERSION_11) +} + +dependencies { + library("io.vertx:vertx-kafka-client:5.0.0") + // vertx-codegen is needed for Xlint's annotation checking + library("io.vertx:vertx-codegen:5.0.0") + + testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) + + testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) + testInstrumentation(project(":instrumentation:vertx:vertx-kafka-client-3.6:javaagent")) +} + +val latestDepTest = findProperty("testLatestDeps") as Boolean + +testing { + suites { + val testNoReceiveTelemetry by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) + + val version = if (latestDepTest) "latest.release" else "5.0.0" + implementation("io.vertx:vertx-kafka-client:$version") + implementation("io.vertx:vertx-codegen:$version") + } + + targets { + all { + testTask.configure { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false") + } + } + } + } + } +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + systemProperty("testLatestDeps", latestDepTest) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testing.suites) + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/BatchRecordsVertxKafka5Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/BatchRecordsVertxKafka5Test.java new file mode 100644 index 000000000000..f60ad6fe47b1 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/BatchRecordsVertxKafka5Test.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractBatchRecordsVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +class BatchRecordsVertxKafka5Test extends AbstractBatchRecordsVertxKafkaTest { + @RegisterExtension + private static final 
InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record).onComplete(handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/SingleRecordVertxKafka5Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/SingleRecordVertxKafka5Test.java new file mode 100644 index 000000000000..8ced7ffd46eb --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/SingleRecordVertxKafka5Test.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractSingleRecordVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SingleRecordVertxKafka5Test extends AbstractSingleRecordVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record).onComplete(handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/BatchRecordsNoReceiveTelemetryVertxKafka5Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/BatchRecordsNoReceiveTelemetryVertxKafka5Test.java new file 
mode 100644 index 000000000000..dfe6ef2b84f1 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/BatchRecordsNoReceiveTelemetryVertxKafka5Test.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class BatchRecordsNoReceiveTelemetryVertxKafka5Test + extends AbstractBatchRecordsNoReceiveTelemetryVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record).onComplete(handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/SingleRecordNoReceiveTelemetryVertxKafka5Test.java b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/SingleRecordNoReceiveTelemetryVertxKafka5Test.java new file mode 100644 index 000000000000..297aa3828259 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/vertx-kafka-client-5-testing/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v5_0/SingleRecordNoReceiveTelemetryVertxKafka5Test.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.javaagent.instrumentation.vertx.kafka.AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import 
io.vertx.kafka.client.producer.RecordMetadata; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class SingleRecordNoReceiveTelemetryVertxKafka5Test + extends AbstractSingleRecordNoReceiveTelemetryVertxKafkaTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasConsumerGroup() { + return true; + } + + @Override + protected void closeKafkaConsumer(KafkaConsumer consumer) { + consumer.close(); + } + + @Override + protected void closeKafkaProducer(KafkaProducer producer) { + producer.close(); + } + + @Override + protected void closeVertx(Vertx vertx) { + vertx.close(); + } + + @Override + protected void sendRecord( + KafkaProducerRecord record, Handler> handler) { + kafkaProducer.send(record).onComplete(handler); + } + + @Override + protected void subscribe(String topic) { + kafkaConsumer.subscribe(topic); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 02f87bf5993f..d8a86aba184e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -603,6 +603,9 @@ include(":instrumentation:vertx:vertx-http-client:vertx-http-client-5.0:javaagen include(":instrumentation:vertx:vertx-http-client:vertx-http-client-common:javaagent") include(":instrumentation:vertx:vertx-kafka-client-3.6:javaagent") include(":instrumentation:vertx:vertx-kafka-client-3.6:testing") +include(":instrumentation:vertx:vertx-kafka-client-3.6:vertx-kafka-client-3.6-testing") +include(":instrumentation:vertx:vertx-kafka-client-3.6:vertx-kafka-client-4-testing") +include(":instrumentation:vertx:vertx-kafka-client-3.6:vertx-kafka-client-5-testing") include(":instrumentation:vertx:vertx-redis-client-4.0:javaagent") include(":instrumentation:vertx:vertx-rx-java-3.5:javaagent") include(":instrumentation:vertx:vertx-sql-client-4.0:javaagent")
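Note (editorial sketch, not part of the patch): each version-specific test class added above plugs into the shared abstract bases from the :instrumentation:vertx:vertx-kafka-client-3.6:testing module (AbstractBatchRecordsVertxKafkaTest, AbstractSingleRecordVertxKafkaTest, and their NoReceiveTelemetry variants) by implementing the same small set of hooks. The outline below summarizes that contract as inferred from the overrides in this diff; the consolidated class name and the generic type parameters are assumptions for illustration, and the exact signatures in the shared testing module may differ.

// Editorial sketch; hook names are taken from the overrides in this diff,
// the class name and generic parameters are assumptions.
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;

public abstract class AbstractVertxKafkaTestHooks {

  // instrumentation test harness used for trace assertions
  protected abstract InstrumentationExtension testing();

  // kafka-clients 0.11 does not report the consumer group attribute, so the 3.6 subclasses
  // return Boolean.getBoolean("testLatestDeps") while the 4.x and 5.x subclasses return true
  protected abstract boolean hasConsumerGroup();

  // per-version lifecycle hooks
  protected abstract void closeKafkaConsumer(KafkaConsumer<String, String> consumer);

  protected abstract void closeKafkaProducer(KafkaProducer<String, String> producer);

  protected abstract void closeVertx(Vertx vertx);

  // 3.x subclasses call kafkaProducer.write(record, handler), 4.x call send(record, handler),
  // and 5.x call send(record).onComplete(handler), reflecting that send() returns a Future there
  protected abstract void sendRecord(
      KafkaProducerRecord<String, String> record, Handler<AsyncResult<RecordMetadata>> handler);

  protected abstract void subscribe(String topic);
}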