Skip to content

Commit 7088663

Browse files
committed
docs: kafka demo
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
1 parent 12a2d28 commit 7088663

File tree

5 files changed

+283
-0
lines changed

5 files changed

+283
-0
lines changed

examples/kafka/README.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Kafka Connector Examples
2+
3+
## Prerequisites
4+
5+
It is assumed that you have Axoflow deployed locally.
6+
7+
## Quick Start
8+
9+
### 1. Deploy Kafka to Kubernetes
10+
11+
```bash
12+
./deploy-kafka.sh
13+
```
14+
15+
This script:
16+
17+
- Creates a `kafka` namespace
18+
- Deploys Apache Kafka 4.1.1 in KRaft mode
19+
- Creates a `kafka-client` pod for testing
20+
- Creates the `otlp_logs` topic with 3 partitions
21+
22+
### 2. Test the Connector
23+
24+
#### Option A: Run in Kubernetes (Recommended)
25+
26+
```bash
27+
# Build the connector image
28+
make docker-build
29+
30+
# Deploy via Helm
31+
./test-helm.sh
32+
```
33+
34+
This deploys the connector as a StatefulSet in the `cloudconnectors` namespace.
35+
36+
#### Option B: Run Locally with Docker
37+
38+
**Requirements:**
39+
40+
- Port-forward Kafka: `kubectl port-forward -n kafka pods/kafka-0 9092:9092`
41+
- Port-forward AxoRouter: `kubectl port-forward -n axoflow-local pods/axorouter-0 4317:4317`
42+
43+
```bash
44+
# Build the connector image
45+
make docker-build
46+
47+
# Run locally
48+
./test-plaintext.sh
49+
```
50+
51+
**Note:** On macOS, the script uses `host.docker.internal` to access port-forwarded services. It also maps Kafka's cluster DNS names using `--add-host` to work around Docker networking limitations.
52+
53+
### 3. Produce Test Logs
54+
55+
Send test OTLP log messages to Kafka:
56+
57+
```bash
58+
./produce-test-logs.sh
59+
```
60+
61+
This sends a sample OTLP JSON log message to the `otlp_logs` topic. The connector will consume it and forward it to AxoRouter.
62+
63+
## Clean Up
64+
65+
```bash
66+
# Remove Helm deployment
67+
helm uninstall cloudconnectors -n cloudconnectors
68+
69+
# Remove Kafka
70+
kubectl delete namespace kafka
71+
72+
# Remove local storage
73+
rm -rf /tmp/kafka-connector-storage
74+
```

examples/kafka/deploy-kafka.sh

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
#!/bin/bash
# Deploy a single-node Apache Kafka (KRaft mode) to the `kafka` namespace,
# plus a `kafka-client` pod for manual testing, and create the `otlp_logs`
# topic with 3 partitions.
#
# Requirements: kubectl configured against the target cluster.
set -euo pipefail

# Idempotent namespace creation: `create --dry-run | apply` succeeds even if
# the namespace already exists.
kubectl create namespace kafka --dry-run=client -o yaml | kubectl apply -f -

kubectl apply -f - <<EOF
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: plaintext
  # Headless service: gives the pod a stable DNS name
  # (kafka-0.kafka.kafka.svc.cluster.local) used by the controller quorum.
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: docker.io/apache/kafka:4.1.1
        ports:
        - containerPort: 9092
          name: plaintext
        - containerPort: 9093
          name: controller
        env:
        # Single-node KRaft cluster: this node acts as both broker and
        # controller, so all replication factors are 1.
        - name: KAFKA_NODE_ID
          value: "1"
        - name: KAFKA_PROCESS_ROLES
          value: "broker,controller"
        - name: KAFKA_LISTENERS
          value: "PLAINTEXT://:9092,CONTROLLER://:9093"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://kafka.kafka.svc.cluster.local:9092"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
        - name: KAFKA_CONTROLLER_LISTENER_NAMES
          value: "CONTROLLER"
        - name: KAFKA_CONTROLLER_QUORUM_VOTERS
          value: "1@kafka-0.kafka.kafka.svc.cluster.local:9093"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "PLAINTEXT"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
          value: "1"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
        # Fixed cluster id so restarts reuse the same metadata volume.
        - name: CLUSTER_ID
          value: "5L6g3nShT-eMCtK--X86sw"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
EOF
kubectl wait --for=condition=ready pod -l app=kafka -n kafka --timeout=300s

# Helper pod that carries the Kafka CLI tools (kafka-topics.sh,
# kafka-console-producer.sh, ...) for interacting with the broker.
kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: kafka
spec:
  containers:
  - name: kafka-client
    image: docker.io/apache/kafka:4.1.1
    command: ["sleep", "infinity"]
EOF
kubectl wait --for=condition=ready pod/kafka-client -n kafka --timeout=60s

# Create the demo topic; --if-not-exists keeps the script idempotent.
kubectl exec -n kafka kafka-client -- /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic otlp_logs \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --if-not-exists

# `echo -e` replaced: the message has no escape sequences and -e is
# non-portable.
echo "Listing topics..."
kubectl exec -n kafka kafka-client -- /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/bin/bash
# Produce sample OTLP JSON log records to a Kafka topic through the
# kafka-client pod deployed by deploy-kafka.sh.
#
# Configuration (environment variables):
#   KAFKA_LOGS_TOPIC        topic to produce to            (default: otlp_logs)
#   KAFKA_BOOTSTRAP_SERVER  broker bootstrap address       (default: in-cluster DNS)
#   LOG_INTERVAL            seconds between messages       (default: 5)
#   LOG_COUNT               messages to send, 0 = forever  (default: 0)
set -euo pipefail

TOPIC="${KAFKA_LOGS_TOPIC:-otlp_logs}"
BOOTSTRAP_SERVER="${KAFKA_BOOTSTRAP_SERVER:-kafka.kafka.svc.cluster.local:9092}"
INTERVAL="${LOG_INTERVAL:-5}"
COUNT="${LOG_COUNT:-0}"

if [ "$COUNT" -eq 0 ]; then
  echo "Count: infinite"
else
  echo "Count: ${COUNT} messages"
fi
echo ""

counter=0
while true; do
  counter=$((counter + 1))
  # OTLP timeUnixNano wants nanoseconds; pad epoch seconds with 9 zeros.
  TIMESTAMP=$(date +%s)000000000

  echo "[$(date '+%Y-%m-%d %H:%M:%S')] Sending message #${counter}..."

  # Test the producer directly in the `if` condition: under `set -e` a bare
  # `cmd; if [ $? -eq 0 ]` would abort the script on failure before the check
  # ever ran, making the failure branch unreachable. The heredoc feeds the
  # record straight to the producer's stdin (no `cat |` needed).
  if kubectl exec -i -n kafka kafka-client -- \
    /opt/kafka/bin/kafka-console-producer.sh \
    --topic "${TOPIC}" --bootstrap-server "${BOOTSTRAP_SERVER}" <<EOF
{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"test-service"}},{"key":"service.version","value":{"stringValue":"1.0.0"}},{"key":"message.number","value":{"intValue":${counter}}}]},"scopeLogs":[{"scope":{"name":"test-scope","version":"1.0"},"logRecords":[{"timeUnixNano":"${TIMESTAMP}","severityNumber":9,"severityText":"INFO","body":{"stringValue":"Test log message #${counter} from Kafka at $(date '+%Y-%m-%d %H:%M:%S')"},"attributes":[{"key":"log.source","value":{"stringValue":"kafka-test"}},{"key":"environment","value":{"stringValue":"development"}},{"key":"iteration","value":{"intValue":${counter}}}]}]}]}]}
EOF
  then
    echo "✓ Message #${counter} sent successfully"
  else
    echo "✗ Failed to send message #${counter}"
  fi

  if [ "$COUNT" -ne 0 ] && [ "$counter" -ge "$COUNT" ]; then
    echo ""
    echo "Completed sending ${COUNT} messages"
    break
  fi

  sleep "${INTERVAL}"
done

echo "Done!"

examples/kafka/test-helm.sh

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/bin/bash
# Deploy the Kafka connector via Helm into the `cloudconnectors` namespace,
# wiring broker/topic settings through a Kubernetes secret.
#
# Configuration (environment variables):
#   KAFKA_BROKERS     Kafka bootstrap address (default: in-cluster DNS)
#   KAFKA_LOGS_TOPIC  topic to consume        (default: otlp_logs)
set -euo pipefail

KAFKA_BROKERS="${KAFKA_BROKERS:-kafka.kafka.svc.cluster.local:9092}"
KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC:-otlp_logs}"

# Short device id: first group of a random UUID. Fall back through the
# sources available on macOS (uuidgen), Linux (/proc), then python3.
UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)
export AXOCLOUDCONNECTOR_DEVICE_ID

# Ensure the target namespace exists before creating the secret in it —
# on a fresh cluster nothing else creates `cloudconnectors`, and
# `kubectl create secret -n <missing-ns>` would fail. Idempotent apply,
# matching deploy-kafka.sh.
kubectl create namespace cloudconnectors --dry-run=client -o yaml | kubectl apply -f -

kubectl create secret generic kafka-credentials \
  --from-literal=brokers="${KAFKA_BROKERS}" \
  --from-literal=logs-topic="${KAFKA_LOGS_TOPIC}" \
  --namespace cloudconnectors \
  --dry-run=client -o yaml | kubectl apply -f -

helm upgrade --install --wait --namespace cloudconnectors cloudconnectors ./charts/cloudconnectors \
  --set image.repository="axocloudconnectors" \
  --set image.tag="dev" \
  --set 'env[0].name=AXOROUTER_ENDPOINT' \
  --set 'env[0].value=axorouter.axoflow-local.svc.cluster.local:4317' \
  --set 'env[1].name=AXOCLOUDCONNECTOR_DEVICE_ID' \
  --set "env[1].value=${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  --set 'env[2].name=KAFKA_BROKERS' \
  --set 'env[2].valueFrom.secretKeyRef.name=kafka-credentials' \
  --set 'env[2].valueFrom.secretKeyRef.key=brokers' \
  --set 'env[3].name=KAFKA_LOGS_TOPIC' \
  --set 'env[3].valueFrom.secretKeyRef.name=kafka-credentials' \
  --set 'env[3].valueFrom.secretKeyRef.key=logs-topic' \
  --set 'env[4].name=AXOROUTER_TLS_INSECURE' \
  --set-string 'env[4].value=true' \
  --set 'env[5].name=KAFKA_TLS_INSECURE' \
  --set-string 'env[5].value=true'

examples/kafka/test-plaintext.sh

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/bin/bash
# Run the connector image locally in Docker against a port-forwarded Kafka
# and AxoRouter.
#
# Expected port-forwards (see README):
#   kubectl port-forward -n kafka pods/kafka-0 9092:9092
#   kubectl port-forward -n axoflow-local pods/axorouter-0 4317:4317
#
# Configuration (environment variables):
#   KAFKA_BROKERS       Kafka address    (default: host.docker.internal:9092)
#   KAFKA_LOGS_TOPIC    topic to consume (default: otlp_logs)
#   AXOROUTER_ENDPOINT  AxoRouter OTLP endpoint
#   STORAGE_DIRECTORY   local persistence dir (default: /tmp/kafka-connector-storage)
set -euo pipefail

KAFKA_BROKERS="${KAFKA_BROKERS:-host.docker.internal:9092}"
KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC:-otlp_logs}"
AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT:-http://host.docker.internal:4317}"
STORAGE_DIRECTORY="${STORAGE_DIRECTORY:-/tmp/kafka-connector-storage}"

# Short device id: first group of a random UUID. Fall back through the
# sources available on macOS (uuidgen), Linux (/proc), then python3.
UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)
export AXOCLOUDCONNECTOR_DEVICE_ID

mkdir -p "${STORAGE_DIRECTORY}"

# --add-host maps Kafka's advertised in-cluster DNS names to the Docker host
# so the client can follow broker metadata through the port-forward.
docker run --rm \
  --add-host=kafka.kafka.svc.cluster.local:host-gateway \
  --add-host=kafka-0.kafka.kafka.svc.cluster.local:host-gateway \
  -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \
  -e KAFKA_BROKERS="${KAFKA_BROKERS}" \
  -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \
  -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \
  -e AXOROUTER_TLS_INSECURE="true" \
  -e KAFKA_TLS_INSECURE="true" \
  -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \
  -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  axocloudconnectors:dev

0 commit comments

Comments
 (0)