@@ -5,17 +5,17 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
55import org.apache.kafka.clients.producer.ProducerRecord
66import org.apache.kafka.common.header.Headers
77import org.apache.kafka.common.header.internals.RecordHeaders
8- import org.junit.Rule
98import org.springframework.kafka.core.DefaultKafkaConsumerFactory
109import org.springframework.kafka.core.DefaultKafkaProducerFactory
1110import org.springframework.kafka.core.KafkaTemplate
1211import org.springframework.kafka.listener.ContainerProperties
1312import org.springframework.kafka.listener.KafkaMessageListenerContainer
1413import org.springframework.kafka.listener.MessageListener
1514import org.springframework.kafka.test.EmbeddedKafkaBroker
16- import org.springframework.kafka.test.rule.EmbeddedKafkaRule
15+ import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
1716import org.springframework.kafka.test.utils.ContainerTestUtils
1817import org.springframework.kafka.test.utils.KafkaTestUtils
18+ import spock.lang.Shared
1919
2020import java.util.concurrent.LinkedBlockingQueue
2121import java.util.concurrent.TimeUnit
@@ -29,9 +29,17 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
2929 static final SHARED_TOPIC = [" topic1" , " topic2" , " topic3" , " topic4" ]
3030 static final MESSAGE = " Testing without headers for certain topics"
3131
32- @Rule
33- EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule (1 , true , SHARED_TOPIC . toArray(String []::new ))
34- EmbeddedKafkaBroker embeddedKafka = kafkaRule. embeddedKafka
32+ @Shared
33+ EmbeddedKafkaBroker embeddedKafka
34+
35+ def setupSpec () {
36+ embeddedKafka = new EmbeddedKafkaKraftBroker (1 , 2 , * SHARED_TOPIC )
37+ embeddedKafka. afterPropertiesSet()
38+ }
39+
40+ def cleanupSpec () {
41+ embeddedKafka. destroy()
42+ }
3543
3644 static final dataTable () {
3745 [
@@ -91,36 +99,36 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
9199
92100 // setup a Kafka message listener
93101 container1. setupMessageListener(new MessageListener<String , String > () {
94- @Override
95- void onMessage (ConsumerRecord<String , String > record ) {
96- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
97- records1. add(record)
98- }
99- })
102+ @Override
103+ void onMessage (ConsumerRecord<String , String > record ) {
104+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
105+ records1. add(record)
106+ }
107+ })
100108
101109 container2. setupMessageListener(new MessageListener<String , String > () {
102- @Override
103- void onMessage (ConsumerRecord<String , String > record ) {
104- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
105- records2. add(record)
106- }
107- })
110+ @Override
111+ void onMessage (ConsumerRecord<String , String > record ) {
112+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
113+ records2. add(record)
114+ }
115+ })
108116
109117 container3. setupMessageListener(new MessageListener<String , String > () {
110- @Override
111- void onMessage (ConsumerRecord<String , String > record ) {
112- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
113- records3. add(record)
114- }
115- })
118+ @Override
119+ void onMessage (ConsumerRecord<String , String > record ) {
120+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
121+ records3. add(record)
122+ }
123+ })
116124
117125 container4. setupMessageListener(new MessageListener<String , String > () {
118- @Override
119- void onMessage (ConsumerRecord<String , String > record ) {
120- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
121- records4. add(record)
122- }
123- })
126+ @Override
127+ void onMessage (ConsumerRecord<String , String > record ) {
128+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
129+ records4. add(record)
130+ }
131+ })
124132
125133 // start the container and underlying message listener
126134 container1. start()
@@ -195,36 +203,36 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
195203
196204 // setup a Kafka message listener
197205 container1. setupMessageListener(new MessageListener<String , String > () {
198- @Override
199- void onMessage (ConsumerRecord<String , String > record ) {
200- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
201- records1. add(activeSpan())
202- }
203- })
206+ @Override
207+ void onMessage (ConsumerRecord<String , String > record ) {
208+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
209+ records1. add(activeSpan())
210+ }
211+ })
204212
205213 container2. setupMessageListener(new MessageListener<String , String > () {
206- @Override
207- void onMessage (ConsumerRecord<String , String > record ) {
208- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
209- records2. add(activeSpan())
210- }
211- })
214+ @Override
215+ void onMessage (ConsumerRecord<String , String > record ) {
216+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
217+ records2. add(activeSpan())
218+ }
219+ })
212220
213221 container3. setupMessageListener(new MessageListener<String , String > () {
214- @Override
215- void onMessage (ConsumerRecord<String , String > record ) {
216- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
217- records3. add(activeSpan())
218- }
219- })
222+ @Override
223+ void onMessage (ConsumerRecord<String , String > record ) {
224+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
225+ records3. add(activeSpan())
226+ }
227+ })
220228
221229 container4. setupMessageListener(new MessageListener<String , String > () {
222- @Override
223- void onMessage (ConsumerRecord<String , String > record ) {
224- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
225- records4. add(activeSpan())
226- }
227- })
230+ @Override
231+ void onMessage (ConsumerRecord<String , String > record ) {
232+ TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
233+ records4. add(activeSpan())
234+ }
235+ })
228236
229237 // start the container and underlying message listener
230238 container1. start()
@@ -245,12 +253,12 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
245253 activateSpan(span). withCloseable {
246254 for (String topic : SHARED_TOPIC ) {
247255 ProducerRecord record = new ProducerRecord<> (
248- topic,
249- 0 ,
250- null ,
251- MESSAGE ,
252- header
253- )
256+ topic,
257+ 0 ,
258+ null ,
259+ MESSAGE ,
260+ header
261+ )
254262 kafkaTemplate. send(record as ProducerRecord<String , String > )
255263 }
256264 }
@@ -291,9 +299,7 @@ class KafkaClientCustomPropagationConfigTest extends InstrumentationSpecificatio
291299 container3?. stop()
292300 container4?. stop()
293301
294-
295302 where :
296303 [value, expected1, expected2, expected3, expected4]<< dataTable()
297304 }
298-
299305}
0 commit comments