Skip to content

Commit 4de18db

Browse files
author
Robert Diers
committed
test case specific kafka consumers
1 parent cd1c003 commit 4de18db

6 files changed

Lines changed: 71 additions & 10 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Container: https://github.com/opentestingapi/impl_java/pkgs/container/opentest
1717
| 1.8 | 1.0 | data generator endpoint to create mass data from testcases |
1818
| 1.9 | 1.0 | dependency upgrades, CVE fix: https://github.com/aws/aws-sdk-java/security/advisories/GHSA-c28r-hw5m-5gv3 |
1919
| 1.10 | 1.0 | endpoint to export all test cases |
20+
| 1.11 | 1.0 | Kafka consumers not shared between test cases, they have to use different group.id (recreated with every upload) |
2021

2122
## Architecture
2223

src/main/java/org/opentesting/services/adapter/kafka/Kafka.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ public void createRequiredComponents(TestCaseDTO test) {
196196
// create required Kafka consumer
197197
for (TestCaseCheckDTO check : test.getChecks()) {
198198
if (getAllServicenames().contains(check.getService().getType()) && check.isActive()) {
199-
kafkaConsumerFactory.createConsumer(check.getService(), this);
199+
kafkaConsumerFactory.closeConsumer(test.getId(), check.getService(), this);
200+
kafkaConsumerFactory.createConsumer(test.getId(), check.getService(), this);
200201
}
201202
}
202203

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaConsumer.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Map;
55

66
import org.opentesting.dto.TestCaseServiceDTO;
7+
import org.opentesting.dto.api1dot0.TestCaseServiceDTOapi1dot0;
78
import org.opentesting.services.adapter.Adapter;
89
import org.opentesting.services.encryption.Encryption;
910
import org.opentesting.services.execution.TestCheck;
@@ -42,10 +43,10 @@ public class OpenTestingKafkaConsumer {
4243
private Map<String, KafkaMessageListenerContainer<String, String>> createdConsumers = new HashMap<>();
4344

4445
@LogExecutionTime
45-
public void createConsumer(TestCaseServiceDTO service, Adapter adapter) {
46+
public void createConsumer(String testid, TestCaseServiceDTO service, Adapter adapter) {
4647

47-
//shared between test cases
48-
String key = service.getConnectstring()+"#"+service.getCustom("topic").getValue()+"#"+service.getUsername()+"#"+service.getCustom("group.id").getValue();
48+
//we will not share between test ids, they need to use different group.ids
49+
String key = createUniqueKey(testid, service);
4950
if (createdConsumers.containsKey(key)) {
5051
log.info("kafka consumer already exists: "+key);
5152
return;
@@ -71,7 +72,7 @@ public void createConsumer(TestCaseServiceDTO service, Adapter adapter) {
7172

7273
//consumer container
7374
ContainerProperties containerProps = new ContainerProperties(service.getCustom("topic").getValue());
74-
containerProps.setMessageListener(new OpenTestingKafkaMessageListener(service, testCheck, key, prometheus, adapter, kafkaTrace));
75+
containerProps.setMessageListener(new OpenTestingKafkaMessageListener(testid, service, testCheck, key, prometheus, adapter, kafkaTrace));
7576
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(fact, containerProps);
7677

7778
//start container
@@ -91,6 +92,40 @@ public void createConsumer(TestCaseServiceDTO service, Adapter adapter) {
9192
log.info("KafkaConsumer created: "+key);
9293
}
9394

95+
/**
96+
* close consumer
97+
* @param testid
98+
* @param service
99+
* @param kafka
100+
*/
101+
@LogExecutionTime
102+
public void closeConsumer(String testid, TestCaseServiceDTOapi1dot0 service, Kafka kafka) {
103+
String key = createUniqueKey(testid, service);
104+
if (createdConsumers.containsKey(key)) {
105+
log.info("closing kafka consumer: "+key);
106+
createdConsumers.get(key).destroy();
107+
createdConsumers.remove(key);
108+
}
109+
}
110+
111+
/**
112+
* close consumer
113+
* @param testid
114+
*/
115+
@LogExecutionTime
116+
public void closeConsumer(String testid) {
117+
for (String key : createdConsumers.keySet()) {
118+
if (key.startsWith(testid+"#")) {
119+
log.info("closing kafka consumer: "+key);
120+
createdConsumers.get(key).destroy();
121+
}
122+
}
123+
}
124+
125+
private String createUniqueKey(String testid, TestCaseServiceDTO service) {
126+
return testid+"#"+service.getConnectstring()+"#"+service.getCustom("topic").getValue()+"#"+service.getUsername()+"#"+service.getCustom("group.id").getValue();
127+
}
128+
94129
/**
95130
* stop consumers
96131
*/
@@ -125,6 +160,6 @@ public void startConsumers() {
125160
}
126161
}
127162
);
128-
}
163+
}
129164

130165
}

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaMessageListener.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,17 @@ public class OpenTestingKafkaMessageListener implements MessageListener<String,
2727

2828
private static final String TOPIC = "topic";
2929

30+
private String testid;
3031
private TestCaseServiceDTO service;
3132
private TestCheck testCheck;
3233
private String key;
3334
private Prometheus prometheus;
3435
private Adapter adapter;
3536
private KafkaTrace kafkatrace;
3637

37-
public OpenTestingKafkaMessageListener(TestCaseServiceDTO service, TestCheck testCheck, String key,
38+
public OpenTestingKafkaMessageListener(String testid, TestCaseServiceDTO service, TestCheck testCheck, String key,
3839
Prometheus prometheus, Adapter adapter, KafkaTrace kafkatrace) {
40+
this.testid = testid;
3941
this.service = service;
4042
this.testCheck = testCheck;
4143
this.key = key;
@@ -49,7 +51,7 @@ public OpenTestingKafkaMessageListener(TestCaseServiceDTO service, TestCheck tes
4951
public void onMessage(ConsumerRecord<String, String> data) {
5052

5153
//read checks
52-
List<TestCaseCheckDTO> checks = getChecksByServiceAndConnectstringAndOpen(service);
54+
List<TestCaseCheckDTO> checks = getChecksByServiceAndConnectstringAndOpen(testid, service);
5355

5456
//extract Trace info
5557
final TraceDTO trace = kafkatrace.extractTraceInfo(data);
@@ -81,14 +83,14 @@ public void onMessage(ConsumerRecord<String, String> data) {
8183
* @throws DataAccessException
8284
*/
8385
@LogExecutionTime
84-
public List<TestCaseCheckDTO> getChecksByServiceAndConnectstringAndOpen(TestCaseServiceDTO service) {
86+
public List<TestCaseCheckDTO> getChecksByServiceAndConnectstringAndOpen(String testid, TestCaseServiceDTO service) {
8587

8688
List<TestCaseCheckDTO> openKafkaChecks = new ArrayList<>();
8789
String topic = service.getCustom(TOPIC).getValue();
8890

8991
//filter by topic
9092
try {
91-
for (TestCaseCheckDTO check : testCheck.getOpenChecks(service)) {
93+
for (TestCaseCheckDTO check : testCheck.getOpenChecks(testid, service)) {
9294
if (check.getService().getCustom(TOPIC).getValue().equals(topic)) {
9395
openKafkaChecks.add(check);
9496
}

src/main/java/org/opentesting/services/execution/TestCheck.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,25 @@ public List<TestCaseCheckDTO> getOpenChecks(TestCaseServiceDTO service) throws D
223223
return getOpenChecks(service.getType());
224224
}
225225

226+
/**
227+
* read all open checks for a defined type
228+
*/
229+
public List<TestCaseCheckDTO> getOpenChecks(String testid, TestCaseServiceDTO service) throws DataAccessException {
230+
return getOpenChecks(testid, service.getType());
231+
}
232+
226233
/**
227234
* read all open checks for a defined type
228235
*/
229236
public List<TestCaseCheckDTO> getOpenChecks(String service) throws DataAccessException {
230237
return checkRepository.selectByOpentype(service);
231238
}
239+
240+
/**
241+
* read all open checks for a defined type
242+
*/
243+
public List<TestCaseCheckDTO> getOpenChecks(String testid, String service) throws DataAccessException {
244+
return checkRepository.selectByTestAndOpentype(testid, service);
245+
}
232246

233247
}

src/main/java/org/opentesting/services/execution/TestManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.opentesting.persistence.IndexRepository;
66
import org.opentesting.services.adapter.Adapter;
77
import org.opentesting.services.adapter.AdapterResolver;
8+
import org.opentesting.services.adapter.kafka.OpenTestingKafkaConsumer;
89
import org.opentesting.services.pause.Pause;
910
import org.opentesting.services.scheduler.AsynTaskExec;
1011
import org.opentesting.services.scheduler.TimerFactory;
@@ -41,6 +42,9 @@ public class TestManager extends TestAbstractHandler {
4142
@Autowired
4243
private TestCheck testCheck;
4344

45+
@Autowired
46+
private OpenTestingKafkaConsumer openTestingKafkaConsumer;
47+
4448
/**
4549
* adapters might need timers
4650
*/
@@ -61,6 +65,10 @@ public void scheduleAdapterTimers() {
6165
*/
6266
@LogExecutionTime
6367
public String removeTest(String testid) {
68+
69+
//remove Kafka consumer
70+
openTestingKafkaConsumer.closeConsumer(testid);
71+
6472
StringBuilder st = new StringBuilder();
6573
indexRepository.deleteByValue(IndexRepository.LABELINDEX, testid);
6674
st.append("index deleted");

0 commit comments

Comments
 (0)