Skip to content

Commit d2a4bf0

Browse files
authored
[System Tests] Python Kafka Test Client (kroxylicious#2670)
* added suport for python kafka client Signed-off-by: Francisco Vila <[email protected]> * delete some testing with message keys Signed-off-by: Francisco Vila <[email protected]> * fix comment Signed-off-by: Francisco Vila <[email protected]> * adapt python consumer record to the proper json structure Signed-off-by: Francisco Vila <[email protected]> * added renovate for new python test client Signed-off-by: Francisco Vila <[email protected]> * add one more test and update readme file Signed-off-by: Francisco Vila <[email protected]> * update method doc Signed-off-by: Francisco Vila <[email protected]> * fix class doc Signed-off-by: Francisco Vila <[email protected]> * rename value by payload for ConsumerRecord Signed-off-by: Francisco Vila <[email protected]> * replace contains marker by starts with Signed-off-by: Francisco Vila <[email protected]> * rename python by python_test_client Signed-off-by: Francisco Vila <[email protected]> * refactor extractRecordLines Signed-off-by: Francisco Vila <[email protected]> * replace log by pod stdout Signed-off-by: Francisco Vila <[email protected]> * make consumerRecord abstract Signed-off-by: Francisco Vila <[email protected]> * create local constants Signed-off-by: Francisco Vila <[email protected]> * rename python test client method Signed-off-by: Francisco Vila <[email protected]> * rename everything to python test client Signed-off-by: Francisco Vila <[email protected]> * remove sonar issue Signed-off-by: Francisco Vila <[email protected]> --------- Signed-off-by: Francisco Vila <[email protected]>
1 parent 8cb7cfb commit d2a4bf0

File tree

15 files changed

+313
-47
lines changed

15 files changed

+313
-47
lines changed

.github/renovate.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@
9191
"fileMatch": ["kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/Constants.java"],
9292
"matchStrings": [
9393
"(?<depName>quay.io/kroxylicious/kcat):(?<currentValue>\\d+\\.\\d+\\.\\d+)",
94-
"(?<depName>quay.io/kroxylicious/kaf):(?<currentValue>\\d+\\.\\d+\\.\\d+)"
94+
"(?<depName>quay.io/kroxylicious/kaf):(?<currentValue>\\d+\\.\\d+\\.\\d+)",
95+
"(?<depName>quay.io/kroxylicious/python-kafka-test-client):(?<currentValue>\\d+\\.\\d+\\.\\d+-\\d+\\.\\d+\\.\\d+)"
9596
],
9697
"datasourceTemplate": "docker"
9798
},

DEV_GUIDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ has been applied ineffectively.
405405
* `CONTAINER_CONFIG_PATH`: directory where `config.json` file is located. This file contains the pull secrets to be used by
406406
the container engine. Default value: `$HOME/.docker/config.json`
407407
* `SKIP_STRIMZI_INSTALL`: skip strimzi installation. Default value: `false`
408-
* `KAFKA_CLIENT`: client used to produce/consume messages. Default value: `strimzi_test_client`. Currently supported values: `strimzi_test_client`, `kaf`, `kcat`
408+
* `KAFKA_CLIENT`: client used to produce/consume messages. Default value: `strimzi_test_client`. Currently supported values: `strimzi_test_client`, `kaf`, `kcat`, `python_test_client`
409409
* `TEST_CLIENTS_IMAGE`: strimzi test client image to be used when running the tests. It is useful when running regression tests. Default value: `quay.io/strimzi-test-clients/test-clients:latest-kafka-${kafka.version}`
410410
* `AWS_USE_CLOUD`: set to `true` in case AWS Cloud is used for Record Encryption System Tests. LocalStack will be used by default. Default value: `false`
411411
* `AWS_REGION`: region of the AWS Cloud account to be used for KMS management. Default value: `us-east-2`

kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ private Constants() {
116116
* Test clients image url
117117
*/
118118
public static final String KCAT_CLIENT_IMAGE = "quay.io/kroxylicious/kcat:1.7.1";
119+
public static final String PYTHON_CLIENT_IMAGE = "quay.io/kroxylicious/python-kafka-test-client:0.1.1-2.11.1";
119120
public static final String KAF_CLIENT_IMAGE = "quay.io/kroxylicious/kaf:v0.2.13";
120121

121122
/**

kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/KafkaClients.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public static KafkaClient getKafkaClient() {
2626
return switch (Enum.valueOf(KafkaClientType.class, Environment.KAFKA_CLIENT.toUpperCase())) {
2727
case KAF -> kaf();
2828
case KCAT -> kcat();
29+
case PYTHON_TEST_CLIENT -> pythonTestClient();
2930
default -> strimziTestClient();
3031
};
3132
}
@@ -39,6 +40,15 @@ public static KafClient kaf() {
3940
return new KafClient();
4041
}
4142

43+
/**
44+
* Python test client python client.
45+
*
46+
* @return the python client
47+
*/
48+
public static PythonTestClient pythonTestClient() {
49+
return new PythonTestClient();
50+
}
51+
4252
/**
4353
* Strimzi test client.
4454
*
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.systemtests.clients;
8+
9+
import java.time.Duration;
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.Objects;
13+
import java.util.Optional;
14+
import java.util.function.Predicate;
15+
import java.util.stream.Stream;
16+
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
import com.fasterxml.jackson.core.type.TypeReference;
21+
22+
import io.fabric8.kubernetes.api.model.batch.v1.Job;
23+
import io.skodjob.testframe.utils.KubeUtils;
24+
25+
import io.kroxylicious.systemtests.Constants;
26+
import io.kroxylicious.systemtests.clients.records.ConsumerRecord;
27+
import io.kroxylicious.systemtests.clients.records.PythonTestClientConsumerRecord;
28+
import io.kroxylicious.systemtests.enums.KafkaClientType;
29+
import io.kroxylicious.systemtests.templates.testclients.TestClientsJobTemplates;
30+
import io.kroxylicious.systemtests.utils.DeploymentUtils;
31+
import io.kroxylicious.systemtests.utils.KafkaUtils;
32+
import io.kroxylicious.systemtests.utils.TestUtils;
33+
34+
import edu.umd.cs.findbugs.annotations.Nullable;
35+
36+
import static io.kroxylicious.systemtests.k8s.KubeClusterResource.cmdKubeClient;
37+
import static io.kroxylicious.systemtests.k8s.KubeClusterResource.kubeClient;
38+
39+
/**
40+
* The type Python kafka test client (librdkafka client based CLI).
41+
*/
42+
public class PythonTestClient implements KafkaClient {
43+
private static final String RECEIVED_MESSAGE_MARKER = "Received:";
44+
private static final String PYTHON_COMMAND = "python3";
45+
private static final String BASE_PATH = "/usr/src";
46+
private static final String CONFLUENT_PYTHON_PATH = BASE_PATH + "/confluent-kafka-python";
47+
private static final String PRODUCER_PATH = CONFLUENT_PYTHON_PATH + "/Producer.py";
48+
private static final String CONSUMER_PATH = CONFLUENT_PYTHON_PATH + "/Consumer.py";
49+
private static final Logger LOGGER = LoggerFactory.getLogger(PythonTestClient.class);
50+
private static final TypeReference<PythonTestClientConsumerRecord> VALUE_TYPE_REF = new TypeReference<>() {
51+
};
52+
private String deployNamespace;
53+
54+
@Override
55+
public KafkaClient inNamespace(String namespace) {
56+
this.deployNamespace = namespace;
57+
return this;
58+
}
59+
60+
/**
61+
* Instantiates a new python test client.
62+
*/
63+
public PythonTestClient() {
64+
this.deployNamespace = kubeClient().getNamespace();
65+
}
66+
67+
@Override
68+
public void produceMessages(String topicName, String bootstrap, String message, @Nullable String messageKey, int numOfMessages) {
69+
final Optional<String> recordKey = Optional.ofNullable(messageKey);
70+
71+
StringBuilder msg = new StringBuilder();
72+
for (int i = 0; i < numOfMessages; i++) {
73+
msg.append(message)
74+
.append(" - ")
75+
.append(i)
76+
.append("\n");
77+
}
78+
79+
LOGGER.atInfo().setMessage("Producing messages in '{}' topic using python").addArgument(topicName).log();
80+
String name = Constants.KAFKA_PRODUCER_CLIENT_LABEL + "-python-" + TestUtils.getRandomPodNameSuffix();
81+
String jsonOverrides = KubeUtils.isOcp() ? TestUtils.getJsonFileContent("nonJVMClient_openshift.json").replace("%NAME%", name) : "";
82+
83+
List<String> executableCommand = new ArrayList<>(List.of(cmdKubeClient(deployNamespace).toString(), "run", "-i",
84+
"-n", deployNamespace, name,
85+
"--image=" + Constants.PYTHON_CLIENT_IMAGE,
86+
"--override-type=strategic",
87+
"--overrides=" + jsonOverrides,
88+
"--", PYTHON_COMMAND, PRODUCER_PATH, "-b", bootstrap, "-t", topicName));
89+
recordKey.ifPresent(key -> {
90+
executableCommand.add("-k");
91+
executableCommand.add(key);
92+
});
93+
94+
KafkaUtils.produceMessagesWithCmd(deployNamespace, executableCommand, String.valueOf(msg), name, KafkaClientType.PYTHON_TEST_CLIENT.name().toLowerCase());
95+
}
96+
97+
@Override
98+
public List<ConsumerRecord> consumeMessages(String topicName, String bootstrap, int numOfMessages, Duration timeout) {
99+
LOGGER.atInfo().log("Consuming messages using python");
100+
String name = Constants.KAFKA_CONSUMER_CLIENT_LABEL + "-python-" + TestUtils.getRandomPodNameSuffix();
101+
// Running consumer with parameters to get the latest N number of messages received to avoid consuming twice the same messages
102+
List<String> args = List.of(PYTHON_COMMAND, CONSUMER_PATH, "-n", String.valueOf(numOfMessages), "-b", bootstrap, "-t", topicName);
103+
Job pythonClientJob = TestClientsJobTemplates.defaultPythonJob(name, args).build();
104+
String podName = KafkaUtils.createJob(deployNamespace, name, pythonClientJob);
105+
String log = waitForConsumer(deployNamespace, podName, timeout);
106+
KafkaUtils.deleteJob(pythonClientJob);
107+
LOGGER.atInfo().setMessage("Pod STD_OUT: {}").addArgument(log).log();
108+
Stream<String> logRecords = extractRecordLinesFromLog(log);
109+
return getConsumerRecords(logRecords);
110+
}
111+
112+
private String waitForConsumer(String namespace, String podName, Duration timeout) {
113+
DeploymentUtils.waitForPodRunSucceeded(namespace, podName, timeout);
114+
return kubeClient().logsInSpecificNamespace(namespace, podName);
115+
}
116+
117+
private Stream<String> extractRecordLinesFromLog(String log) {
118+
return Stream.of(log.split("\n"))
119+
.filter(l -> l.startsWith(RECEIVED_MESSAGE_MARKER))
120+
.map(line -> line.substring(RECEIVED_MESSAGE_MARKER.length()));
121+
}
122+
123+
private List<ConsumerRecord> getConsumerRecords(Stream<String> logRecords) {
124+
return logRecords.filter(Predicate.not(String::isBlank))
125+
.map(x -> ConsumerRecord.parseFromJsonString(VALUE_TYPE_REF, x))
126+
.filter(Objects::nonNull)
127+
.map(ConsumerRecord.class::cast)
128+
.toList();
129+
}
130+
}

kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/records/ConsumerRecord.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,25 @@
1717
import com.fasterxml.jackson.core.type.TypeReference;
1818
import com.fasterxml.jackson.databind.ObjectMapper;
1919

20-
public class ConsumerRecord {
20+
public abstract class ConsumerRecord {
2121
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerRecord.class);
2222
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
2323

2424
protected String topic;
2525
protected String key;
26-
protected String value;
26+
protected String payload;
2727
protected int partition;
2828
protected long offset;
2929
protected Map<String, String> recordHeaders;
3030

31+
protected ConsumerRecord(String topic, String key, String payload, int partition, long offset) {
32+
this.topic = topic;
33+
this.key = key;
34+
this.payload = payload;
35+
this.partition = partition;
36+
this.offset = offset;
37+
}
38+
3139
public String getTopic() {
3240
return topic;
3341
}
@@ -36,16 +44,8 @@ public String getKey() {
3644
return key;
3745
}
3846

39-
public String getValue() {
40-
return value;
41-
}
42-
43-
public int getPartition() {
44-
return partition;
45-
}
46-
47-
public long getOffset() {
48-
return offset;
47+
public String getPayload() {
48+
return payload;
4949
}
5050

5151
public Map<String, String> getRecordHeaders() {
@@ -63,15 +63,15 @@ public boolean equals(Object o) {
6363
ConsumerRecord that = (ConsumerRecord) o;
6464
return Objects.equals(topic, that.topic) &&
6565
Objects.equals(key, that.key) &&
66-
Objects.equals(value, that.value) &&
66+
Objects.equals(payload, that.payload) &&
6767
partition == that.partition &&
6868
offset == that.offset &&
6969
Objects.deepEquals(recordHeaders, that.recordHeaders);
7070
}
7171

7272
@Override
7373
public int hashCode() {
74-
return Objects.hash(topic, key, value, partition, offset, recordHeaders);
74+
return Objects.hash(topic, key, payload, partition, offset, recordHeaders);
7575
}
7676

7777
/**
@@ -96,7 +96,7 @@ public static <T> T parseFromJsonString(TypeReference<T> valueTypeRef, String re
9696
public String toString() {
9797
return "ConsumerRecord(topic: " + this.topic +
9898
", key: " + this.key +
99-
", value: " + this.value +
99+
", payload: " + this.payload +
100100
", partition: " + this.partition +
101101
", offset: " + this.offset +
102102
", headers: " + this.recordHeaders +

kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/records/KafConsumerRecord.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,11 @@ public KafConsumerRecord(@JsonProperty("headers") List<Map<String, String>> head
3636
@JsonProperty("payload") String payload,
3737
@JsonProperty("partition") int partition,
3838
@JsonProperty("offset") long offset) {
39+
super(null, key, payload, partition, offset);
3940
this.recordHeaders = new HashMap<>();
4041
if (headers != null) {
4142
headers.forEach(h -> recordHeaders.put(new String(Base64.getDecoder().decode(h.get("Key"))), new String(Base64.getDecoder().decode(h.get("Value")))));
4243
}
43-
this.key = key;
44-
this.value = payload;
45-
this.partition = partition;
46-
this.offset = offset;
4744
}
4845

4946
/**

kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/records/KcatConsumerRecord.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public KcatConsumerRecord(@JsonProperty("headers") List<String> headers,
3737
@JsonProperty("payload") String payload,
3838
@JsonProperty("partition") int partition,
3939
@JsonProperty("offset") long offset) {
40+
super(topic, key, payload, partition, offset);
4041
this.recordHeaders = new HashMap<>();
4142
if (headers != null) {
4243
int headersSize = headers.size();
@@ -47,10 +48,5 @@ public KcatConsumerRecord(@JsonProperty("headers") List<String> headers,
4748
recordHeaders.put(headers.get(i), Optional.ofNullable(headers.get(i + 1)).orElse(""));
4849
}
4950
}
50-
this.topic = topic;
51-
this.key = key;
52-
this.value = payload;
53-
this.partition = partition;
54-
this.offset = offset;
5551
}
5652
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.systemtests.clients.records;
8+
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
15+
import com.fasterxml.jackson.annotation.JsonProperty;
16+
17+
/**
18+
* The type Python test client consumer record.
19+
*/
20+
@JsonIgnoreProperties(ignoreUnknown = true)
21+
public class PythonTestClientConsumerRecord extends ConsumerRecord {
22+
23+
/**
24+
* Instantiates a new python test client consumer record.
25+
*
26+
* @param headers the headers
27+
* @param topic the topic
28+
* @param key the key
29+
* @param payload the payload
30+
* @param partition the partition
31+
* @param offset the offset
32+
*/
33+
@JsonCreator
34+
public PythonTestClientConsumerRecord(@JsonProperty("headers") List<Map<String, String>> headers,
35+
@JsonProperty("topic") String topic,
36+
@JsonProperty("key") String key,
37+
@JsonProperty("value") String payload,
38+
@JsonProperty("partition") int partition,
39+
@JsonProperty("offset") long offset) {
40+
super(topic, key, payload, partition, offset);
41+
this.recordHeaders = new HashMap<>();
42+
if (headers != null) {
43+
headers.forEach(h -> recordHeaders.put(h.get("Key"), h.get("Value")));
44+
}
45+
}
46+
}

kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/records/StrimziTestClientConsumerRecord.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,10 @@ public StrimziTestClientConsumerRecord(@JsonProperty("headers") List<Entry<Strin
3737
@JsonProperty("payload") String payload,
3838
@JsonProperty("partition") int partition,
3939
@JsonProperty("offset") long offset) {
40+
super(topic, key, payload, partition, offset);
4041
this.recordHeaders = new HashMap<>();
4142
if (headers != null) {
4243
headers.forEach(h -> recordHeaders.put(h.getKey(), h.getValue()));
4344
}
44-
this.topic = topic;
45-
this.key = key;
46-
this.value = payload;
47-
this.partition = partition;
48-
this.offset = offset;
4945
}
5046
}

0 commit comments

Comments
 (0)