Skip to content

Commit 3700ff2

Browse files
committed
issue-657 during consume recordType JSON with alpha key issue(potential fix)
1 parent 569a8cd commit 3700ff2

File tree

8 files changed

+310
-0
lines changed

8 files changed

+310
-0
lines changed

core/src/main/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelper.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.jsmart.zerocode.core.kafka.helper;
22

3+
import com.fasterxml.jackson.core.JacksonException;
34
import com.fasterxml.jackson.core.JsonProcessingException;
45
import com.fasterxml.jackson.databind.JsonNode;
56
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -304,6 +305,14 @@ public static void readJson(List<ConsumerJsonRecord> jsonRecords,
304305
LOGGER.debug("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}, Headers - {}",
305306
key, valueStr, thisRecord.partition(), thisRecord.offset(), headers);
306307

308+
if (!isKeyParseableAsJson(keyStr)) {
309+
LOGGER.info(">>>Converting the key to JSON format for to able to read it");
310+
// Most cases a bare string is used as key, not really a JSON.
311+
// Hence, convert the key to JSON equivalent string for the framework able to
312+
// read the already consumed key for display and assertion purpose.
313+
keyStr = objectMapper.writeValueAsString(keyStr);
314+
}
315+
307316
JsonNode keyNode = objectMapper.readTree(keyStr);
308317
JsonNode valueNode = objectMapper.readTree(valueStr);
309318

@@ -319,6 +328,17 @@ public static void readJson(List<ConsumerJsonRecord> jsonRecords,
319328
}
320329
}
321330

331+
public static boolean isKeyParseableAsJson(String consumedKey) {
332+
try {
333+
objectMapper.readTree(consumedKey);
334+
} catch (JacksonException e) {
335+
LOGGER.info(">>>The key was not in a parsable JSON format:{}", consumedKey);
336+
return false;
337+
}
338+
LOGGER.info(">>> The consumed key was fine and parseable:{}", consumedKey);
339+
return true;
340+
}
341+
322342
private static String convertProtobufToJson(ConsumerRecord thisRecord, ConsumerLocalConfigs consumerLocalConfig) {
323343
if (org.apache.commons.lang3.StringUtils.isEmpty(consumerLocalConfig.getProtoClassType())) {
324344
throw new IllegalArgumentException(

kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/KafkaSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.jsmart.zerocode.integration.tests.kafka.consume.latest.KafkaConsumeLatestExistingTopicTest;
1313
import org.jsmart.zerocode.integration.tests.kafka.consume.latest.KafkaConsumeLatestTest;
1414
import org.jsmart.zerocode.integration.tests.kafka.consume.negative.KafkaConsumeAvroNegativeTest;
15+
import org.jsmart.zerocode.integration.tests.kafka.consume.stringkey.KafkaConsumeJsonAlphaStringKeyTest;
1516
import org.jsmart.zerocode.integration.tests.kafka.produce.KafkaProduceAsyncTest;
1617
import org.jsmart.zerocode.integration.tests.kafka.produce.KafkaProduceIntKeyTest;
1718
import org.jsmart.zerocode.integration.tests.kafka.produce.KafkaProduceJsonTest;
@@ -48,6 +49,7 @@
4849
KafkaConsumeJsonTest.class,
4950
KafkaProduceIntKeyTest.class,
5051
KafkaConsumeIntKeyTest.class,
52+
KafkaConsumeJsonAlphaStringKeyTest.class,
5153
KafkaConsumeAvroTest.class,
5254
KafkaConsumeAvroNegativeTest.class,
5355
KafkaProduceConsumeAvroTest.class,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.jsmart.zerocode.integration.tests.kafka.consume.stringkey;
2+
3+
import org.jsmart.zerocode.core.domain.Scenario;
4+
import org.jsmart.zerocode.core.domain.TargetEnv;
5+
import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
6+
import org.junit.Ignore;
7+
import org.junit.Test;
8+
import org.junit.runner.RunWith;
9+
10+
@TargetEnv("kafka_servers/kafka_test_server.properties")
11+
@RunWith(ZeroCodeUnitRunner.class)
12+
public class KafkaConsumeJsonAlphaStringKeyTest {
13+
14+
@Test
15+
@Scenario("kafka/consume/stringkey/test_kafka_produce_consume_json_string_key.json")
16+
public void testKafkaConsumeKey_AlphaNumericKey() throws Exception {
17+
}
18+
19+
@Test
20+
@Scenario("kafka/consume/stringkey/test_kafka_produce_consume_json_numberasstring_key.json")
21+
public void testKafkaConsumeKey_NumberAsStringKey() throws Exception {
22+
}
23+
24+
@Test
25+
@Scenario("kafka/consume/stringkey/test_kafka_produce_consume_jsonobject_as_string_key.json")
26+
public void testKafkaConsumeKey_JsonObjectAsStringKey() throws Exception {
27+
}
28+
29+
@Test
30+
@Scenario("kafka/consume/stringkey/test_kafka_produce_consume_string_with_double_quotes_key.json")
31+
public void testKafkaConsumeKey_DoubleQuotedStringKey() throws Exception {
32+
}
33+
34+
// JSON Object Key as JSON Object (the below way) is not supported.
35+
// Instead, use the JSON Object as a String format :
36+
// e.g. if you want to produce a JSON "key":{"id": "value"}, then produce the "key":"{\"id\": \"value\"}"
37+
@Ignore
38+
@Test
39+
@Scenario("kafka/consume/stringkey/test_kafka_produce_consume_jsonobject_as_objectblock_key.json")
40+
public void testKafkaConsumeKey_JsonObjectAsObjectlockKey() throws Exception {
41+
}
42+
43+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
{
2+
"scenarioName": "DEMO SCENARIO - Number as a string key",
3+
"steps": [
4+
{
5+
"name": "do_produce",
6+
"url": "kafka-topic:THE_TEST_TOPIC",
7+
"operation": "produce",
8+
"request": {
9+
"recordType": "JSON",
10+
"records": [
11+
{
12+
"key": "111",
13+
"value": {
14+
"appName": "App X"
15+
}
16+
}
17+
]
18+
},
19+
"verify": {
20+
"status": "Ok"
21+
}
22+
},
23+
{
24+
"name": "do_consume",
25+
"url": "kafka-topic:THE_TEST_TOPIC",
26+
"operation": "consume",
27+
"request": {
28+
"consumerLocalConfigs": {
29+
"recordType": "JSON",
30+
"commitSync": true,
31+
"showRecordsConsumed": true,
32+
"maxNoOfRetryPollsOrTimeouts": 3
33+
}
34+
},
35+
"verify": {
36+
"records": [
37+
{
38+
"key": "(int)${$.do_produce.request.records[0].key}",
39+
"value": {
40+
"appName": "App X"
41+
}
42+
}
43+
]
44+
}
45+
}
46+
]
47+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"scenarioName": "DEMO SCENARIO - JSON String alphanumeric string key",
3+
"steps": [
4+
{
5+
"name": "do_produce",
6+
"url": "kafka-topic:THE_TEST_TOPIC",
7+
"operation": "produce",
8+
"request": {
9+
"recordType": "JSON",
10+
"records": [
11+
{
12+
"key": "EMP-123",
13+
"value": {
14+
"appName": "App X"
15+
}
16+
}
17+
]
18+
},
19+
"verify": {
20+
"status": "Ok"
21+
}
22+
},
23+
{
24+
"name": "do_consume",
25+
"url": "kafka-topic:THE_TEST_TOPIC",
26+
"operation": "consume",
27+
"request": {
28+
"consumerLocalConfigs": {
29+
"recordType": "JSON",
30+
"commitSync": true,
31+
"showRecordsConsumed": true,
32+
"maxNoOfRetryPollsOrTimeouts": 3
33+
}
34+
},
35+
"verify": {
36+
// "size": 1,
37+
"records": [
38+
{
39+
"key": "${$.do_produce.request.records[0].key}",
40+
"value": {
41+
"appName": "App X"
42+
}
43+
}
44+
]
45+
}
46+
}
47+
]
48+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"scenarioName": "DEMO SCENARIO - key as a JSON block(not supported)",
3+
"steps": [
4+
{
5+
"name": "do_produce",
6+
"url": "kafka-topic:THE_TEST_TOPIC",
7+
"operation": "produce",
8+
"request": {
9+
"recordType": "JSON",
10+
"records": [
11+
{
12+
"key": {
13+
"keyId": 444,
14+
"keyValue": "EMP-444"
15+
},
16+
"value": {
17+
"appName": "App X"
18+
}
19+
}
20+
]
21+
},
22+
"verify": {
23+
"status": "Ok"
24+
}
25+
},
26+
{
27+
"name": "do_consume",
28+
"url": "kafka-topic:THE_TEST_TOPIC",
29+
"operation": "consume",
30+
"request": {
31+
"consumerLocalConfigs": {
32+
"recordType": "JSON",
33+
"commitSync": true,
34+
"showRecordsConsumed": true,
35+
"maxNoOfRetryPollsOrTimeouts": 3
36+
}
37+
},
38+
"verify": {
39+
"records": [
40+
{
41+
"key": {
42+
"keyId": "${$.do_produce.request.records[0].keyId}"
43+
},
44+
"value": {
45+
"appName": "App X"
46+
}
47+
}
48+
]
49+
}
50+
}
51+
]
52+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"scenarioName": "DEMO SCENARIO - Key as String JSON(Supports, but very rare use case)",
3+
"steps": [
4+
{
5+
"name": "do_produce",
6+
"url": "kafka-topic:THE_TEST_TOPIC",
7+
"operation": "produce",
8+
"request": {
9+
"recordType": "JSON",
10+
"records": [
11+
{
12+
"key": "{\"keyId\": 789,\"keyRef\": \"EMP-123\"}",
13+
"value": {
14+
"appName": "App X"
15+
}
16+
}
17+
]
18+
},
19+
"verify": {
20+
"status": "Ok"
21+
}
22+
},
23+
{
24+
"name": "do_consume",
25+
"url": "kafka-topic:THE_TEST_TOPIC",
26+
"operation": "consume",
27+
"request": {
28+
"consumerLocalConfigs": {
29+
"recordType": "JSON",
30+
"commitSync": true,
31+
"showRecordsConsumed": true,
32+
"maxNoOfRetryPollsOrTimeouts": 3
33+
}
34+
},
35+
"verify": {
36+
"records": [
37+
{
38+
"key" : {
39+
"keyId" : 789, // For dynamic assertions(to avoid hardcoded values, define a variable and reuse it. Search variables in Documentation)
40+
"keyRef" : "EMP-123"
41+
},
42+
"value": {
43+
"appName": "App X"
44+
}
45+
}
46+
]
47+
}
48+
}
49+
]
50+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"scenarioName": "DEMO SCENARIO - Special char in the Key(supported, but very rare use case)",
3+
"steps": [
4+
{
5+
"name": "do_produce",
6+
"url": "kafka-topic:THE_TEST_TOPIC",
7+
"operation": "produce",
8+
"request": {
9+
"recordType": "JSON",
10+
"records": [
11+
{
12+
"key": "Hello's Key\"s",
13+
"value": {
14+
"appName": "App X"
15+
}
16+
}
17+
]
18+
},
19+
"verify": {
20+
"status": "Ok"
21+
}
22+
},
23+
{
24+
"name": "do_consume",
25+
"url": "kafka-topic:THE_TEST_TOPIC",
26+
"operation": "consume",
27+
"request": {
28+
"consumerLocalConfigs": {
29+
"recordType": "JSON",
30+
"commitSync": true,
31+
"showRecordsConsumed": true,
32+
"maxNoOfRetryPollsOrTimeouts": 3
33+
}
34+
},
35+
"verify": {
36+
"records": [
37+
{
38+
// "key": "${$.do_produce.request.records[0].key}", // JSONPath for assertion Doesn't work. But consume works.
39+
"key": "Hello's Key\"s", // For dynamic assertions(to avoid hardcoded value, define a variable and reuse it. Search variables in Documentation for examples)
40+
"value": {
41+
"appName": "App X"
42+
}
43+
}
44+
]
45+
}
46+
}
47+
]
48+
}

0 commit comments

Comments
 (0)