
Commit 23a4fcd

Authored by Francisco Vila
Create python kafka client (#1)
* first commit Signed-off-by: Francisco Vila <[email protected]>
* added codeowners Signed-off-by: Francisco Vila <[email protected]>
* remove registry destination variable Signed-off-by: Francisco Vila <[email protected]>
* added release tag for python kafka clients Signed-off-by: Francisco Vila <[email protected]>
* update readme file Signed-off-by: Francisco Vila <[email protected]>
* added dependabot yaml config file Signed-off-by: Francisco Vila <[email protected]>
* add cron schedule Signed-off-by: Francisco Vila <[email protected]>
* remove cron schedule Signed-off-by: Francisco Vila <[email protected]>
* add cron schedule and checkout latest tag Signed-off-by: Francisco Vila <[email protected]>
* added license to all files Signed-off-by: Francisco Vila <[email protected]>
* remove default value for consumer group Signed-off-by: Francisco Vila <[email protected]>
* added headers support for producer and consumer Signed-off-by: Francisco Vila <[email protected]>
* added headers support for producer and consumer Signed-off-by: Francisco Vila <[email protected]>
* added default value for group: Failed to create consumer: group.id must be set Signed-off-by: Francisco Vila <[email protected]>
* added json parser to log the consumed record Signed-off-by: Francisco Vila <[email protected]>
* added SASL configuration options Signed-off-by: Francisco Vila <[email protected]>
* fix tls parameter Signed-off-by: Francisco Vila <[email protected]>
* replace --tls by security_protocol Signed-off-by: Francisco Vila <[email protected]>
* remove useless codewoners line and refactor extra_conf check Signed-off-by: Francisco Vila <[email protected]>
* fix extra_conf check Signed-off-by: Francisco Vila <[email protected]>
* refactor DockerFile Signed-off-by: Francisco Vila <[email protected]>
* remove echo in DockerFile Signed-off-by: Francisco Vila <[email protected]>
* update readme file Signed-off-by: Francisco Vila <[email protected]>
* fix readme file Signed-off-by: Francisco Vila <[email protected]>
* pass directly extra config to producer/consumer configurations Signed-off-by: Francisco Vila <[email protected]>
* remove print for debug Signed-off-by: Francisco Vila <[email protected]>
* default num of messages to avoid error Signed-off-by: Francisco Vila <[email protected]>

---------

Signed-off-by: Francisco Vila <[email protected]>
1 parent 48b9f0c commit 23a4fcd

File tree

8 files changed: +389 -0 lines changed

.github/CODEOWNERS

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
# We write CODEOWNERS files in terms of Github teams.
# When adding (or removing) code owners to the project, remember to update the `developers` within the pom.xml.

/.github/ @kroxylicious/admins
* @kroxylicious/developers

.github/dependabot.yaml

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      # Check for updates to GitHub Actions every week
      interval: "weekly"
    labels:
      - "dependencies"
      - "dev/github_actions"
.github/workflows/pythonkafkaclient.yaml

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pythonkafkaclient.yaml - builds and pushes a Python Kafka Client container image.
#
# Requires repository variables:
# - REGISTRY_SERVER - the server of the container registry service e.g. `quay.io` or `docker.io`
# - REGISTRY_USERNAME - your username on the service (or username of your robot account)
# - REGISTRY_TOKEN - the access token that corresponds to `REGISTRY_USERNAME`
#

name: Python Kafka Client Container
on:
  schedule:
    - cron: '0 0 * * 5'
  push:
    tags: [ '*.*.*' ]
  workflow_dispatch:
jobs:
  build:
    runs-on: ubuntu-latest
    if: ${{ vars.REGISTRY_SERVER != '' && vars.REGISTRY_USERNAME != '' }}
    steps:
      - name: 'Get latest release version of confluent kafka python package'
        id: confluentKafkaPython
        uses: pozetroninc/[email protected]
        with:
          repository: confluentinc/confluent-kafka-python
      - name: 'Get latest release version of this repository'
        id: pythonKafkaClientRelease
        uses: pozetroninc/[email protected]
        with:
          repository: kroxylicious/python-kafka-client
      - name: 'Determine Version'
        id: confluentKafkaPythonVersion
        run: |
          REPO_RELEASE=${{ steps.confluentKafkaPython.outputs.release }}
          RELEASE=$(echo ${REPO_RELEASE//v})
          echo "$RELEASE"
          echo "LIBRDKAFKA_VERSION=${RELEASE}" >> $GITHUB_ENV
      - name: 'Set release version from new tag'
        if: ${{ contains(github.ref, 'tag') }}
        run: |
          REPO_RELEASE=${{ github.ref_name }}
          echo "RELEASE_VERSION=${REPO_RELEASE}" >> $GITHUB_ENV
      - name: 'Set release version from latest tag'
        if: ${{ ! contains(github.ref, 'tag') }}
        run: |
          REPO_RELEASE=${{ steps.pythonKafkaClientRelease.outputs.release }}
          echo "RELEASE_VERSION=${REPO_RELEASE}" >> $GITHUB_ENV
      - name: 'Check out repository from latest tag'
        uses: actions/checkout@v5
        with:
          ref: ${{ env.RELEASE_VERSION }}
          fetch-depth: 0
      - name: 'Set up QEMU'
        uses: docker/setup-qemu-action@v3
      - name: Set up Docker Buildx
        id: buildx
        uses: docker/setup-buildx-action@v3
      - name: 'Login to container registry'
        uses: docker/login-action@v3
        with:
          registry: ${{ vars.REGISTRY_SERVER }}
          username: ${{ vars.REGISTRY_USERNAME }}
          password: ${{ secrets.REGISTRY_TOKEN }}
      - name: 'Build and push python kafka client container image'
        uses: docker/build-push-action@v6
        with:
          context: .
          build-args: LIBRDKAFKA_VERSION=${{env.LIBRDKAFKA_VERSION}}
          platforms: linux/amd64,linux/arm64,linux/s390x,linux/ppc64le
          push: true
          tags: ${{ vars.REGISTRY_DESTINATION }}:${{ env.RELEASE_VERSION }}-${{ env.LIBRDKAFKA_VERSION }}
          cache-from: type=gha
          cache-to: type=gha,mode=max,compression=zstd
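The workflow runs on a weekly cron, on a manual dispatch, or when a tag matching `*.*.*` is pushed. As a sketch of the tag-driven path (the version `0.1.0` is only an illustrative placeholder, not an actual release of this repository):

```bash
# Pushing a tag of the form X.Y.Z triggers the "Python Kafka Client Container" workflow.
# 0.1.0 is a placeholder; substitute the next release version for this repository.
git tag 0.1.0
git push origin 0.1.0
```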

.gitignore

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
### Example user template

# IntelliJ project files
.idea
*.iml
out
gen

Consumer.py

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
#
# Copyright Kroxylicious Authors.
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
#
from typing import AnyStr

from confluent_kafka import Consumer, KafkaException
import argparse
import sys
import logging
import json
import inspect

def get_value_from_type(obj):
    value_str = obj
    if isinstance(obj, bytes):
        value_str = obj.decode("utf-8")
    if isinstance(obj, list):
        value_str = [get_value_from_type(x) for x in obj]
    if isinstance(obj, tuple):
        value_str = { "Key" : str(get_value_from_type(obj[0])),
                      "Value": str(get_value_from_type(obj[1])) }

    return value_str

def props(obj):
    pr = {}
    for name in dir(obj):
        value = getattr(obj, name)
        if (not (name.startswith('__') or name.startswith("set_"))
                and not inspect.ismethod(value)):
            pr[name] = get_value_from_type(value())
    return pr

def print_record_json(msg):
    res = json.dumps(props(msg))
    print("Received: " + res)

def main(args):
    topic = args.topic
    records_expected = int(args.num_of_records)

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    consumer_conf = {'bootstrap.servers': args.bootstrap_servers, 'group.id': args.group, 'session.timeout.ms': 6000,
                     'auto.offset.reset': 'earliest', 'enable.auto.offset.store': False}

    vargs = vars(args)
    extra_configuration = [x[0].split('=') for x in vargs.get('extra_conf', [])]
    consumer_conf.update(dict(extra_configuration))

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log records
    c = Consumer(consumer_conf, logger=logger)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe([topic], on_assign=print_assignment)

    # Read records from Kafka, print to stdout
    try:
        records_received = 0
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                print_record_json(msg)
                # Store the offset associated with msg to a local cache.
                # Stored offsets are committed to Kafka by a background thread every 'auto.commit.interval.ms'.
                # Explicitly storing offsets after processing gives at-least once semantics.
                c.store_offsets(msg)
                records_received += 1
                if records_received == records_expected:
                    c.close()
                    sys.exit(0)

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Consumer")
    parser.add_argument('-b', dest="bootstrap_servers", required=True,
                        help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('-n', dest="num_of_records", default=0,
                        help="Number of records expected")
    parser.add_argument('-t', dest="topic", required=True,
                        help="Topic name")
    parser.add_argument('-g', dest="group", default="test_group",
                        help="Consumer group")
    parser.add_argument('-X', nargs=1, dest='extra_conf', action='append', help='Configuration property', default=[])

    main(parser.parse_args())
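Each `-X name=value` argument is split on `=` and merged straight into the librdkafka consumer configuration, which is how the SASL options listed in the README reach the client. A hypothetical local invocation (the bootstrap address, topic, group and credentials below are placeholders):

```bash
# Hypothetical run; broker address, topic and credentials are placeholders.
python3 Consumer.py -b my-cluster-kafka-bootstrap:9092 -t my-topic -n 10 -g test_group \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanism=SCRAM-SHA-512 \
  -X sasl.username=alice \
  -X sasl.password=alice-secret
```

With the default `-n 0` the consumer polls indefinitely; with a positive count it closes and exits after receiving that many records.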

Dockerfile

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
#
# Copyright Kroxylicious Authors.
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
#

FROM alpine:edge

COPY . /usr/src/confluent-kafka-python

ARG LIBRDKAFKA_VERSION

ENV BUILD_DEPS="git make gcc g++ pkgconfig python3-dev"

ENV RUN_DEPS="bash librdkafka-dev>${LIBRDKAFKA_VERSION} libcurl cyrus-sasl-gssapiv2 ca-certificates libsasl heimdal-libs krb5 zstd-libs zstd-static python3 py3-pip"

RUN \
    apk update && \
    apk add --no-cache --virtual .dev_pkgs $BUILD_DEPS && \
    apk add --no-cache $RUN_DEPS && \
    python3 -m pip install -I confluent_kafka==${LIBRDKAFKA_VERSION} --break-system-packages && \
    apk del .dev_pkgs

RUN \
    python3 -c 'import confluent_kafka as cf ; print(cf.version(), "librdkafka", cf.libversion())'
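The image pins both the Alpine `librdkafka-dev` package and the `confluent_kafka` wheel to the same `LIBRDKAFKA_VERSION` build argument, which the workflow supplies from the latest confluent-kafka-python release. A minimal local build sketch (the version `2.11.1` and the `python-kafka-test-client:dev` tag are examples only):

```bash
# Illustrative local build; 2.11.1 stands in for whichever confluent-kafka-python release is current.
docker build --build-arg LIBRDKAFKA_VERSION=2.11.1 -t python-kafka-test-client:dev .

# Repeat the Dockerfile's final sanity check inside the built image.
docker run --rm python-kafka-test-client:dev \
  python3 -c 'import confluent_kafka as cf; print(cf.version(), "librdkafka", cf.libversion())'
```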

Producer.py

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
#
# Copyright Kroxylicious Authors.
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
#

from confluent_kafka import Producer
import argparse
import sys

def main(args):
    broker = args.bootstrap_servers
    topic = args.topic
    key = args.key
    headers = args.headers

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    producer_conf = {'bootstrap.servers': broker}

    vargs = vars(args)
    extra_configuration = [x[0].split('=') for x in vargs.get('extra_conf', [])]
    producer_conf.update(dict(extra_configuration))

    # Create Producer instance
    p = Producer(**producer_conf)

    # Optional per-record delivery callback (triggered by poll() or flush())
    # when a record has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Record failed delivery: %s\n' % err)
        else:
            print('Record {} successfully produced to {} [{}] at offset {}'.format(
                msg.key(), msg.topic(), msg.partition(), msg.offset()))

    # Read lines from stdin, produce each line to Kafka
    for line in sys.stdin:
        try:
            # Produce line (without newline)
            p.produce(topic, headers=headers, key=key, value=line.rstrip(), callback=delivery_callback)

        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d records awaiting delivery): try again\n' %
                             len(p))

        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        # will most likely not serve the delivery callback for the
        # last produce()d record.
        p.poll(0)

    # Wait until all records have been delivered
    sys.stdout.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Producer")
    parser.add_argument('-b', dest="bootstrap_servers", required=True,
                        help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('-t', dest="topic", required=True,
                        help="Topic name")
    parser.add_argument('-k', dest="key", default=None,
                        help="Key")
    parser.add_argument('-H', action='append', dest="headers", type=lambda a: tuple(map(str, a.split('='))),
                        default=[], help="Headers (header1=header value)")
    parser.add_argument('-X', nargs=1, dest='extra_conf', action='append', help='Configuration property', default=[])

    main(parser.parse_args())
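The producer reads one record per line from stdin, and each `-H key=value` flag is parsed into a record header tuple. A hypothetical run (the bootstrap address and topic are placeholders):

```bash
# Hypothetical run; broker address and topic are placeholders.
printf 'hello\nworld\n' | python3 Producer.py -b my-cluster-kafka-bootstrap:9092 -t my-topic \
  -k my-key -H 'header1=header1 value' -H 'header2=header2 value'
```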

README.md

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
# Python Kafka Test Client

The intent of this client is to help Kroxylicious system tests, so all configurations are made to fulfill our requirements.

## Usage

The essential requirement to run these clients is a Kubernetes cluster with an Apache Kafka cluster.

After successfully building images (which will cause the images to be pushed to the specified Docker repository) you are ready to deploy the producer and consumer containers along with Kafka.

You can deploy clients by using kubernetes `Jobs` with the example image.

Example command for deploying job:

### Producer

The different parameters that can be used are:
```
-b <bootstrap_servers>      list of bootstrap servers separated by comma
-t <topic_name>             topic name
-k <key>                    optional parameter to set the message key
-H <headerKey=headerValue>  optional parameter to set a header.
                            If more than one header is added, -H has to be included multiple times: "-H 'header1=header1 value' -H 'header2=header2 value'..."
-X <name=value>             Extra configuration (See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
                            If more than one extra config is added, -X has to be included multiple times
                            * sasl.mechanism=<value> --> SASL mechanism to use for authentication. Choices=['GSSAPI', 'PLAIN', 'SCRAM-SHA-512', 'SCRAM-SHA-256']
                            * security.protocol=<value> --> Security protocol to use. Choices=['PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL']
                            * sasl.username=<value> --> Username
                            ...
```

```bash
kubectl run -i -n <namespace> --image=quay.io/kroxylicious/python-kafka-test-client:0.1.0-2.11.1 -- python3 /usr/src/confluent-kafka-python/Producer.py -b <bootstrap-servers> -t <topic_name>
```

### Consumer

The different parameters that can be used are:
```
-b <bootstrap_servers>  list of bootstrap servers separated by comma
-t <topic_name>         topic name
-n <int>                number of messages expected to receive
-g <group_name>         optional parameter to set the group name
-X <name=value>         Extra configuration (See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
                        If more than one extra config is added, -X has to be included multiple times
                        * sasl.mechanism=<value> --> SASL mechanism to use for authentication. Choices=['GSSAPI', 'PLAIN', 'SCRAM-SHA-512', 'SCRAM-SHA-256']
                        * security.protocol=<value> --> Security protocol to use. Choices=['PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL']
                        * sasl.username=<value> --> Username
                        ...
```
```bash
kubectl run -i -n <namespace> --image=quay.io/kroxylicious/python-kafka-test-client:0.1.0-2.11.1 -- python3 /usr/src/confluent-kafka-python/Consumer.py -n <num_of_expected_messages> -b <bootstrap-servers> -t <topic_name>
```
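The README mentions deploying the clients as Kubernetes `Jobs`, while the commands above use `kubectl run`; a sketch of the Job-style alternative for the consumer (the job name `kafka-consumer` and the record count are placeholders, and the image tag mirrors the example above):

```bash
# Hypothetical Job-based deployment; name, namespace, bootstrap servers and topic are placeholders.
kubectl create job kafka-consumer -n <namespace> \
  --image=quay.io/kroxylicious/python-kafka-test-client:0.1.0-2.11.1 \
  -- python3 /usr/src/confluent-kafka-python/Consumer.py -n 10 -b <bootstrap-servers> -t <topic_name>
```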
