diff --git a/dd-java-agent/instrumentation/jax-rs-annotations-1/build.gradle b/dd-java-agent/instrumentation/jax-rs-annotations-1/build.gradle
index 5d7c4c60eee..043d79272bb 100644
--- a/dd-java-agent/instrumentation/jax-rs-annotations-1/build.gradle
+++ b/dd-java-agent/instrumentation/jax-rs-annotations-1/build.gradle
@@ -16,6 +16,7 @@ apply from: "$rootDir/gradle/java.gradle"
 
 dependencies {
   compileOnly group: 'javax.ws.rs', name: 'jsr311-api', version: '1.1.1'
+  testImplementation libs.spock.junit4 // This legacy module still needs JUnit4.
   testImplementation group: 'io.dropwizard', name: 'dropwizard-testing', version: '0.7.1'
   testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
 }
diff --git a/dd-java-agent/instrumentation/jax-rs-annotations-2/build.gradle b/dd-java-agent/instrumentation/jax-rs-annotations-2/build.gradle
index 08d2b69574f..fb43be7bb4f 100644
--- a/dd-java-agent/instrumentation/jax-rs-annotations-2/build.gradle
+++ b/dd-java-agent/instrumentation/jax-rs-annotations-2/build.gradle
@@ -33,8 +33,7 @@ dependencies {
   testImplementation project(':dd-java-agent:instrumentation:jax-rs-annotations-2:filter-resteasy-3.1')
 
   // Jersey
-  // First version with DropwizardTestSupport:
-  testImplementation group: 'io.dropwizard', name: 'dropwizard-testing', version: '0.8.0'
+  testImplementation group: 'io.dropwizard', name: 'dropwizard-testing', version: '1.3.29' // Version compatible with Java 8 and JUnit5.
   testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.1'
   testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-afterburner', version: '2.9.10'
diff --git a/dd-java-agent/instrumentation/jax-rs-annotations-2/src/nestedTest/groovy/NestedResourcesTest.groovy b/dd-java-agent/instrumentation/jax-rs-annotations-2/src/nestedTest/groovy/NestedResourcesTest.groovy
index 061c180a279..c8e3382d44c 100644
--- a/dd-java-agent/instrumentation/jax-rs-annotations-2/src/nestedTest/groovy/NestedResourcesTest.groovy
+++ b/dd-java-agent/instrumentation/jax-rs-annotations-2/src/nestedTest/groovy/NestedResourcesTest.groovy
@@ -1,8 +1,7 @@
 import datadog.trace.agent.test.InstrumentationSpecification
 import datadog.trace.api.DDSpanTypes
 import datadog.trace.bootstrap.instrumentation.api.Tags
-import io.dropwizard.testing.junit.ResourceTestRule
-import org.junit.ClassRule
+import io.dropwizard.testing.junit5.ResourceExtension
 import spock.lang.Shared
 
 import javax.ws.rs.core.Response
@@ -11,8 +10,7 @@ import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
 
 class NestedResourcesTest extends InstrumentationSpecification {
   @Shared
-  @ClassRule
-  ResourceTestRule resources = ResourceTestRule.builder()
+  ResourceExtension resources = ResourceExtension.builder()
   .addResource(new KeyCloakResources.AdminRoot())
   .addResource(new KeyCloakResources.RealmsAdminResource())
   .addResource(new KeyCloakResources.RealmAdminResource())
@@ -20,15 +18,23 @@ class NestedResourcesTest extends InstrumentationSpecification {
   .addResource(new KeyCloakResources.UserResource())
   .build()
 
+  // Spock has no support for JUnit 5 extensions, so the ResourceExtension
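+  // lifecycle is driven manually from setupSpec and cleanupSpec.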
+  def setupSpec() {
+    resources.before()
+  }
+
+  def cleanupSpec() {
+    resources.after()
+  }
+
   def getClient() {
     resources.client()
   }
 
   def "test nested calls"() {
     when:
-    Response response
-    runUnderTrace("test.span") {
-      response = getClient().target("/admin/realms/realm1/users/53c82214-ca89-423b-a1f3-6a7784e61cf6").request().get()
+    Response response = runUnderTrace("test.span") {
+      getClient().target("/admin/realms/realm1/users/53c82214-ca89-423b-a1f3-6a7784e61cf6").request().get()
     }
 
     then:
diff --git a/dd-java-agent/instrumentation/jax-rs-annotations-2/src/test/groovy/JaxRsFilterTest.groovy b/dd-java-agent/instrumentation/jax-rs-annotations-2/src/test/groovy/JaxRsFilterTest.groovy
index 495dd6d7c13..7027e0bf100 100644
--- a/dd-java-agent/instrumentation/jax-rs-annotations-2/src/test/groovy/JaxRsFilterTest.groovy
+++ b/dd-java-agent/instrumentation/jax-rs-annotations-2/src/test/groovy/JaxRsFilterTest.groovy
@@ -1,12 +1,11 @@
 import datadog.trace.agent.test.InstrumentationSpecification
 import datadog.trace.api.DDSpanTypes
 import datadog.trace.bootstrap.instrumentation.api.Tags
-import io.dropwizard.testing.junit.ResourceTestRule
+import io.dropwizard.testing.junit5.ResourceExtension
 import org.jboss.resteasy.core.Dispatcher
 import org.jboss.resteasy.mock.MockDispatcherFactory
 import org.jboss.resteasy.mock.MockHttpRequest
 import org.jboss.resteasy.mock.MockHttpResponse
-import org.junit.ClassRule
 import spock.lang.Shared
 
 import javax.ws.rs.client.Entity
@@ -36,12 +35,9 @@ abstract class JaxRsFilterTest extends InstrumentationSpecification {
     def abort = abortNormal || abortPrematch
 
     when:
-    def responseText
-    def responseStatus
-    // start a trace because the test doesn't go through any servlet or other instrumentation.
-    runUnderTrace("test.span") {
-      (responseText, responseStatus) = makeRequest(resource)
+    def (responseText, responseStatus) = runUnderTrace("test.span") {
+      makeRequest(resource)
     }
 
     then:
@@ -104,12 +100,9 @@ abstract class JaxRsFilterTest extends InstrumentationSpecification {
     prematchRequestFilter.abort = false
 
     when:
-    def responseText
-    def responseStatus
-    // start a trace because the test doesn't go through any servlet or other instrumentation.
-    runUnderTrace("test.span") {
-      (responseText, responseStatus) = makeRequest(resource)
+    def (responseText, responseStatus) = runUnderTrace("test.span") {
+      makeRequest(resource)
     }
 
     then:
@@ -192,8 +185,7 @@ abstract class JaxRsFilterTest extends InstrumentationSpecification {
 
 class JerseyFilterTest extends JaxRsFilterTest {
   @Shared
-  @ClassRule
-  ResourceTestRule resources = ResourceTestRule.builder()
+  ResourceExtension resources = ResourceExtension.builder()
   .addResource(new Resource.Test1())
   .addResource(new Resource.Test2())
   .addResource(new Resource.Test3())
@@ -201,6 +193,15 @@ class JerseyFilterTest extends JaxRsFilterTest {
   .addProvider(prematchRequestFilter)
   .build()
 
+  // Spock has no support for JUnit 5 extensions, so the ResourceExtension
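+  // lifecycle is driven manually here as well.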
+  def setupSpec() {
+    resources.before()
+  }
+
+  def cleanupSpec() {
+    resources.after()
+  }
+
   @Override
   def makeRequest(String url) {
     Response response = resources.client().target(url).request().post(Entity.text(""))
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/build.gradle b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/build.gradle
index f72dbf676bd..b23bf4c1808 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/build.gradle
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/build.gradle
@@ -26,6 +26,7 @@ dependencies {
   compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
   implementation project(':dd-java-agent:instrumentation:kafka:kafka-common')
 
+  testImplementation libs.spock.junit4 // This legacy module still needs JUnit4.
   testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
   testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE'
   testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE'
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/build.gradle b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/build.gradle
index 13d5a9591a4..d0e4201049a 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/build.gradle
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/build.gradle
@@ -1,6 +1,7 @@
 ext {
   minJavaVersionForTests = JavaVersion.VERSION_17
 }
+
 muzzle {
   pass {
     group = "org.apache.kafka"
@@ -31,14 +32,11 @@ tasks.withType(GroovyCompile).configureEach {
 }
 
 dependencies {
-  // compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
-  main_java17CompileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
   implementation project(':dd-java-agent:instrumentation:kafka:kafka-common')
-  main_java17Implementation project(':dd-java-agent:instrumentation:kafka:kafka-common')
-
   implementation project(':dd-java-agent:instrumentation:span-origin')
 
-  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
+  main_java17CompileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
+  main_java17Implementation project(':dd-java-agent:instrumentation:kafka:kafka-common')
 
   testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
     exclude group: 'org.apache.kafka'
@@ -46,8 +44,13 @@ dependencies {
   testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
     exclude group: 'org.apache.kafka'
   }
+
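+  // kafka test-jars below are used by spring-kafka-test's embedded KRaft broker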
+  testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
+
+  testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
   testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
-  testImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '3.8.0'
+
+  testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
   testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'
 
   testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
@@ -57,7 +60,5 @@ dependencies {
   latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+'
   latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.+'
   latestDepTestImplementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: '+'
-  latestDepTestImplementation libs.guava
-
 }
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy
index 08661cd0f5d..245a6365793 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy
@@ -5,7 +5,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.header.internals.RecordHeaders
-import org.junit.Rule
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
@@ -13,9 +12,10 @@ import org.springframework.kafka.listener.ContainerProperties
 import org.springframework.kafka.listener.KafkaMessageListenerContainer
 import org.springframework.kafka.listener.MessageListener
 import org.springframework.kafka.test.EmbeddedKafkaBroker
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule
+import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
 import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.kafka.test.utils.KafkaTestUtils
+import spock.lang.Shared
 
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.TimeUnit
@@ -29,9 +29,17 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
   static final SHARED_TOPIC = ["topic1", "topic2", "topic3", "topic4"]
   static final MESSAGE = "Testing without headers for certain topics"
 
-  @Rule
-  EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, SHARED_TOPIC.toArray(String[]::new))
-  EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka
+  @Shared
+  EmbeddedKafkaBroker embeddedKafka
+
+  def setupSpec() {
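+    // one shared KRaft broker for the whole spec: 1 node, 2 partitions per topic, no ZooKeeper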
+    embeddedKafka = new EmbeddedKafkaKraftBroker(1, 2, *SHARED_TOPIC)
+    embeddedKafka.afterPropertiesSet()
+  }
+
+  def cleanupSpec() {
+    embeddedKafka.destroy()
+  }
 
   static final dataTable() {
     [
@@ -91,36 +99,36 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
 
     // setup a Kafka message listener
     container1.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records1.add(record)
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records1.add(record)
+      }
+    })
 
     container2.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records2.add(record)
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records2.add(record)
+      }
+    })
 
     container3.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records3.add(record)
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records3.add(record)
+      }
+    })
 
     container4.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records4.add(record)
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records4.add(record)
+      }
+    })
 
     // start the container and underlying message listener
     container1.start()
@@ -195,36 +203,36 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
 
     // setup a Kafka message listener
     container1.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records1.add(activeSpan())
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records1.add(activeSpan())
+      }
+    })
 
     container2.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records2.add(activeSpan())
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records2.add(activeSpan())
+      }
+    })
 
     container3.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records3.add(activeSpan())
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records3.add(activeSpan())
+      }
+    })
 
     container4.setupMessageListener(new MessageListener() {
-        @Override
-        void onMessage(ConsumerRecord record) {
-          TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
-          records4.add(activeSpan())
-        }
-      })
+      @Override
+      void onMessage(ConsumerRecord record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records4.add(activeSpan())
+      }
+    })
 
     // start the container and underlying message listener
     container1.start()
@@ -245,12 +253,12 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
     activateSpan(span).withCloseable {
       for (String topic : SHARED_TOPIC) {
         ProducerRecord record = new ProducerRecord<>(
-            topic,
-            0,
-            null,
-            MESSAGE,
-            header
-            )
+          topic,
+          0,
+          null,
+          MESSAGE,
+          header
+          )
         kafkaTemplate.send(record as ProducerRecord)
       }
     }
@@ -291,9 +299,7 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
     container3?.stop()
     container4?.stop()
-
     where:
     [value, expected1, expected2, expected3, expected4] << dataTable()
   }
-
 }
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy
index 665e2df70d8..7dbd5e60473 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy
@@ -11,10 +11,13 @@ import datadog.trace.core.datastreams.StatsGroup
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.*
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.Producer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringSerializer
-import org.junit.Rule
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
@@ -22,7 +25,7 @@ import org.springframework.kafka.listener.ContainerProperties
 import org.springframework.kafka.listener.KafkaMessageListenerContainer
 import org.springframework.kafka.listener.MessageListener
 import org.springframework.kafka.test.EmbeddedKafkaBroker
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule
+import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
 import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.kafka.test.utils.KafkaTestUtils
 
@@ -39,21 +42,19 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPro
 abstract class KafkaClientTestBase extends VersionedNamingTestBase {
   static final SHARED_TOPIC = "shared.topic"
-  static final String MESSAGE = "Testing without headers for certain topics"
-
-  static final dataTable() {
-    [
-      ["topic1,topic2,topic3,topic4", false, false, false, false],
-      ["topic1,topic2", false, false, true, true],
-      ["topic1", false, true, true, true],
-      ["", true, true, true, true],
-      ["randomTopic", true, true, true, true]
-    ]
+
+  EmbeddedKafkaBroker embeddedKafka
+
+  def setup() {
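+    // a fresh single-broker KRaft cluster is created for every test method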
runUnderTrace("parent") { - producer.send(new ProducerRecord(SHARED_TOPIC,greeting)) { meta, ex -> + producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> assert isAsyncPropagationEnabled() if (ex == null) { runUnderTrace("producer callback") {} @@ -213,7 +210,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition //TODO TEST_DATA_STREAMS_WRITER.waitForBacklogs(2) - } then: @@ -267,20 +263,20 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { ) } - def sorted = new ArrayList(TEST_DATA_STREAMS_WRITER.backlogs).sort{ it.type } + def sorted = new ArrayList(TEST_DATA_STREAMS_WRITER.backlogs).sort { it.type } verifyAll(sorted) { size() == 2 get(0).hasAllTags( "consumer_group:sender", "kafka_cluster_id:$clusterId", - "partition:"+received.partition(), + "partition:" + received.partition(), "topic:$SHARED_TOPIC", "type:kafka_commit" ) get(1).hasAllTags( "kafka_cluster_id:$clusterId", - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_produce" ) } @@ -324,7 +320,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } // set up the Kafka consumer properties - def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false") + def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false") // create a Kafka consumer factory def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) @@ -423,20 +419,20 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "type:kafka" ) } - def items = new ArrayList(TEST_DATA_STREAMS_WRITER.backlogs).sort {it.type} + def items = new ArrayList(TEST_DATA_STREAMS_WRITER.backlogs).sort { it.type } verifyAll(items) { size() == 2 get(0).hasAllTags( "consumer_group:sender", "kafka_cluster_id:$clusterId".toString(), - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_commit" ) get(1).hasAllTags( "kafka_cluster_id:$clusterId".toString(), - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_produce" ) } @@ -455,7 +451,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def kafkaTemplate = new KafkaTemplate(producerFactory) // set up the Kafka consumer properties - def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false") + def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false") // create a Kafka consumer factory def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) @@ -512,14 +508,13 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { cleanup: producerFactory.stop() container?.stop() - } def "test records(TopicPartition) kafka consume"() { setup: // set up the Kafka consumer properties def kafkaPartition = 0 - def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false") + def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false") consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") def consumer = new KafkaConsumer(consumerProperties) @@ 
@@ -544,7 +539,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     }
 
     then:
-    recs.hasNext() == false
+    !recs.hasNext()
     first.value() == greeting
     first.key() == null
 
@@ -567,15 +562,13 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     cleanup:
     consumer.close()
     producer.close()
-
-
   }
 
   def "test records(TopicPartition).subList kafka consume"() {
     setup:
     def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
-    def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false")
+    def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false")
 
     // set up the Kafka consumer properties
     def kafkaPartition = 0
@@ -603,7 +596,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     }
 
     then:
-    recs.hasNext() == false
+    !recs.hasNext()
     first.value() == greeting
     first.key() == null
 
@@ -626,14 +619,13 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     cleanup:
     consumer.close()
     producer.close()
-
   }
 
   def "test records(TopicPartition).forEach kafka consume"() {
     setup:
     def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
-    def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false")
+    def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false")
 
     // set up the Kafka consumer properties
     def kafkaPartition = 0
@@ -684,14 +676,13 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     cleanup:
     consumer.close()
     producer.close()
-
   }
 
   def "test iteration backwards over ConsumerRecords"() {
     setup:
     def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
-    def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false")
+    def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false")
 
     def kafkaPartition = 0
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -795,19 +786,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     cleanup:
     consumer.close()
     producer.close()
-
   }
 
   def "test kafka client header propagation manual config"() {
     setup:
     def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
-    def consumerProperties = KafkaTestUtils.consumerProps( embeddedKafka.getBrokersAsString(),"sender", "false")
+    def consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "sender", "false")
 
     def producerFactory = new DefaultKafkaProducerFactory(senderProps)
     def kafkaTemplate = new KafkaTemplate(producerFactory)
 
-
     // create a Kafka consumer factory
     def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties)
@@ -862,7 +851,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
 
   def producerSpan(
     TraceAssert trace,
-    Map config,
+    Map config,
     DDSpan parentSpan = null,
     boolean partitioned = true,
     boolean tombstone = false,
@@ -891,7 +880,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
       if (tombstone) {
         "$InstrumentationTags.TOMBSTONE" true
       }
-      if ({isDataStreamsEnabled()}) {
+      if ({ isDataStreamsEnabled() }) {
        "$DDTags.PATHWAY_HASH" { String }
        if (schema != null) {
          "$DDTags.SCHEMA_DEFINITION" schema
@@ -912,7 +901,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     DDSpan parentSpan = null
   ) {
     trace.span {
-        serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue()
+      serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue()
       operationName "kafka.deliver"
       resourceName "$SHARED_TOPIC"
       spanType "queue"
@@ -933,7 +922,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
 
   def consumerSpan(
     TraceAssert trace,
-    Map config,
+    Map config,
     DDSpan parentSpan = null,
     Range offset = 0..0,
     boolean tombstone = false,
@@ -964,7 +953,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
      if (tombstone) {
        "$InstrumentationTags.TOMBSTONE" true
      }
-      if ({isDataStreamsEnabled()}) {
+      if ({ isDataStreamsEnabled() }) {
        "$DDTags.PATHWAY_HASH" { String }
      }
      defaultTags(distributedRootSpan)
@@ -992,12 +981,13 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
       }
     }
   }
+
   def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) {
     kafkaTemplate.flush()
     Producer wrappedProducer = kafkaTemplate.getTheProducer()
-    assert(wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer)
+    assert (wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer)
     Producer producer = wrappedProducer.delegate
-    assert(producer instanceof KafkaProducer)
+    assert (producer instanceof KafkaProducer)
     String clusterId = producer.metadata.fetch().clusterResource().clusterId()
     while (clusterId == null || clusterId.isEmpty()) {
       Thread.sleep(1500)
@@ -1005,7 +995,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     }
     return clusterId
   }
-
 }
 
 abstract class KafkaClientForkedTest extends KafkaClientTestBase {
@@ -1121,12 +1110,10 @@ abstract class KafkaClientLegacyTracingForkedTest extends KafkaClientTestBase {
   }
 }
 
-class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest{
-
-
+class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest {
 }
 
-class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest{
+class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest {
 
   @Override
   int version() {
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 9ba0e138840..e5de39c1a25 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -106,7 +106,7 @@ jackson-databind = {module = "com.fasterxml.jackson.core:jackson-databind", vers
 asm = ["asm", "asmcommons"]
 cafe-crypto = ["cafe-crypto-curve25519", "cafe-crypto-ed25519"]
 # Testing
-spock = ["spock-core", "spock-junit4", "objenesis"]
+spock = ["spock-core", "objenesis"]
 junit5 = ["junit-jupiter", "junit-jupiter-params"]
 junit-platform = ["junit-platform-launcher"]
 mockito = ["mokito-core", "mokito-junit-jupiter", "bytebuddy", "bytebuddyagent"]