Commit eb98baa

Merge pull request #16 from axoflow/feat/kafka-connector

feat: kafka connector

2 parents: 10c2bfc + 4515ee0

File tree: 10 files changed, +532 −1 lines changed

Dockerfile

1 addition, 1 deletion

```diff
@@ -1,4 +1,4 @@
-FROM ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.120.0-axoflow.1 AS axo-otelcol
+FROM ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.129.0-axoflow.kafkareceiver AS axo-otelcol
 
 FROM alpine:3.21 AS base
```

README.md

27 additions, 0 deletions

```diff
@@ -8,6 +8,7 @@ You can find guides per connector:
 
 - [Azure connector](./connectors/azure/README.md#quickstart)
 - [AWS connector](./connectors/aws/README.md#quickstart)
+- [Kafka connector](./connectors/kafka/README.md#quickstart)
 
 ## Environment Variables
 
@@ -46,6 +47,32 @@ You can find guides per connector:
 | `AWS_ACCESS_KEY_ID` | No | - | AWS access key ID for direct authentication |
 | `AWS_SECRET_ACCESS_KEY` | No | - | AWS secret access key for direct authentication |
 
+### Kafka Provider
+
+| Variable | Required | Default | Description |
+|----------|----------|---------|-------------|
+| `KAFKA_BROKERS` | No | `localhost:9092` | Kafka broker addresses (e.g., `broker1:9092,broker2:9092`) |
+| `KAFKA_PROTOCOL_VERSION` | No | `2.1.0` | Kafka protocol version |
+| `KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY` | No | `false` | Resolve then reverse-lookup broker IPs during startup |
+| `KAFKA_LOGS_TOPIC` | No | `otlp_logs` | Topic for consuming logs |
+| `KAFKA_LOGS_ENCODING` | No | `otlp_json` | Encoding for logs (otlp_proto, otlp_json, raw, text, json, azure_resource_logs) |
+| `KAFKA_GROUP_ID` | No | `axocloudconnector` | Consumer group ID |
+| `KAFKA_CLIENT_ID` | No | `axocloudconnector` | Kafka client ID |
+| `KAFKA_INITIAL_OFFSET` | No | `latest` | Initial offset (`latest` or `earliest`) |
+| `KAFKA_GROUP_INSTANCE_ID` | No | - | Unique identifier for static consumer group membership |
+| `KAFKA_SESSION_TIMEOUT` | No | `10s` | Timeout for detecting client failures |
+| `KAFKA_HEARTBEAT_INTERVAL` | No | `3s` | Expected time between heartbeats |
+| `KAFKA_GROUP_REBALANCE_STRATEGY` | No | `range` | Partition assignment strategy (range, roundrobin, sticky) |
+| `KAFKA_MIN_FETCH_SIZE` | No | `1` | Minimum message bytes to fetch |
+| `KAFKA_DEFAULT_FETCH_SIZE` | No | `1048576` | Default message bytes to fetch (1 MB) |
+| `KAFKA_MAX_FETCH_SIZE` | No | `0` | Maximum message bytes to fetch (0 = unlimited) |
+| `KAFKA_MAX_FETCH_WAIT` | No | `250ms` | Maximum wait time for min_fetch_size bytes |
+| `KAFKA_TLS_INSECURE` | No | `false` | Use insecure connection |
+| `KAFKA_TLS_CA_FILE` | No | - | Path to CA certificate file |
+| `KAFKA_TLS_CERT_FILE` | No | - | Path to client certificate file |
+| `KAFKA_TLS_KEY_FILE` | No | - | Path to client key file |
+| `KAFKA_TLS_INSECURE_SKIP_VERIFY` | No | `false` | Skip TLS certificate verification |
+
 ## Usage
 
 ### Local Development with Docker
```
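In practice only `KAFKA_BROKERS` and `KAFKA_LOGS_TOPIC` usually need to be set explicitly; every other `KAFKA_*` variable falls back to the default listed in the table. The collector's `${env:VAR:-default}` substitution behaves like POSIX shell `${VAR:-default}` expansion, which makes the fallback easy to illustrate (the broker and topic values below are only examples):

```shell
#!/bin/sh
# Minimal Kafka provider configuration (illustrative values).
# Anything left unset falls back to the documented default,
# mirroring the collector's ${env:VAR:-default} substitution.
export KAFKA_BROKERS="broker1:9092,broker2:9092"
export KAFKA_LOGS_TOPIC="app_logs"
unset KAFKA_GROUP_ID KAFKA_INITIAL_OFFSET

echo "brokers=${KAFKA_BROKERS:-localhost:9092}"        # explicit value wins
echo "group_id=${KAFKA_GROUP_ID:-axocloudconnector}"   # unset -> default
echo "initial_offset=${KAFKA_INITIAL_OFFSET:-latest}"  # unset -> default
```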

connectors/kafka/README.md

107 additions, 0 deletions (new file)

````markdown
# Kafka Receiver Connector

This directory contains the Axoflow Kafka receiver connector, which collects logs from Kafka topics.

## Quickstart

Create a topic for your logs, e.g.:

```sh
kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: kafka
spec:
  containers:
    - name: kafka-client
      image: docker.io/apache/kafka:4.1.1
      command: ["sleep", "infinity"]
EOF
kubectl wait --for=condition=ready pod/kafka-client -n kafka --timeout=60s

kubectl exec -n kafka kafka-client -- /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic otlp_logs \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --if-not-exists
```

Make sure the required environment variables are set before running the connector.

### Using a plaintext connection

```bash
UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)

docker run \
  --rm \
  -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \
  -e KAFKA_BROKERS="${KAFKA_BROKERS}" \
  -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \
  -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \
  -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \
  -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  ghcr.io/axoflow/axocloudconnectors:latest
```

### Using TLS

```bash
UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)

docker run \
  --rm \
  -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \
  -v "${KAFKA_CERTS_PATH}:/certs:ro" \
  -e KAFKA_BROKERS="${KAFKA_BROKERS}" \
  -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \
  -e KAFKA_TLS_CA_FILE="/certs/ca.pem" \
  -e KAFKA_TLS_CERT_FILE="/certs/cert.pem" \
  -e KAFKA_TLS_KEY_FILE="/certs/key.pem" \
  -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \
  -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \
  -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  ghcr.io/axoflow/axocloudconnectors:latest
```

## Deploy with a Helm chart

1. Set the required environment variables.

### Example: deploy with Axorouter in the cluster

```bash
make minikube-cluster
make docker-build
make minikube-load-image

kubectl create namespace cloudconnectors
kubectl create secret generic kafka-credentials \
  --from-literal=brokers="<YOUR-KAFKA-BROKERS>" \
  --from-literal=logs-topic="<YOUR-KAFKA-LOGS-TOPIC>" \
  --namespace cloudconnectors \
  --dry-run=client -o yaml | kubectl apply -f -

UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)

helm upgrade --install --wait --namespace cloudconnectors cloudconnectors ./charts/cloudconnectors \
  --set image.repository="axocloudconnectors" \
  --set image.tag="dev" \
  --set 'env[0].name=AXOROUTER_ENDPOINT' \
  --set 'env[0].value=axorouter.axoflow-local.svc.cluster.local:4317' \
  --set 'env[1].name=AXOCLOUDCONNECTOR_DEVICE_ID' \
  --set "env[1].value=${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  --set 'env[2].name=KAFKA_BROKERS' \
  --set 'env[2].valueFrom.secretKeyRef.name=kafka-credentials' \
  --set 'env[2].valueFrom.secretKeyRef.key=brokers' \
  --set 'env[3].name=KAFKA_LOGS_TOPIC' \
  --set 'env[3].valueFrom.secretKeyRef.name=kafka-credentials' \
  --set 'env[3].valueFrom.secretKeyRef.key=logs-topic'
```
````
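The long `--set` list in the Helm example can equivalently live in a values file. A sketch of such a file (the name `values-kafka.yaml` is hypothetical, and this assumes the chart's `env` value is a plain Kubernetes env-var list, as the `--set env[N]...` paths imply):

```yaml
# values-kafka.yaml (hypothetical) - same settings as the --set flags above
image:
  repository: axocloudconnectors
  tag: dev
env:
  - name: AXOROUTER_ENDPOINT
    value: axorouter.axoflow-local.svc.cluster.local:4317
  - name: AXOCLOUDCONNECTOR_DEVICE_ID
    value: "<YOUR-DEVICE-ID>"
  - name: KAFKA_BROKERS
    valueFrom:
      secretKeyRef:
        name: kafka-credentials
        key: brokers
  - name: KAFKA_LOGS_TOPIC
    valueFrom:
      secretKeyRef:
        name: kafka-credentials
        key: logs-topic
```

Applied with `helm upgrade --install --wait --namespace cloudconnectors cloudconnectors ./charts/cloudconnectors -f values-kafka.yaml`.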

connectors/kafka/config.yaml

112 additions, 0 deletions (new file)

```yaml
exporters:
  otlp/axorouter:
    endpoint: ${env:AXOROUTER_ENDPOINT}
    retry_on_failure:
      enabled: true
      max_elapsed_time: 0
    sending_queue:
      enabled: true
      storage: file_storage
    tls:
      insecure: ${env:AXOROUTER_TLS_INSECURE:-false}
      ca_file: ${env:AXOROUTER_TLS_CA_FILE}
      ca_pem: ${env:AXOROUTER_TLS_CA_PEM}
      cert_file: ${env:AXOROUTER_TLS_CERT_FILE}
      cert_pem: ${env:AXOROUTER_TLS_CERT_PEM}
      key_file: ${env:AXOROUTER_TLS_KEY_FILE}
      key_pem: ${env:AXOROUTER_TLS_KEY_PEM}
      min_version: ${env:AXOROUTER_TLS_MIN_VERSION:-1.2}
      max_version: ${env:AXOROUTER_TLS_MAX_VERSION}
      include_system_ca_certs_pool: ${env:AXOROUTER_TLS_INCLUDE_SYSTEM_CA_CERTS_POOL:-false}
      insecure_skip_verify: ${env:AXOROUTER_TLS_INSECURE_SKIP_VERIFY:-false}

processors:
  resource/axoflow_device_id:
    attributes:
      - key: "com.axoflow.device_id"
        action: insert
        value: "${env:AXOCLOUDCONNECTOR_DEVICE_ID}"

  resourcedetection/system:
    detectors: ["system", "env"]
    system:
      hostname_sources: ["dns", "os", "cname", "lookup"]
      resource_attributes:
        host.name:
          enabled: true
        host.ip:
          enabled: true
        host.id:
          enabled: true

  resource/axoflow: # Provider specific!
    attributes:
      - key: "com.axoflow.product"
        action: insert
        value: "kafka"
      - key: "com.axoflow.vendor"
        action: insert
        value: "apache"

receivers: # Provider specific!
  kafka:
    brokers: ${env:KAFKA_BROKERS:-localhost:9092}
    protocol_version: ${env:KAFKA_PROTOCOL_VERSION:-2.1.0}
    resolve_canonical_bootstrap_servers_only: ${env:KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY:-false}
    logs:
      topic: ${env:KAFKA_LOGS_TOPIC:-otlp_logs}
      encoding: ${env:KAFKA_LOGS_ENCODING:-otlp_json}

    group_id: ${env:KAFKA_GROUP_ID:-axocloudconnector}
    client_id: ${env:KAFKA_CLIENT_ID:-axocloudconnector}
    initial_offset: ${env:KAFKA_INITIAL_OFFSET:-latest}
    group_instance_id: ${env:KAFKA_GROUP_INSTANCE_ID}

    session_timeout: ${env:KAFKA_SESSION_TIMEOUT:-10s}
    heartbeat_interval: ${env:KAFKA_HEARTBEAT_INTERVAL:-3s}
    group_rebalance_strategy: ${env:KAFKA_GROUP_REBALANCE_STRATEGY:-range}
    min_fetch_size: ${env:KAFKA_MIN_FETCH_SIZE:-1}
    default_fetch_size: ${env:KAFKA_DEFAULT_FETCH_SIZE:-1048576}
    max_fetch_size: ${env:KAFKA_MAX_FETCH_SIZE:-0}
    max_fetch_wait: ${env:KAFKA_MAX_FETCH_WAIT:-250ms}

    tls:
      insecure: ${env:KAFKA_TLS_INSECURE:-false}
      ca_file: ${env:KAFKA_TLS_CA_FILE}
      cert_file: ${env:KAFKA_TLS_CERT_FILE}
      key_file: ${env:KAFKA_TLS_KEY_FILE}
      insecure_skip_verify: ${env:KAFKA_TLS_INSECURE_SKIP_VERIFY:-false}

    autocommit:
      enable: false
    message_marking:
      after: true
      on_error: false
      on_permanent_error: true
    metadata:
      full: true
      retry:
        max: 3
        backoff: 250ms
    header_extraction:
      extract_headers: true
    error_backoff:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 0

extensions:
  health_check:
    endpoint: ${env:POD_IP}:13133
  file_storage:
    directory: ${env:STORAGE_DIRECTORY}
    create_directory: true

service:
  extensions: [health_check, file_storage]
  pipelines:
    logs:
      receivers: [kafka]
      processors: [resource/axoflow_device_id, resourcedetection/system, resource/axoflow]
      exporters: [otlp/axorouter]
```
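Every tunable in the config above is wired through the collector's `${env:VAR:-default}` substitution. A quick shell sketch for listing which environment variables such a config consumes (a short inline snippet stands in for the real `config.yaml` here):

```shell
#!/bin/sh
# List the env vars referenced by a collector config file.
# The heredoc below is an illustrative stand-in for config.yaml.
cat > /tmp/config-snippet.yaml <<'EOF'
brokers: ${env:KAFKA_BROKERS:-localhost:9092}
topic: ${env:KAFKA_LOGS_TOPIC:-otlp_logs}
ca_file: ${env:KAFKA_TLS_CA_FILE}
EOF

# Each match stops at the ':' before a default, so only the name survives.
grep -o 'env:[A-Z_][A-Z_]*' /tmp/config-snippet.yaml | sed 's/^env://' | sort -u
```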

entrypoint.sh

2 additions, 0 deletions

```diff
@@ -10,6 +10,7 @@ detect_provider() {
 	env | grep -q "^AZURE_" && provider="$provider azure" && count=$((count + 1))
 	env | grep -q "^AWS_" && provider="$provider aws" && count=$((count + 1))
+	env | grep -q "^KAFKA_" && provider="$provider kafka" && count=$((count + 1))
 	# env | grep -q "^GCP_" && provider="$provider gcp" && count=$((count + 1))
 
 	if [ "$count" -gt 1 ]; then
@@ -29,5 +30,6 @@ fi
 echo "No cloud provider configuration detected. Please set environment variables for one of:"
 echo " - Azure (AZURE_*)"
 echo " - AWS (AWS_*)"
+echo " - Kafka (KAFKA_*)"
 # echo " - GCP (GCP_*)"
 exit 1
```
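The diff above extends the prefix-based provider detection. A simplified, self-contained sketch of the same idea (here the function reads an `env`-style listing from stdin so it can be exercised with fixed input; the real script reads the process environment directly and handles the multi-provider error path with more detail):

```shell
#!/bin/sh
# Simplified sketch of entrypoint.sh's provider detection:
# each matching env-var prefix appends a provider and bumps the count.
detect_provider() {
    input=$(cat)        # env-style listing, one NAME=value per line
    provider=""
    count=0
    echo "$input" | grep -q "^AZURE_" && provider="$provider azure" && count=$((count + 1))
    echo "$input" | grep -q "^AWS_"   && provider="$provider aws"   && count=$((count + 1))
    echo "$input" | grep -q "^KAFKA_" && provider="$provider kafka" && count=$((count + 1))
    if [ "$count" -gt 1 ]; then
        echo "error: multiple providers configured:$provider" >&2
        return 1
    fi
    echo "${provider# }"
}

# In the real script this is effectively `env | detect_provider`:
printf 'KAFKA_BROKERS=localhost:9092\nHOME=/root\n' | detect_provider   # -> kafka
```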

examples/kafka/README.md

74 additions, 0 deletions (new file)

````markdown
# Kafka Connector Examples

## Prerequisites

It is assumed that you have Axoflow deployed locally.

## Quick Start

### 1. Deploy Kafka to Kubernetes

```bash
./deploy-kafka.sh
```

This script:

- Creates a `kafka` namespace
- Deploys Apache Kafka 4.1.1 in KRaft mode
- Creates a `kafka-client` pod for testing
- Creates the `otlp_logs` topic with 3 partitions

### 2. Test the Connector

#### Option A: Run in Kubernetes (Recommended)

```bash
# Build the connector image
make docker-build

# Deploy via Helm
./test-helm.sh
```

This deploys the connector as a StatefulSet in the `cloudconnectors` namespace.

#### Option B: Run Locally with Docker

**Requirements:**

- Port-forward Kafka: `kubectl port-forward -n kafka pods/kafka-0 9092:9092`
- Port-forward AxoRouter: `kubectl port-forward -n axoflow-local pods/axorouter-0 4317:4317`

```bash
# Build the connector image
make docker-build

# Run locally
./test-plaintext.sh
```

**Note:** On macOS, the script uses `host.docker.internal` to access port-forwarded services. It also maps Kafka's cluster DNS names using `--add-host` to work around Docker networking limitations.

### 3. Produce Test Logs

Send test OTLP log messages to Kafka:

```bash
./produce-test-logs.sh
```

This sends a sample OTLP JSON log message to the `otlp_logs` topic. The connector will consume it and forward it to Axorouter.

## Clean Up

```bash
# Remove Helm deployment
helm uninstall cloudconnectors -n cloudconnectors

# Remove Kafka
kubectl delete namespace kafka

# Remove local storage
rm -rf /tmp/kafka-connector-storage
```
````
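`produce-test-logs.sh` itself is not part of this diff, but the shape of a minimal OTLP/JSON log record it would send looks roughly like this (a hand-written sketch of the OTLP JSON encoding with one log record per Kafka message; all field values are illustrative):

```shell
#!/bin/sh
# A minimal OTLP/JSON logs payload (single record, illustrative values).
PAYLOAD='{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"kafka-example"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1700000000000000000","severityText":"INFO","body":{"stringValue":"hello from kafka"}}]}]}]}'

# In practice you would pipe it into the console producer
# (requires a reachable cluster; shown here for reference only):
#   echo "$PAYLOAD" | /opt/kafka/bin/kafka-console-producer.sh \
#     --bootstrap-server kafka.kafka.svc.cluster.local:9092 --topic otlp_logs
echo "$PAYLOAD"
```

With the default `KAFKA_LOGS_ENCODING=otlp_json`, the receiver parses each message as one OTLP export payload.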
