Skip to content

Commit 74a04e5

Browse files
committed
Use BlockingQueues instead of CyclicBarrier.
1 parent 97f6ada commit 74a04e5

File tree

1 file changed

+42
-52
lines changed

1 file changed

+42
-52
lines changed
Lines changed: 42 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,16 +19,16 @@
1919

2020
import java.util.HashMap;
2121
import java.util.Map;
22-
import java.util.concurrent.BrokenBarrierException;
22+
import java.util.concurrent.ArrayBlockingQueue;
23+
import java.util.concurrent.BlockingQueue;
2324
import java.util.concurrent.ConcurrentHashMap;
24-
import java.util.concurrent.CyclicBarrier;
2525
import java.util.concurrent.TimeUnit;
2626

2727
import javax.annotation.Nullable;
2828

29+
import org.apache.commons.logging.Log;
30+
import org.apache.commons.logging.LogFactory;
2931
import org.apache.kafka.clients.consumer.ConsumerRecord;
30-
import org.slf4j.Logger;
31-
import org.slf4j.LoggerFactory;
3232
import org.testcontainers.containers.KafkaContainer;
3333
import org.testcontainers.junit.jupiter.Container;
3434
import org.testcontainers.junit.jupiter.Testcontainers;
@@ -52,7 +52,7 @@
5252
import org.springframework.test.context.DynamicPropertyRegistry;
5353
import org.springframework.test.context.DynamicPropertySource;
5454

55-
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {TestConfig.class, Application.class})
55+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {BaseClass.TestConfig.class, Application.class})
5656
@Testcontainers
5757
@AutoConfigureMessageVerifier
5858
@ActiveProfiles("test")
@@ -75,69 +75,59 @@ public void trigger() {
7575
this.controller.sendFoo("example");
7676

7777
}
78-
}
7978

8079

81-
@EnableKafka
82-
@Configuration
83-
class TestConfig {
80+
@EnableKafka
81+
@Configuration
82+
static class TestConfig {
83+
84+
@Bean
85+
KafkaMessageVerifier kafkaTemplateMessageVerifier() {
86+
return new KafkaMessageVerifier();
87+
}
8488

85-
@Bean
86-
KafkaMessageVerifier kafkaTemplateMessageVerifier() {
87-
return new KafkaMessageVerifier();
8889
}
8990

90-
}
9191

92-
class KafkaMessageVerifier implements MessageVerifierReceiver<Message<?>> {
92+
static class KafkaMessageVerifier implements MessageVerifierReceiver<Message<?>> {
9393

94-
private static final Logger log = LoggerFactory.getLogger(KafkaMessageVerifier.class);
94+
private static final Log LOG = LogFactory.getLog(KafkaMessageVerifier.class);
9595

96-
private final Map<String, Message> broker = new ConcurrentHashMap<>();
96+
Map<String, BlockingQueue<Message<?>>> broker = new ConcurrentHashMap<>();
9797

98-
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
9998

100-
@Override
101-
public Message receive(String destination, long timeout, TimeUnit timeUnit, @Nullable YamlContract contract) {
102-
Message message = message(destination);
103-
if (message != null) {
99+
@Override
100+
public Message receive(String destination, long timeout, TimeUnit timeUnit, @Nullable YamlContract contract) {
101+
broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1));
102+
BlockingQueue<Message<?>> messageQueue = broker.get(destination);
103+
Message<?> message;
104+
try {
105+
message = messageQueue.poll(timeout, timeUnit);
106+
}
107+
catch (InterruptedException e) {
108+
throw new RuntimeException(e);
109+
}
110+
if (message != null) {
111+
LOG.info("Removed a message from a topic [" + destination + "]");
112+
}
104113
return message;
105114
}
106-
await(timeout, timeUnit);
107-
return message(destination);
108-
}
109115

110-
private void await(long timeout, TimeUnit timeUnit) {
111-
try {
112-
cyclicBarrier.await(timeout, timeUnit);
113-
}
114-
catch (Exception e) {
115116

117+
@KafkaListener(id = "baristaContractTestListener", topicPattern = ".*")
118+
public void listen(ConsumerRecord payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
119+
LOG.info("Got a message from a topic [" + topic + "]");
120+
Map<String, Object> headers = new HashMap<>();
121+
new DefaultKafkaHeaderMapper().toHeaders(payload.headers(), headers);
122+
broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1));
123+
BlockingQueue<Message<?>> messageQueue = broker.get(topic);
124+
messageQueue.add(MessageBuilder.createMessage(payload.value(), new MessageHeaders(headers)));
116125
}
117-
}
118126

119-
private Message message(String destination) {
120-
Message message = broker.get(destination);
121-
if (message != null) {
122-
broker.remove(destination);
123-
log.info("Removed a message from a topic [" + destination + "]");
127+
@Override
128+
public Message receive(String destination, YamlContract contract) {
129+
return receive(destination, 15, TimeUnit.SECONDS, contract);
124130
}
125-
return message;
126-
}
127131

128-
@KafkaListener(id = "listener", topicPattern = ".*")
129-
public void listen(ConsumerRecord payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws BrokenBarrierException, InterruptedException {
130-
log.info("Got a message from a topic [" + topic + "]");
131-
Map<String, Object> headers = new HashMap<>();
132-
new DefaultKafkaHeaderMapper().toHeaders(payload.headers(), headers);
133-
broker.put(topic, MessageBuilder.createMessage(payload.value(), new MessageHeaders(headers)));
134-
cyclicBarrier.await();
135-
cyclicBarrier.reset();
136132
}
137-
138-
@Override
139-
public Message receive(String destination, YamlContract contract) {
140-
return receive(destination, 5, TimeUnit.SECONDS, contract);
141-
}
142-
143133
}

0 commit comments

Comments
 (0)