2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
-FROM ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.120.0-axoflow.1 AS axo-otelcol
+FROM ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.129.0-axoflow.kafkareceiver AS axo-otelcol

FROM alpine:3.21 AS base

27 changes: 27 additions & 0 deletions README.md
@@ -8,6 +8,7 @@ You can find guides per connector:

- [Azure connector](./connectors/azure/README.md#quickstart)
- [AWS connector](./connectors/aws/README.md#quickstart)
- [Kafka connector](./connectors/kafka/README.md#quickstart)

## Environment Variables

@@ -46,6 +47,32 @@
| `AWS_ACCESS_KEY_ID` | No | - | AWS access key ID for direct authentication |
| `AWS_SECRET_ACCESS_KEY` | No | - | AWS secret access key for direct authentication |

### Kafka Provider

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `KAFKA_BROKERS` | No | `localhost:9092` | Kafka broker addresses (e.g., `broker1:9092,broker2:9092`) |
| `KAFKA_PROTOCOL_VERSION` | No | `2.1.0` | Kafka protocol version |
| `KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY` | No | `false` | Resolve then reverse-lookup broker IPs during startup |
| `KAFKA_LOGS_TOPIC` | No | `otlp_logs` | Topic for consuming logs |
| `KAFKA_LOGS_ENCODING` | No | `otlp_json` | Encoding for logs (`otlp_proto`, `otlp_json`, `raw`, `text`, `json`, `azure_resource_logs`) |
| `KAFKA_GROUP_ID` | No | `axocloudconnector` | Consumer group ID |
| `KAFKA_CLIENT_ID` | No | `axocloudconnector` | Kafka client ID |
| `KAFKA_INITIAL_OFFSET` | No | `latest` | Initial offset (`latest` or `earliest`) |
| `KAFKA_GROUP_INSTANCE_ID` | No | - | Unique identifier for static consumer group membership |
| `KAFKA_SESSION_TIMEOUT` | No | `10s` | Timeout for detecting client failures |
| `KAFKA_HEARTBEAT_INTERVAL` | No | `3s` | Expected time between heartbeats |
| `KAFKA_GROUP_REBALANCE_STRATEGY` | No | `range` | Partition assignment strategy (`range`, `roundrobin`, `sticky`) |
| `KAFKA_MIN_FETCH_SIZE` | No | `1` | Minimum message bytes to fetch |
| `KAFKA_DEFAULT_FETCH_SIZE` | No | `1048576` | Default message bytes to fetch (1MB) |
| `KAFKA_MAX_FETCH_SIZE` | No | `0` | Maximum message bytes to fetch (0 = unlimited) |
| `KAFKA_MAX_FETCH_WAIT` | No | `250ms` | Maximum wait time for min_fetch_size bytes |
| `KAFKA_TLS_INSECURE` | No | `false` | Use insecure connection |
| `KAFKA_TLS_CA_FILE` | No | - | Path to CA certificate file |
| `KAFKA_TLS_CERT_FILE` | No | - | Path to client certificate file |
| `KAFKA_TLS_KEY_FILE` | No | - | Path to client key file |
| `KAFKA_TLS_INSECURE_SKIP_VERIFY` | No | `false` | Skip TLS certificate verification |
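For a quick plaintext run, only a handful of these usually need explicit values; everything else falls back to the defaults above. A minimal sketch, with placeholder broker addresses and endpoint that you would replace with your own:

```shell
# Hypothetical values -- substitute your own brokers and AxoRouter endpoint.
export KAFKA_BROKERS="broker1:9092,broker2:9092"
export KAFKA_LOGS_TOPIC="otlp_logs"
export KAFKA_LOGS_ENCODING="otlp_json"
export KAFKA_INITIAL_OFFSET="earliest"
export AXOROUTER_ENDPOINT="axorouter.example.internal:4317"
export STORAGE_DIRECTORY="/tmp/kafka-connector-storage"
```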

## Usage

### Local Development with Docker
107 changes: 107 additions & 0 deletions connectors/kafka/README.md
@@ -0,0 +1,107 @@
# Kafka Receiver Connector

This directory contains the Axoflow Kafka receiver connector, which collects logs from Kafka topics.

## Quickstart

Create a topic for your logs, for example:

```sh
kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: kafka
spec:
  containers:
    - name: kafka-client
      image: docker.io/apache/kafka:4.1.1
      command: ["sleep", "infinity"]
EOF
kubectl wait --for=condition=ready pod/kafka-client -n kafka --timeout=60s

kubectl exec -n kafka kafka-client -- /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic otlp_logs \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --if-not-exists
```

Make sure the required environment variables are set before running the connector.

### Using plaintext connection

```bash
UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)

docker run \
  --rm \
  -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \
  -e KAFKA_BROKERS="${KAFKA_BROKERS}" \
  -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \
  -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \
  -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \
  -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  ghcr.io/axoflow/axocloudconnectors:latest
```

### Using TLS

```bash
UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)

docker run \
  --rm \
  -v "${STORAGE_DIRECTORY}":"${STORAGE_DIRECTORY}" \
  -v "${KAFKA_CERTS_PATH}:/certs:ro" \
  -e KAFKA_BROKERS="${KAFKA_BROKERS}" \
  -e KAFKA_LOGS_TOPIC="${KAFKA_LOGS_TOPIC}" \
  -e KAFKA_TLS_CA_FILE="/certs/ca.pem" \
  -e KAFKA_TLS_CERT_FILE="/certs/cert.pem" \
  -e KAFKA_TLS_KEY_FILE="/certs/key.pem" \
  -e AXOROUTER_ENDPOINT="${AXOROUTER_ENDPOINT}" \
  -e STORAGE_DIRECTORY="${STORAGE_DIRECTORY}" \
  -e AXOCLOUDCONNECTOR_DEVICE_ID="${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  ghcr.io/axoflow/axocloudconnectors:latest
```
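Before mounting the certificate directory, it can help to check that the client certificate actually verifies against the CA. A sketch using throwaway self-signed material to illustrate the expected file layout (`ca.pem`, `cert.pem`, `key.pem`); in practice these files come from your Kafka cluster's PKI:

```shell
# Illustrative only: generate a throwaway self-signed key and certificate.
KAFKA_CERTS_PATH=$(mktemp -d)
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -keyout "${KAFKA_CERTS_PATH}/key.pem" \
  -out "${KAFKA_CERTS_PATH}/cert.pem" \
  -subj "/CN=kafka-client" 2>/dev/null
# A self-signed certificate acts as its own CA in this sketch:
cp "${KAFKA_CERTS_PATH}/cert.pem" "${KAFKA_CERTS_PATH}/ca.pem"
# Sanity-check that the client cert chains to the CA before mounting:
openssl verify -CAfile "${KAFKA_CERTS_PATH}/ca.pem" "${KAFKA_CERTS_PATH}/cert.pem"
```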

## Deploy with the Helm chart

Set the required environment variables before deploying.

### Example: deploy with AxoRouter in the cluster

```bash
make minikube-cluster
make docker-build
make minikube-load-image

kubectl create namespace cloudconnectors
kubectl create secret generic kafka-credentials \
  --from-literal=brokers="<YOUR-KAFKA-BROKERS>" \
  --from-literal=logs-topic="<YOUR-KAFKA-LOGS-TOPIC>" \
  --namespace cloudconnectors \
  --dry-run=client -o yaml | kubectl apply -f -

UUID_FULL=$(uuidgen 2>/dev/null || cat /proc/sys/kernel/random/uuid 2>/dev/null || python3 -c "import uuid; print(uuid.uuid4())")
AXOCLOUDCONNECTOR_DEVICE_ID=$(echo "$UUID_FULL" | cut -d'-' -f1)

helm upgrade --install --wait --namespace cloudconnectors cloudconnectors ./charts/cloudconnectors \
  --set image.repository="axocloudconnectors" \
  --set image.tag="dev" \
  --set 'env[0].name=AXOROUTER_ENDPOINT' \
  --set 'env[0].value=axorouter.axoflow-local.svc.cluster.local:4317' \
  --set 'env[1].name=AXOCLOUDCONNECTOR_DEVICE_ID' \
  --set "env[1].value=${AXOCLOUDCONNECTOR_DEVICE_ID}" \
  --set 'env[2].name=KAFKA_BROKERS' \
  --set 'env[2].valueFrom.secretKeyRef.name=kafka-credentials' \
  --set 'env[2].valueFrom.secretKeyRef.key=brokers' \
  --set 'env[3].name=KAFKA_LOGS_TOPIC' \
  --set 'env[3].valueFrom.secretKeyRef.name=kafka-credentials' \
  --set 'env[3].valueFrom.secretKeyRef.key=logs-topic'
```
112 changes: 112 additions & 0 deletions connectors/kafka/config.yaml
@@ -0,0 +1,112 @@
exporters:
  otlp/axorouter:
    endpoint: ${env:AXOROUTER_ENDPOINT}
    retry_on_failure:
      enabled: true
      max_elapsed_time: 0
    sending_queue:
      enabled: true
      storage: file_storage
    tls:
      insecure: ${env:AXOROUTER_TLS_INSECURE:-false}
      ca_file: ${env:AXOROUTER_TLS_CA_FILE}
      ca_pem: ${env:AXOROUTER_TLS_CA_PEM}
      cert_file: ${env:AXOROUTER_TLS_CERT_FILE}
      cert_pem: ${env:AXOROUTER_TLS_CERT_PEM}
      key_file: ${env:AXOROUTER_TLS_KEY_FILE}
      key_pem: ${env:AXOROUTER_TLS_KEY_PEM}
      min_version: ${env:AXOROUTER_TLS_MIN_VERSION:-1.2}
      max_version: ${env:AXOROUTER_TLS_MAX_VERSION}
      include_system_ca_certs_pool: ${env:AXOROUTER_TLS_INCLUDE_SYSTEM_CA_CERTS_POOL:-false}
      insecure_skip_verify: ${env:AXOROUTER_TLS_INSECURE_SKIP_VERIFY:-false}

processors:
  resource/axoflow_device_id:
    attributes:
      - key: "com.axoflow.device_id"
        action: insert
        value: "${env:AXOCLOUDCONNECTOR_DEVICE_ID}"

  resourcedetection/system:
    detectors: ["system", "env"]
    system:
      hostname_sources: ["dns", "os", "cname", "lookup"]
      resource_attributes:
        host.name:
          enabled: true
        host.ip:
          enabled: true
        host.id:
          enabled: true

  resource/axoflow: # Provider specific!
    attributes:
      - key: "com.axoflow.product"
        action: insert
        value: "kafka"
      - key: "com.axoflow.vendor"
        action: insert
        value: "apache"

receivers: # Provider specific!
  kafka:
    brokers: ${env:KAFKA_BROKERS:-localhost:9092}
    protocol_version: ${env:KAFKA_PROTOCOL_VERSION:-2.1.0}
    resolve_canonical_bootstrap_servers_only: ${env:KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY:-false}
    logs:
      topic: ${env:KAFKA_LOGS_TOPIC:-otlp_logs}
      encoding: ${env:KAFKA_LOGS_ENCODING:-otlp_json}

    group_id: ${env:KAFKA_GROUP_ID:-axocloudconnector}
    client_id: ${env:KAFKA_CLIENT_ID:-axocloudconnector}
    initial_offset: ${env:KAFKA_INITIAL_OFFSET:-latest}
    group_instance_id: ${env:KAFKA_GROUP_INSTANCE_ID}

    session_timeout: ${env:KAFKA_SESSION_TIMEOUT:-10s}
    heartbeat_interval: ${env:KAFKA_HEARTBEAT_INTERVAL:-3s}
    group_rebalance_strategy: ${env:KAFKA_GROUP_REBALANCE_STRATEGY:-range}
    min_fetch_size: ${env:KAFKA_MIN_FETCH_SIZE:-1}
    default_fetch_size: ${env:KAFKA_DEFAULT_FETCH_SIZE:-1048576}
    max_fetch_size: ${env:KAFKA_MAX_FETCH_SIZE:-0}
    max_fetch_wait: ${env:KAFKA_MAX_FETCH_WAIT:-250ms}

    tls:
      insecure: ${env:KAFKA_TLS_INSECURE:-false}
      ca_file: ${env:KAFKA_TLS_CA_FILE}
      cert_file: ${env:KAFKA_TLS_CERT_FILE}
      key_file: ${env:KAFKA_TLS_KEY_FILE}
      insecure_skip_verify: ${env:KAFKA_TLS_INSECURE_SKIP_VERIFY:-false}

    autocommit:
      enable: false
    message_marking:
      after: true
      on_error: false
      on_permanent_error: true
    metadata:
      full: true
      retry:
        max: 3
        backoff: 250ms
    header_extraction:
      extract_headers: true
    error_backoff:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 0

extensions:
  health_check:
    endpoint: ${env:POD_IP}:13133
  file_storage:
    directory: ${env:STORAGE_DIRECTORY}
    create_directory: true

service:
  extensions: [health_check, file_storage]
  pipelines:
    logs:
      receivers: [kafka]
      processors: [resource/axoflow_device_id, resourcedetection/system, resource/axoflow]
      exporters: [otlp/axorouter]
2 changes: 2 additions & 0 deletions entrypoint.sh
@@ -10,6 +10,7 @@ detect_provider() {

env | grep -q "^AZURE_" && provider="$provider azure" && count=$((count + 1))
env | grep -q "^AWS_" && provider="$provider aws" && count=$((count + 1))
env | grep -q "^KAFKA_" && provider="$provider kafka" && count=$((count + 1))
# env | grep -q "^GCP_" && provider="$provider gcp" && count=$((count + 1))

if [ "$count" -gt 1 ]; then
@@ -29,5 +30,6 @@ fi
echo "No cloud provider configuration detected. Please set environment variables for one of:"
echo " - Azure (AZURE_*)"
echo " - AWS (AWS_*)"
echo " - Kafka (KAFKA_*)"
# echo " - GCP (GCP_*)"
exit 1
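The detection logic above can be sketched as a standalone function: scan an `env`-style listing for known provider prefixes and refuse ambiguous configurations where more than one provider is set. This is a simplified sketch of the approach, not the entrypoint script itself; it reads the listing from stdin so the result does not depend on the caller's real environment.

```shell
# Sketch of provider detection: count which provider prefixes appear.
detect_provider() {
  listing=$(cat)   # reads `env`-style KEY=VALUE lines on stdin
  provider=""
  count=0
  echo "$listing" | grep -q "^AZURE_" && { provider="$provider azure"; count=$((count + 1)); }
  echo "$listing" | grep -q "^AWS_"   && { provider="$provider aws";   count=$((count + 1)); }
  echo "$listing" | grep -q "^KAFKA_" && { provider="$provider kafka"; count=$((count + 1)); }
  if [ "$count" -gt 1 ]; then
    echo "error: multiple providers configured:$provider" >&2
    return 1
  fi
  echo "detected:$provider"
}

# Only KAFKA_* is present, so exactly one provider is detected:
printf 'KAFKA_BROKERS=localhost:9092\nPATH=/usr/bin\n' | detect_provider
# prints "detected: kafka"
```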
74 changes: 74 additions & 0 deletions examples/kafka/README.md
@@ -0,0 +1,74 @@
# Kafka Connector Examples

## Prerequisites

It is assumed that you have Axoflow deployed locally.

## Quick Start

### 1. Deploy Kafka to Kubernetes

```bash
./deploy-kafka.sh
```

This script:

- Creates a `kafka` namespace
- Deploys Apache Kafka 4.1.1 in KRaft mode
- Creates a `kafka-client` pod for testing
- Creates the `otlp_logs` topic with 3 partitions

### 2. Test the Connector

#### Option A: Run in Kubernetes (Recommended)

```bash
# Build the connector image
make docker-build

# Deploy via Helm
./test-helm.sh
```

This deploys the connector as a StatefulSet in the `cloudconnectors` namespace.

#### Option B: Run Locally with Docker

**Requirements:**

- Port-forward Kafka: `kubectl port-forward -n kafka pods/kafka-0 9092:9092`
- Port-forward AxoRouter: `kubectl port-forward -n axoflow-local pods/axorouter-0 4317:4317`

```bash
# Build the connector image
make docker-build

# Run locally
./test-plaintext.sh
```

**Note:** On macOS, the script uses `host.docker.internal` to access port-forwarded services. It also maps Kafka's cluster DNS names using `--add-host` to work around Docker networking limitations.

### 3. Produce Test Logs

Send test OTLP log messages to Kafka:

```bash
./produce-test-logs.sh
```

This sends a sample OTLP JSON log message to the `otlp_logs` topic. The connector consumes it and forwards it to AxoRouter.
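For reference, a minimal OTLP/JSON log record looks roughly like the payload below. The field values are illustrative, not what `produce-test-logs.sh` literally sends; the commented-out producer command assumes the `kafka-client` pod from `deploy-kafka.sh`.

```shell
# A minimal OTLP/JSON log record (illustrative values):
PAYLOAD='{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"demo"}}]},"scopeLogs":[{"logRecords":[{"severityText":"INFO","body":{"stringValue":"hello from kafka"}}]}]}]}'

# To send it by hand via the console producer:
# echo "$PAYLOAD" | kubectl exec -i -n kafka kafka-client -- \
#   /opt/kafka/bin/kafka-console-producer.sh \
#   --bootstrap-server kafka.kafka.svc.cluster.local:9092 --topic otlp_logs

# Quick shape check of the payload:
echo "$PAYLOAD" | grep -q '"resourceLogs"' && echo "payload OK"
```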

## Clean Up

```bash
# Remove Helm deployment
helm uninstall cloudconnectors -n cloudconnectors

# Remove Kafka
kubectl delete namespace kafka

# Remove local storage
rm -rf /tmp/kafka-connector-storage
```