@@ -28,7 +28,6 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker
2828import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
2929import org.springframework.kafka.test.utils.ContainerTestUtils
3030import org.springframework.kafka.test.utils.KafkaTestUtils
31- import spock.lang.Shared
3231
3332import java.util.concurrent.ExecutionException
3433import java.util.concurrent.Future
@@ -44,15 +43,16 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPro
4443abstract class KafkaClientTestBase extends VersionedNamingTestBase {
4544 static final SHARED_TOPIC = " shared.topic"
4645
47- @Shared
4846 EmbeddedKafkaBroker embeddedKafka
4947
50- def setupSpec () {
48+ def setup () {
5149 embeddedKafka = new EmbeddedKafkaKraftBroker (1 , 2 , SHARED_TOPIC )
5250 embeddedKafka. afterPropertiesSet()
51+
52+ TEST_WRITER . setFilter(dropKafkaPoll)
5353 }
5454
55- def cleanupSpec () {
55+ def cleanup () {
5656 embeddedKafka. destroy()
5757 }
5858
@@ -117,10 +117,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
117117 PRODUCER_PATHWAY_EDGE_TAGS . put(" type" , " kafka" )
118118 }
119119
120- def setup () {
121- TEST_WRITER . setFilter(dropKafkaPoll)
122- }
123-
124120 @Override
125121 int version () {
126122 0
@@ -214,7 +210,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
214210 // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition
215211 // TODO
216212 TEST_DATA_STREAMS_WRITER . waitForBacklogs(2 )
217-
218213 }
219214
220215 then :
@@ -513,7 +508,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
513508 cleanup :
514509 producerFactory. stop()
515510 container?. stop()
516-
517511 }
518512
519513 def "test records (TopicPartition ) kafka consume" () {
@@ -545,7 +539,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
545539 }
546540
547541 then:
548- recs.hasNext()
542+ ! recs.hasNext()
549543 first.value() == greeting
550544 first.key() == null
551545
@@ -568,8 +562,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
568562 cleanup:
569563 consumer.close()
570564 producer.close()
571-
572-
573565 }
574566
575567 def " test records(TopicPartition ). subList kafka consume" () {
@@ -604,7 +596,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
604596 }
605597
606598 then:
607- recs.hasNext()
599+ ! recs.hasNext()
608600 first.value() == greeting
609601 first.key() == null
610602
@@ -627,7 +619,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
627619 cleanup:
628620 consumer.close()
629621 producer.close()
630-
631622 }
632623
633624 def " test records(TopicPartition ). forEach kafka consume" () {
@@ -685,7 +676,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
685676 cleanup:
686677 consumer.close()
687678 producer.close()
688-
689679 }
690680
691681 def " test iteration backwards over ConsumerRecords " () {
@@ -796,7 +786,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
796786 cleanup:
797787 consumer.close()
798788 producer.close()
799-
800789 }
801790
802791 def " test kafka client header propagation manual config" () {
@@ -808,7 +797,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
808797 def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
809798 def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
810799
811-
812800 // create a Kafka consumer factory
813801 def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
814802
@@ -1007,7 +995,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
1007995 }
1008996 return clusterId
1009997 }
1010-
1011998}
1012999
10131000abstract class KafkaClientForkedTest extends KafkaClientTestBase {
0 commit comments