From fb373a622fd77b80b055bb287c971615ca166eb5 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Tue, 25 Nov 2025 15:30:23 +0100 Subject: [PATCH 1/4] feat: kafka config Signed-off-by: Bence Csati --- connectors/kafka/config.yaml | 112 +++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 connectors/kafka/config.yaml diff --git a/connectors/kafka/config.yaml b/connectors/kafka/config.yaml new file mode 100644 index 0000000..fedcf2e --- /dev/null +++ b/connectors/kafka/config.yaml @@ -0,0 +1,112 @@ +exporters: + otlp/axorouter: + endpoint: ${env:AXOROUTER_ENDPOINT} + retry_on_failure: + enabled: true + max_elapsed_time: 0 + sending_queue: + enabled: true + storage: file_storage + tls: + insecure: ${env:AXOROUTER_TLS_INSECURE:-false} + ca_file: ${env:AXOROUTER_TLS_CA_FILE} + ca_pem: ${env:AXOROUTER_TLS_CA_PEM} + cert_file: ${env:AXOROUTER_TLS_CERT_FILE} + cert_pem: ${env:AXOROUTER_TLS_CERT_PEM} + key_file: ${env:AXOROUTER_TLS_KEY_FILE} + key_pem: ${env:AXOROUTER_TLS_KEY_PEM} + min_version: ${env:AXOROUTER_TLS_MIN_VERSION:-1.2} + max_version: ${env:AXOROUTER_TLS_MAX_VERSION} + include_system_ca_certs_pool: ${env:AXOROUTER_TLS_INCLUDE_SYSTEM_CA_CERTS_POOL:-false} + insecure_skip_verify: ${env:AXOROUTER_TLS_INSECURE_SKIP_VERIFY:-false} + +processors: + resource/axoflow_device_id: + attributes: + - key: "com.axoflow.device_id" + action: insert + value: "${env:AXOCLOUDCONNECTOR_DEVICE_ID}" + + resourcedetection/system: + detectors: ["system", "env"] + system: + hostname_sources: ["dns", "os", "cname", "lookup"] + resource_attributes: + host.name: + enabled: true + host.ip: + enabled: true + host.id: + enabled: true + + resource/axoflow: # Provider specific! + attributes: + - key: "com.axoflow.product" + action: insert + value: "kafka" + - key: "com.axoflow.vendor" + action: insert + value: "apache" + +receivers: # Provider specific! + kafka: + brokers: ${env:KAFKA_BROKERS:-localhost:9092} + protocol_version: ${env:KAFKA_PROTOCOL_VERSION:-2.1.0} + resolve_canonical_bootstrap_servers_only: ${env:KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY:-false} + logs: + topic: ${env:KAFKA_LOGS_TOPIC:-otlp_logs} + encoding: ${env:KAFKA_LOGS_ENCODING:-otlp_json} + + group_id: ${env:KAFKA_GROUP_ID:-axocloudconnector} + client_id: ${env:KAFKA_CLIENT_ID:-axocloudconnector} + initial_offset: ${env:KAFKA_INITIAL_OFFSET:-latest} + group_instance_id: ${env:KAFKA_GROUP_INSTANCE_ID} + + session_timeout: ${env:KAFKA_SESSION_TIMEOUT:-10s} + heartbeat_interval: ${env:KAFKA_HEARTBEAT_INTERVAL:-3s} + group_rebalance_strategy: ${env:KAFKA_GROUP_REBALANCE_STRATEGY:-range} + min_fetch_size: ${env:KAFKA_MIN_FETCH_SIZE:-1} + default_fetch_size: ${env:KAFKA_DEFAULT_FETCH_SIZE:-1048576} + max_fetch_size: ${env:KAFKA_MAX_FETCH_SIZE:-0} + max_fetch_wait: ${env:KAFKA_MAX_FETCH_WAIT:-250ms} + + tls: + insecure: ${env:KAFKA_TLS_INSECURE:-false} + ca_file: ${env:KAFKA_TLS_CA_FILE} + cert_file: ${env:KAFKA_TLS_CERT_FILE} + key_file: ${env:KAFKA_TLS_KEY_FILE} + insecure_skip_verify: ${env:KAFKA_TLS_INSECURE_SKIP_VERIFY:-false} + + autocommit: + enable: false + message_marking: + after: true + on_error: false + on_permanent_error: true + metadata: + full: true + retry: + max: 3 + backoff: 250ms + header_extraction: + extract_headers: true + error_backoff: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 0 + +extensions: + health_check: + endpoint: ${env:POD_IP}:13133 + file_storage: + directory: ${env:STORAGE_DIRECTORY} + create_directory: true + +service: + extensions: [health_check, file_storage] + pipelines: + logs: + receivers: [kafka] + processors: [resource/axoflow_device_id, resourcedetection/system, resource/axoflow] + exporters: [otlp/axorouter] From fe5fd227b4cca7ee64ceda5e9978c74187d77537 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Tue, 25 Nov 2025 15:30:43 +0100 Subject: [PATCH 2/4] feat: wire in kafka connector Signed-off-by: Bence Csati --- Dockerfile | 2 +- entrypoint.sh | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 80d268c..e512251 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.120.0-axoflow.1 AS axo-otelcol +FROM ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.129.0-axoflow.kafkareceiver AS axo-otelcol FROM alpine:3.21 AS base diff --git a/entrypoint.sh b/entrypoint.sh index f6153de..d51c690 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -10,6 +10,7 @@ detect_provider() { env | grep -q "^AZURE_" && provider="$provider azure" && count=$((count + 1)) env | grep -q "^AWS_" && provider="$provider aws" && count=$((count + 1)) + env | grep -q "^KAFKA_" && provider="$provider kafka" && count=$((count + 1)) # env | grep -q "^GCP_" && provider="$provider gcp" && count=$((count + 1)) if [ "$count" -gt 1 ]; then @@ -29,5 +30,6 @@ fi echo "No cloud provider configuration detected. Please set environment variables for one of:" echo " - Azure (AZURE_*)" echo " - AWS (AWS_*)" +echo " - Kafka (KAFKA_*)" # echo " - GCP (GCP_*)" exit 1 From 446f61fbd30f704b35df2e8a190454e61357d909 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Tue, 25 Nov 2025 15:36:18 +0100 Subject: [PATCH 3/4] feat: add docs for kafka connector Signed-off-by: Bence Csati --- README.md | 27 ++++++++++ connectors/kafka/README.md | 107 +++++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 connectors/kafka/README.md diff --git a/README.md b/README.md index 73cf99b..68d045b 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ You can find guides per connector: - [Azure connector](./connectors/azure/README.md#quickstart) - [AWS connector](./connectors/aws/README.md#quickstart) +- [Kafka connector](./connectors/kafka/README.md#quickstart) ## Environment Variables @@ -46,6 +47,32 @@ You can find guides per connector: | `AWS_ACCESS_KEY_ID` | No | - | AWS access key ID for direct authentication | | `AWS_SECRET_ACCESS_KEY` | No | - | AWS secret access key for direct authentication | +### Kafka Provider + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `KAFKA_BROKERS` | No | `localhost:9092` | Kafka broker addresses (e.g., `broker1:9092,broker2:9092`) | +| `KAFKA_PROTOCOL_VERSION` | No | `2.1.0` | Kafka protocol version | +| `KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY` | No | `false` | Resolve then reverse-lookup broker IPs during startup | +| `KAFKA_LOGS_TOPIC` | No | `otlp_logs` | Topic for consuming logs | +| `KAFKA_LOGS_ENCODING` | No | `otlp_json` | Encoding for logs (otlp_proto, otlp_json, raw, text, json, azure_resource_logs) | +| `KAFKA_GROUP_ID` | No | `axocloudconnector` | Consumer group ID | +| `KAFKA_CLIENT_ID` | No | `axocloudconnector` | Kafka client ID | +| `KAFKA_INITIAL_OFFSET` | No | `latest` | Initial offset (`latest` or `earliest`) | +| `KAFKA_GROUP_INSTANCE_ID` | No | - | Unique identifier for static consumer group membership | +| `KAFKA_SESSION_TIMEOUT` | No | `10s` | Timeout for detecting client failures | +| `KAFKA_HEARTBEAT_INTERVAL` | No | `3s` | Expected time between heartbeats | +| `KAFKA_GROUP_REBALANCE_STRATEGY` | No | `range` | Partition assignment strategy (range, roundrobin, sticky) | +| `KAFKA_MIN_FETCH_SIZE` | No | `1` | Minimum message bytes to fetch | +| `KAFKA_DEFAULT_FETCH_SIZE` | No | `1048576` | Default message bytes to fetch (1MB) | +| `KAFKA_MAX_FETCH_SIZE` | No | `0` | Maximum message bytes to fetch (0 = unlimited) | +| `KAFKA_MAX_FETCH_WAIT` | No | `250ms` | Maximum wait time for min_fetch_size bytes | +| `KAFKA_TLS_INSECURE` | No | `false` | Use insecure connection | +| `KAFKA_TLS_CA_FILE` | No | - | Path to CA certificate file | +| `KAFKA_TLS_CERT_FILE` | No | - | Path to client certificate file | +| `KAFKA_TLS_KEY_FILE` | No | - | Path to client key file | +| `KAFKA_TLS_INSECURE_SKIP_VERIFY` | No | `false` | Skip TLS certificate verification | + ## Usage ### Local Development with Docker diff --git a/connectors/kafka/README.md b/connectors/kafka/README.md new file mode 100644 index 0000000..742feec --- /dev/null +++ b/connectors/kafka/README.md @@ -0,0 +1,107 @@ +# Kafka Receiver Connector + +This directory contains the Axoflow Kafka receiver connector which helps collecting logs from Kafka topics. + +## Quickstart + +Create a topic for your logs, e.g: + +```sh +kubectl apply -f - </dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())") +AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1) + +docker run \ + --rm \ + -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \ + -e KAFKA_BROKERS="${KAFKA_BROKERS}" \ + -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \ + -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \ + -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \ + -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \ + ghcr.io/axoflow/axocloudconnectors:latest +``` + +### Using TLS + +```bash +UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())") +AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1) + +docker run \ + --rm \ + -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \ + -v "${KAFKA_CERTS_PATH}:/certs:ro" \ + -e KAFKA_BROKERS="${KAFKA_BROKERS}" \ + -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \ + -e KAFKA_TLS_CA_FILE="/certs/ca.pem" \ + -e KAFKA_TLS_CERT_FILE="/certs/cert.pem" \ + -e KAFKA_TLS_KEY_FILE="/certs/key.pem" \ + -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \ + -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \ + -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \ + ghcr.io/axoflow/axocloudconnectors:latest +``` + +## Deploy with Helm-chart + +1. Set the required environment-variables. + +### Example deploy with Axorouter in cluster + +```bash +make minikube-cluster +make docker-build +make minikube-load-image + +kubectl create namespace cloudconnectors +kubectl create secret generic kafka-credentials \ + --from-literal=brokers="" \ + --from-literal=logs-topic="" \ + --namespace cloudconnectors \ + --dry-run=client -o yaml | kubectl apply -f - + +UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())") +AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1) + +helm upgrade --install --wait --namespace cloudconnectors cloudconnectors ./charts/cloudconnectors \ + --set image.repository="axocloudconnectors" \ + --set image.tag="dev" \ + --set 'env[0].name=AXOROUTER_ENDPOINT' \ + --set 'env[0].value=axorouter.axoflow-local.svc.cluster.local:4317' \ + --set 'env[1].name=AXOCLOUDCONNECTOR_DEVICE_ID' \ + --set "env[1].value=${AXOCLOUDCONNECTOR_DEVICE_ID}" \ + --set 'env[2].name=KAFKA_BROKERS' \ + --set 'env[2].valueFrom.secretKeyRef.name=kafka-credentials' \ + --set 'env[2].valueFrom.secretKeyRef.key=brokers' \ + --set 'env[3].name=KAFKA_LOGS_TOPIC' \ + --set 'env[3].valueFrom.secretKeyRef.name=kafka-credentials' \ + --set 'env[3].valueFrom.secretKeyRef.key=logs-topic' +``` From 4515ee0ae74ff12a39e5530367423e56606ea9cd Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Tue, 25 Nov 2025 15:44:53 +0100 Subject: [PATCH 4/4] docs: kafka demo Signed-off-by: Bence Csati --- examples/kafka/README.md | 74 +++++++++++++++++++ examples/kafka/deploy-kafka.sh | 108 ++++++++++++++++++++++++++++ examples/kafka/produce-test-logs.sh | 42 +++++++++++ examples/kafka/test-helm.sh | 33 +++++++++ examples/kafka/test-plaintext.sh | 26 +++++++ 5 files changed, 283 insertions(+) create mode 100644 examples/kafka/README.md create mode 100755 examples/kafka/deploy-kafka.sh create mode 100755 examples/kafka/produce-test-logs.sh create mode 100755 examples/kafka/test-helm.sh create mode 100755 examples/kafka/test-plaintext.sh diff --git a/examples/kafka/README.md b/examples/kafka/README.md new file mode 100644 index 0000000..329a0a2 --- /dev/null +++ b/examples/kafka/README.md @@ -0,0 +1,74 @@ +# Kafka Connector Examples + +## Prerequisites + +It is assumed that you have Axoflow deployed locally. + +## Quick Start + +### 1. Deploy Kafka to Kubernetes + +```bash +./deploy-kafka.sh +``` + +This script: + +- Creates a `kafka` namespace +- Deploys Apache Kafka 4.1.1 in KRaft mode +- Creates a `kafka-client` pod for testing +- Creates the `otlp_logs` topic with 3 partitions + +### 2. Test the Connector + +#### Option A: Run in Kubernetes (Recommended) + +```bash +# Build the connector image +make docker-build + +# Deploy via Helm +./test-helm.sh +``` + +This deploys the connector as a StatefulSet in the `cloudconnectors` namespace. + +#### Option B: Run Locally with Docker + +**Requirements:** + +- Port-forward Kafka: `kubectl port-forward -n kafka pods/kafka-0 9092:9092` +- Port-forward AxoRouter: `kubectl port-forward -n axoflow-local pods/axorouter-0 4317:4317` + +```bash +# Build the connector image +make docker-build + +# Run locally +./test-plaintext.sh +``` + +**Note:** On macOS, the script uses `host.docker.internal` to access port-forwarded services. It also maps Kafka's cluster DNS names using `--add-host` to work around Docker networking limitations. + +### 3. Produce Test Logs + +Send test OTLP log messages to Kafka: + +```bash +./produce-test-logs.sh +``` + +This sends a sample OTLP JSON log message to the `otlp_logs` topic. The connector will consume it and forward it to Axorouter. + +## Clean Up + +```bash +# Remove Helm deployment +helm uninstall cloudconnectors -n cloudconnectors + +# Remove Kafka +kubectl delete namespace kafka + +# Remove local storage +rm -rf /tmp/kafka-connector-storage +``` diff --git a/examples/kafka/deploy-kafka.sh b/examples/kafka/deploy-kafka.sh new file mode 100755 index 0000000..545a0dd --- /dev/null +++ b/examples/kafka/deploy-kafka.sh @@ -0,0 +1,108 @@ +#!/bin/bash +set -e + +kubectl create namespace kafka --dry-run=client -o yaml | kubectl apply -f - + +kubectl apply -f - </dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())") +AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1) +export AXOCLOUDCONNECTOR_DEVICE_ID + +kubectl create secret generic kafka-credentials \ + --from-literal=brokers="${KAFKA_BROKERS}" \ + --from-literal=logs-topic="${KAFKA_LOGS_TOPIC}" \ + --namespace cloudconnectors \ + --dry-run=client -o yaml | kubectl apply -f - + +helm upgrade --install --wait --namespace cloudconnectors cloudconnectors ./charts/cloudconnectors \ + --set image.repository="axocloudconnectors" \ + --set image.tag="dev" \ + --set 'env[0].name=AXOROUTER_ENDPOINT' \ + --set 'env[0].value=axorouter.axoflow-local.svc.cluster.local:4317' \ + --set 'env[1].name=AXOCLOUDCONNECTOR_DEVICE_ID' \ + --set "env[1].value=${AXOCLOUDCONNECTOR_DEVICE_ID}" \ + --set 'env[2].name=KAFKA_BROKERS' \ + --set 'env[2].valueFrom.secretKeyRef.name=kafka-credentials' \ + --set 'env[2].valueFrom.secretKeyRef.key=brokers' \ + --set 'env[3].name=KAFKA_LOGS_TOPIC' \ + --set 'env[3].valueFrom.secretKeyRef.name=kafka-credentials' \ + --set 'env[3].valueFrom.secretKeyRef.key=logs-topic' \ + --set 'env[4].name=AXOROUTER_TLS_INSECURE' \ + --set-string 'env[4].value=true' \ + --set 'env[5].name=KAFKA_TLS_INSECURE' \ + --set-string 'env[5].value=true' diff --git a/examples/kafka/test-plaintext.sh b/examples/kafka/test-plaintext.sh new file mode 100755 index 0000000..bd00683 --- /dev/null +++ b/examples/kafka/test-plaintext.sh @@ -0,0 +1,26 @@ +#!/bin/bash +set -e + +KAFKA_BROKERS="${KAFKA_BROKERS:-host.docker.internal:9092}" +KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC:-otlp_logs}" +AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT:-http://host.docker.internal:4317}" +STORAGE_DIRECTORY="${STORAGE_DIRECTORY:-/tmp/kafka-connector-storage}" + +UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())") +AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1) +export AXOCLOUDCONNECTOR_DEVICE_ID + +mkdir -p "${STORAGE_DIRECTORY}" + +docker run --rm \ + --add-host=kafka.kafka.svc.cluster.local:host-gateway \ + --add-host=kafka-0.kafka.kafka.svc.cluster.local:host-gateway \ + -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \ + -e KAFKA_BROKERS="${KAFKA_BROKERS}" \ + -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \ + -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \ + -e AXOROUTER_TLS_INSECURE="true" \ + -e KAFKA_TLS_INSECURE="true" \ + -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \ + -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \ + axocloudconnectors:dev