Skip to content

Commit b4e35c8

Browse files
author
Robert Diers
committed
log fix
1 parent 5fd4b78 commit b4e35c8

12 files changed

Lines changed: 137 additions & 17 deletions

File tree

src/main/java/org/opentesting/services/adapter/kafka/DelayedSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ public class DelayedSender implements Runnable {
1616

1717
private String testid;
1818
private TestCaseInjectionDTO inject;
19-
private Producer<String, String> producer;
20-
private ProducerRecord<String, String> kafkarecord;
19+
private Producer<Object, Object> producer;
20+
private ProducerRecord<Object, Object> kafkarecord;
2121

2222
@Override
2323
public void run() {

src/main/java/org/opentesting/services/adapter/kafka/Kafka.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class Kafka extends Adapter {
4444
private TaskScheduler taskScheduler;
4545

4646
// producer cache
47-
private HashMap<String, Producer<String, String>> producers = new HashMap<>();
47+
private HashMap<String, Producer<Object, Object>> producers = new HashMap<>();
4848

4949
@Override
5050
public String getServicename() {
@@ -55,7 +55,7 @@ public String getServicename() {
5555
@LogExecutionTime
5656
public boolean inject(String testid, TestCaseInjectionDTO inject) {
5757

58-
Producer<String, String> producer = null;
58+
Producer<Object, Object> producer = null;
5959
String connectstr = "";
6060
try {
6161

@@ -81,7 +81,7 @@ public boolean inject(String testid, TestCaseInjectionDTO inject) {
8181
String content = this.getFileAndAddTestData(testid, inject.getSourcefile(), inject.getRandomdata());
8282

8383
// create record and add records
84-
ProducerRecord<String, String> kafkarecord = new ProducerRecord<>(
84+
ProducerRecord<Object, Object> kafkarecord = new ProducerRecord<>(
8585
inject.getService().getCustom("topic").getValue(), UUID.randomUUID().toString(), content);
8686
headerMap.entrySet().forEach(es -> kafkarecord.headers().add(es.getKey(), es.getValue().getBytes()));
8787

@@ -160,12 +160,12 @@ public boolean check(String testid, TestCaseCheckDTO check, Object... args) {
160160
}
161161

162162
@LogExecutionTime
163-
private Producer<String, String> getProducer(String testid, TestCaseServiceDTO service, String connectstr)
163+
private Producer<Object, Object> getProducer(String testid, TestCaseServiceDTO service, String connectstr)
164164
throws ConnectFailedException {
165165

166166
// no reuse because of different password configurations
167167
String key = this.createConnectionKey(testid, connectstr, service.getUsername(), service.getPassword());
168-
Producer<String, String> prod = producers.get(key);
168+
Producer<Object, Object> prod = producers.get(key);
169169

170170
// block ones
171171
if (this.isFailedConnector(key)) {
@@ -222,8 +222,8 @@ public void createRequiredComponents(TestCaseDTO test) {
222222

223223
// remove blocked connections and existing producers
224224
this.removeFailedConnectorStartingWith(test.getId());
225-
HashMap<String, Producer<String, String>> newProducers = new HashMap<>();
226-
for (Map.Entry<String,Producer<String, String>> entry : producers.entrySet()) {
225+
HashMap<String, Producer<Object, Object>> newProducers = new HashMap<>();
226+
for (Map.Entry<String,Producer<Object, Object>> entry : producers.entrySet()) {
227227
if (entry.getKey().startsWith(test.getId())) {
228228
entry.getValue().close();
229229
log.info("closed: " + entry.getValue().toString());

src/main/java/org/opentesting/services/adapter/kafka/KafkaTrace.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ private String getHeader(ConsumerRecord<String, String> data, String header, boo
107107
* add trace info to record
108108
* @param kafkarecord kafka record
109109
*/
110-
public void generateTraceInfo(ProducerRecord<String, String> kafkarecord) {
110+
public void generateTraceInfo(ProducerRecord<Object, Object> kafkarecord) {
111111

112112
// add trace information
113113
Span span = tracer.currentSpan();

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class OpenTestingKafkaConsumer {
5050
@Value("${kafka.reconnect.backoff.ms}")
5151
private int reconnectBackoffMs;
5252

53+
//we need to handle everything as a string to do simple string checks
5354
private Map<String, KafkaMessageListenerContainer<String, String>> createdConsumers = new HashMap<>();
5455

5556
@LogExecutionTime

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaMessageListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void onMessage(ConsumerRecord<String, String> data) {
6161
try {
6262
return testCheck.executeCheck(c, false, adapter, trace, data.value(), Long.valueOf(data.timestamp()));
6363
} catch (Exception e) {
64-
log.error("Kafka - check execution exception: "+e.getMessage(), e);
64+
log.error(testid+": Kafka - check execution exception: "+e.getMessage(), e);
6565
return null;
6666
}
6767
}).collect(Collectors.toList());

src/main/java/org/opentesting/services/adapter/kafka/OpenTestingKafkaProducerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class OpenTestingKafkaProducerFactory {
3535
private int reconnectBackoffMs;
3636

3737
@LogExecutionTime
38-
public Producer<String,String> createProducer(String testid, TestCaseServiceDTO service) {
38+
public Producer<Object,Object> createProducer(String testid, TestCaseServiceDTO service) {
3939

4040
//default properties
4141
Properties props = new Properties();

src/main/java/org/opentesting/services/adapter/kafka/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Using consumer and producer to perform inject and check
55
### Configuration Inject
66

77
* please use sourcefile to store message to send
8-
* StringSerializer is default
8+
* StringSerializer is default, should be fine as we send file content directly
99
* please use empty user and password for unprotected topics
1010
* random data replacement processed for: sourcefile
1111

@@ -23,7 +23,7 @@ Using consumer and producer to perform inject and check
2323
| jwtpassword | might be used to specify the password in dedicated field (optional) | myPossibillyEncryptedPassword |
2424
| kafkaheaderjwt | define the kafka header key for requested JWT (optional) | accessToken |
2525
| senddelay | milliseconds to delay the send, helpful if kafka consumer initialization is to slow (default: not used) | 15000 |
26-
| value.serializer | you can use any of the available Kafka producer properties (optional) | org.apache.kafka.common.serialization.StringSerializer |
26+
| producer property | you can use any of the available Kafka producer properties (optional) | see link |
2727

2828
https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
2929

@@ -90,7 +90,7 @@ https://docs.confluent.io/platform/current/installation/configuration/producer-c
9090
### Configuration Check
9191

9292
* validation 'request' not required
93-
* StringDeserializer is default
93+
* StringDeserializer is default, should be fine as we do only String validations
9494
* please use validation 'response' to define expected JSON result(s)
9595
* expectedtype could be 'equals', 'contains', 'containsnot', 'containsoneof'
9696
* random data replacement processed for: 'response'
@@ -106,7 +106,7 @@ https://docs.confluent.io/platform/current/installation/configuration/producer-c
106106
| sasl.mechanism | sasl mechanism used in combination with user and password (optional) | SCRAM-SHA-512 |
107107
| login.module | login module used in combination with user and password (optional) | org.apache.kafka.common.security.scram.ScramLoginModule |
108108
| stopexpectedlog | hide check compare log actual vs expected (required for high load topics), default = false | true |
109-
| value.deserializer | you can use any of the available Kafka consumer properties (optional) | org.apache.kafka.common.serialization.StringDeserializer |
109+
| consumer property | you can use any of the available Kafka consumer properties (optional) | see link |
110110

111111
https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
112112

src/test/resources/opentesting/systemtests/kafka_001/test.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
},
8989
{
9090
"key": "group.id",
91-
"value": "e2etests"
91+
"value": "kafka_001.check-kafka-1.group"
9292
}
9393
]
9494
},
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#replaceme1#
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"hallo": "#replaceme1#",
3+
"welt": "#replaceme2#"
4+
}

0 commit comments

Comments
 (0)