Skip to content

Commit 2a8035b

Browse files
author
Robert Diers
committed
custom properties kafka cosumer and producer, extended log delayed sender
1 parent b2d5fe2 commit 2a8035b

9 files changed

Lines changed: 61 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919
| 1.16 | 1.0 | Adding support to change Kafka reconnect backoff (kafka.reconnect.backoff.max.ms=60000, kafka.reconnect.backoff.ms=1000) |
2020
| 1.17 | 1.0 | improved upload output, new GUI to analyze test cases, AWS SDK update |
2121
| 1.18 | 1.0 | feature flag to disable kafka expacted vs actual log, add testid and checkid to this log |
22-
| 1.19 | 1.0 | support parallel execution with bulk trigger |
22+
| 1.19 | 1.0 | support parallel execution with bulk trigger |
23+
| 1.20 | 1.0 | define Kafka consumer and producer properties with custom parameters, print testid with exception for Kafka DelayedSender |

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
</parent>
1212
<groupId>org.opentesting</groupId>
1313
<artifactId>impl_java</artifactId>
14-
<version>1.19</version>
14+
<version>1.20</version>
1515
<name>impl_java</name>
1616
<description>open testing implementation spring boot</description>
1717

src/main/java/org/opentesting/services/adapter/kafka/DelayedSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public void run() {
2424
try {
2525
log.info(testid + " " + inject.getInjectid() + " " + inject.getInstanceid() + ": sending delayed record");
2626
producer.send(kafkarecord);
27-
} catch (Exception e) {
28-
log.error("kafkarecord not produced", e);
27+
} catch (Throwable e) {
28+
log.error(testid + " " + inject.getInjectid() + " " + inject.getInstanceid() + ": kafkarecord not send", e);
2929
}
3030
}
3131

src/main/java/org/opentesting/services/adapter/kafka/Kafka.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private Producer<String, String> getProducer(String testid, TestCaseServiceDTO s
177177
try {
178178
// create new
179179
if (prod == null) {
180-
prod = kafkaProducerFactory.createProducer(service);
180+
prod = kafkaProducerFactory.createProducer(testid, service);
181181
log.info("KafkaProducer created: " + prod.toString());
182182
producers.put(key, prod);
183183
}

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaConsumer.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import java.util.HashMap;
44
import java.util.Map;
5+
import java.util.Set;
6+
import java.util.UUID;
57

8+
import org.opentesting.dto.TestCaseCustomParameterDTO;
69
import org.opentesting.dto.TestCaseServiceDTO;
710
import org.opentesting.dto.api1dot0.TestCaseServiceDTOapi1dot0;
811
import org.opentesting.services.adapter.Adapter;
@@ -59,10 +62,17 @@ public void createConsumer(String testid, TestCaseServiceDTO service, Adapter ad
5962
return;
6063
}
6164

62-
//properties
65+
//validation
66+
String groupid = service.getCustom("group.id").getValue();
67+
if (groupid == null || groupid.isEmpty()) {
68+
groupid = UUID.randomUUID().toString();
69+
log.warn(testid+": using group.id="+groupid);
70+
}
71+
72+
//default properties
6373
Map<String, Object> props = new HashMap<>();
6474
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getConnectstring());
65-
props.put(ConsumerConfig.GROUP_ID_CONFIG, service.getCustom("group.id").getValue());
75+
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
6676
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
6777
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
6878
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
@@ -71,9 +81,22 @@ public void createConsumer(String testid, TestCaseServiceDTO service, Adapter ad
7181
//user and password
7282
if (service.getUsername() != null && service.getUsername().length() > 0) {
7383
props.put("security.protocol", service.getCustom("security.protocol").getValue());
84+
log.warn(testid+": using security.protocol="+service.getCustom("security.protocol").getValue());
7485
props.put("sasl.mechanism", service.getCustom("sasl.mechanism").getValue());
75-
props.put("sasl.jaas.config", service.getCustom("login.module").getValue() + " required username=\"" + service.getUsername()
76-
+ "\" password=\"" + encryption.decrypt(service.getPassword()) + "\";");
86+
log.warn(testid+": using sasl.mechanism="+service.getCustom("sasl.mechanism").getValue());
87+
String jassconfig_pre = service.getCustom("login.module").getValue() + " required username=\"" + service.getUsername()
88+
+ "\" password=\"";
89+
props.put("sasl.jaas.config", jassconfig_pre + encryption.decrypt(service.getPassword()) + "\";");
90+
log.warn(testid+": using sasl.jaas.config="+jassconfig_pre + "***\";");
91+
}
92+
93+
//override with custom ones
94+
Set<String> kapfaParameters = ConsumerConfig.configNames();
95+
for (TestCaseCustomParameterDTO cust : service.getCustom()) {
96+
if (kapfaParameters.contains(cust.getKey())) {
97+
log.warn(testid+": setting consumer property "+cust.getKey()+"="+cust.getValue());
98+
props.put(cust.getKey(), cust.getValue());
99+
}
77100
}
78101

79102
//factory

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaProducerFactory.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.opentesting.services.adapter.kafka;
22

33
import java.util.Properties;
4+
import java.util.Set;
45

6+
import org.opentesting.dto.TestCaseCustomParameterDTO;
57
import org.opentesting.dto.TestCaseServiceDTO;
68
import org.opentesting.services.encryption.Encryption;
79
import org.opentesting.util.LogExecutionTime;
@@ -14,8 +16,10 @@
1416
import org.springframework.stereotype.Component;
1517

1618
import brave.kafka.clients.KafkaTracing;
19+
import lombok.extern.slf4j.Slf4j;
1720

1821
@Component
22+
@Slf4j
1923
public class OpenTestingKafkaProducerFactory {
2024

2125
@Autowired
@@ -31,8 +35,9 @@ public class OpenTestingKafkaProducerFactory {
3135
private int reconnectBackoffMs;
3236

3337
@LogExecutionTime
34-
public Producer<String,String> createProducer(TestCaseServiceDTO service) {
38+
public Producer<String,String> createProducer(String testid, TestCaseServiceDTO service) {
3539

40+
//default properties
3641
Properties props = new Properties();
3742
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getConnectstring());
3843
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -43,9 +48,22 @@ public Producer<String,String> createProducer(TestCaseServiceDTO service) {
4348
//user and password
4449
if (service.getUsername() != null && service.getUsername().length() > 0) {
4550
props.put("security.protocol", service.getCustom("security.protocol").getValue());
51+
log.warn(testid+": using security.protocol="+service.getCustom("security.protocol").getValue());
4652
props.put("sasl.mechanism", service.getCustom("sasl.mechanism").getValue());
47-
props.put("sasl.jaas.config", service.getCustom("login.module").getValue() + " required username=\"" + service.getUsername()
48-
+ "\" password=\"" + encryption.decrypt(service.getPassword()) + "\";");
53+
log.warn(testid+": using sasl.mechanism="+service.getCustom("sasl.mechanism").getValue());
54+
String jassconfig_pre = service.getCustom("login.module").getValue() + " required username=\"" + service.getUsername()
55+
+ "\" password=\"";
56+
props.put("sasl.jaas.config", jassconfig_pre + encryption.decrypt(service.getPassword()) + "\";");
57+
log.warn(testid+": using sasl.jaas.config="+jassconfig_pre + "***\";");
58+
}
59+
60+
//override with custom ones
61+
Set<String> kapfaParameters = ProducerConfig.configNames();
62+
for (TestCaseCustomParameterDTO cust : service.getCustom()) {
63+
if (kapfaParameters.contains(cust.getKey())) {
64+
log.warn(testid+": setting producer property "+cust.getKey()+"="+cust.getValue());
65+
props.put(cust.getKey(), cust.getValue());
66+
}
4967
}
5068

5169
return kafkaTracing.producer(new KafkaProducer<>(props));

src/main/java/org/opentesting/services/adapter/kafka/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Using consumer and producer to perform inject and check
1313
* please use custom parameter jwtpost, jwtparam and jwtheader to request token, please use #username#, #password# or replacement keys as placeholders (for example #jwtpassword# could be used and will be decrypted automatically)
1414
* please use custom parameter kafkaheaderjwt to define the kafka header key for requested JWT
1515
* please use custom parameter senddelay in ms to define a kafka produce delay, helpful if kafka consumer initialization is to slow (default: 0)
16+
* please use custom parameter for kafka producer properties (like value.serializer)
1617

1718
```
1819
{
@@ -86,6 +87,7 @@ Using consumer and producer to perform inject and check
8687
* injects executed after successful check (please do not create infinite loops)
8788
* please use result2random to transfer JSON result (only JSON!) attributes to random data for later usage
8889
* please use custom parameter stopexpectedlog to stop logging expected vs actual (default: false)
90+
* please use custom parameter for kafka consumer properties (like value.deserializer)
8991

9092
```
9193
{

src/main/java/org/opentesting/services/execution/TestScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ private List<String> prepareAdapters(TestCaseDTO test) {
174174
}
175175
} catch (Exception e) {
176176
log.error("could not initialize adapter "+a, e);
177-
warnings.add("WARN "+a.getServicename()+": "+e.getMessage());
177+
String servicename = "";
178+
if (a != null) servicename = a.getServicename();
179+
warnings.add("WARN "+servicename+": "+e.getMessage());
178180
}
179181
}
180182
);

src/test/resources/application.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ kafka:
1616
max:
1717
ms: 60000
1818
ms: 1000
19+
producer:
20+
acks: 1
1921

2022
app:
2123
version: ^project.version^

0 commit comments

Comments
 (0)