Skip to content

Commit c0b0bce

Browse files
kafka connect
1 parent 3a96538 commit c0b0bce

File tree

7 files changed

+455
-94
lines changed

7 files changed

+455
-94
lines changed

kafka_connect/ansible_kafka_connect.yml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,6 @@
7575
group: ubuntu
7676
mode: '0755'
7777

78-
- name: Copy setup script
79-
copy:
80-
src: "{{ playbook_dir }}/scripts/setup-kafka-connect.sh"
81-
dest: "{{ kafka_connect_dir }}/scripts/setup-kafka-connect.sh"
82-
owner: ubuntu
83-
group: ubuntu
84-
mode: '0755'
85-
8678
- name: Copy all management scripts
8779
copy:
8880
src: "{{ playbook_dir }}/scripts/"
@@ -91,11 +83,19 @@
9183
group: ubuntu
9284
mode: '0755'
9385

94-
- name: Run Kafka Connect setup script
95-
command: "{{ kafka_connect_dir }}/scripts/setup-kafka-connect.sh"
86+
- name: Run Kafka Connect setup script (Node 1)
87+
command: "{{ kafka_connect_dir }}/scripts/setup-kafka-connect-1.sh"
88+
args:
89+
chdir: "{{ kafka_connect_dir }}"
90+
register: setup_result
91+
when: inventory_hostname == 'kafka-connect-node-1'
92+
93+
- name: Run Kafka Connect setup script (Node 2)
94+
command: "{{ kafka_connect_dir }}/scripts/setup-kafka-connect-2.sh"
9695
args:
9796
chdir: "{{ kafka_connect_dir }}"
9897
register: setup_result
98+
when: inventory_hostname == 'kafka-connect-node-2'
9999

100100
- name: Display setup results
101101
debug:
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"name": "http-source-topics-connector",
3+
"config": {
4+
"connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
5+
"tasks.max": "1",
6+
"http.request.url": "http://ec2-54-217-66-144.eu-west-1.compute.amazonaws.com:2020/topics",
7+
"http.request.method": "GET",
8+
"http.request.headers": "Authorization: Bearer token",
9+
"http.timer.interval.millis": "60000",
10+
"http.timer.catchup.interval.millis": "1000",
11+
"http.response.policy.class": "com.github.castorm.kafka.connect.http.response.PolicyHttpResponsePolicy",
12+
"http.response.policy.policy": "com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampPolicy",
13+
"kafka.topic": "topic-1",
14+
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
15+
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
16+
"value.converter.schemas.enable": "false",
17+
"http.offset.initial": "Offset{timestamp=0}",
18+
"http.auth.type": "None"
19+
}
20+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ services:
1313
# Connect Worker Configuration
1414
CONNECT_BOOTSTRAP_SERVERS: "${KAFKA_BOOTSTRAP_SERVERS}"
1515
CONNECT_REST_PORT: 8083
16-
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
16+
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-1"
1717

1818
# Cluster Configuration
1919
CONNECT_GROUP_ID: "kafka-connect-cluster"

kafka_connect/docker-compose-2.yml

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
version: '3.8'
2+
3+
services:
4+
kafka-connect:
5+
image: confluentinc/cp-kafka-connect:7.5.0
6+
container_name: kafka-connect
7+
hostname: kafka-connect
8+
user: "0:0" # Run as root to access mounted SSL certificates
9+
ports:
10+
- "8083:8083"
11+
- "9404:9404" # JMX Exporter
12+
environment:
13+
# Connect Worker Configuration
14+
CONNECT_BOOTSTRAP_SERVERS: "${KAFKA_BOOTSTRAP_SERVERS}"
15+
CONNECT_REST_PORT: 8083
16+
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-2"
17+
18+
# Cluster Configuration
19+
CONNECT_GROUP_ID: "kafka-connect-cluster"
20+
CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
21+
CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
22+
CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
23+
24+
# Topic Replication Factors
25+
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
26+
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
27+
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
28+
29+
# Converters
30+
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
31+
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
32+
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
33+
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
34+
35+
# Internal Converters
36+
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
37+
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
38+
39+
# Security Configuration (SASL_SSL)
40+
CONNECT_SECURITY_PROTOCOL: "${KAFKA_SECURITY_PROTOCOL:-SASL_SSL}"
41+
CONNECT_SASL_MECHANISM: "${KAFKA_SASL_MECHANISM:-SCRAM-SHA-512}"
42+
CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"${KAFKA_SASL_USERNAME:-admin}\" password=\"${KAFKA_SASL_PASSWORD}\";"
43+
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
44+
CONNECT_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.truststore.jks"
45+
CONNECT_SSL_TRUSTSTORE_PASSWORD: "${SSL_TRUSTSTORE_PASSWORD:-confluenttruststorepass}"
46+
CONNECT_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.keystore.jks"
47+
CONNECT_SSL_KEYSTORE_PASSWORD: "${SSL_KEYSTORE_PASSWORD:-confluenttruststorepass}"
48+
CONNECT_SSL_KEY_PASSWORD: "${SSL_KEY_PASSWORD:-confluenttruststorepass}"
49+
50+
# Admin Client Security
51+
CONNECT_ADMIN_SECURITY_PROTOCOL: "${KAFKA_SECURITY_PROTOCOL:-SASL_SSL}"
52+
CONNECT_ADMIN_SASL_MECHANISM: "${KAFKA_SASL_MECHANISM:-SCRAM-SHA-512}"
53+
CONNECT_ADMIN_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"${KAFKA_SASL_USERNAME:-admin}\" password=\"${KAFKA_SASL_PASSWORD}\";"
54+
CONNECT_ADMIN_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
55+
CONNECT_ADMIN_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.truststore.jks"
56+
CONNECT_ADMIN_SSL_TRUSTSTORE_PASSWORD: "${SSL_TRUSTSTORE_PASSWORD:-confluenttruststorepass}"
57+
CONNECT_ADMIN_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.keystore.jks"
58+
CONNECT_ADMIN_SSL_KEYSTORE_PASSWORD: "${SSL_KEYSTORE_PASSWORD:-confluenttruststorepass}"
59+
CONNECT_ADMIN_SSL_KEY_PASSWORD: "${SSL_KEY_PASSWORD:-confluenttruststorepass}"
60+
61+
# Producer Security
62+
CONNECT_PRODUCER_SECURITY_PROTOCOL: "${KAFKA_SECURITY_PROTOCOL:-SASL_SSL}"
63+
CONNECT_PRODUCER_SASL_MECHANISM: "${KAFKA_SASL_MECHANISM:-SCRAM-SHA-512}"
64+
CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"${KAFKA_SASL_USERNAME:-admin}\" password=\"${KAFKA_SASL_PASSWORD}\";"
65+
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
66+
CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.truststore.jks"
67+
CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: "${SSL_TRUSTSTORE_PASSWORD:-confluenttruststorepass}"
68+
CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.keystore.jks"
69+
CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: "${SSL_KEYSTORE_PASSWORD:-confluenttruststorepass}"
70+
CONNECT_PRODUCER_SSL_KEY_PASSWORD: "${SSL_KEY_PASSWORD:-confluenttruststorepass}"
71+
72+
# Consumer Security
73+
CONNECT_CONSUMER_SECURITY_PROTOCOL: "${KAFKA_SECURITY_PROTOCOL:-SASL_SSL}"
74+
CONNECT_CONSUMER_SASL_MECHANISM: "${KAFKA_SASL_MECHANISM:-SCRAM-SHA-512}"
75+
CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"${KAFKA_SASL_USERNAME:-admin}\" password=\"${KAFKA_SASL_PASSWORD}\";"
76+
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
77+
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.truststore.jks"
78+
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: "${SSL_TRUSTSTORE_PASSWORD:-confluenttruststorepass}"
79+
CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/kafka_connect.keystore.jks"
80+
CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: "${SSL_KEYSTORE_PASSWORD:-confluenttruststorepass}"
81+
CONNECT_CONSUMER_SSL_KEY_PASSWORD: "${SSL_KEY_PASSWORD:-confluenttruststorepass}"
82+
83+
# Plugin Path
84+
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/plugins"
85+
86+
# Logging
87+
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
88+
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
89+
90+
# JMX Configuration
91+
KAFKA_JMX_PORT: 9101
92+
KAFKA_JMX_HOSTNAME: localhost
93+
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9101"
94+
95+
# JMX Exporter for Prometheus + SSL Certificate Verification Disable
96+
KAFKA_OPTS: "-javaagent:/usr/share/jmx_exporter/jmx_prometheus_javaagent.jar=9404:/etc/kafka-connect/jmx-exporter-config.yml -Dssl.endpoint.identification.algorithm="
97+
98+
volumes:
99+
- ./plugins:/etc/kafka-connect/plugins
100+
- ./jmx_prometheus_javaagent.jar:/usr/share/jmx_exporter/jmx_prometheus_javaagent.jar
101+
- ./jmx-exporter-config.yml:/etc/kafka-connect/jmx-exporter-config.yml
102+
- ./connectors:/etc/kafka-connect/connectors
103+
- /var/ssl/private:/etc/kafka/secrets:ro
104+
105+
healthcheck:
106+
test: ["CMD", "curl", "-f", "http://localhost:8083/"]
107+
interval: 30s
108+
timeout: 10s
109+
retries: 5
110+
start_period: 60s
111+
112+
restart: unless-stopped
113+
114+
networks:
115+
- kafka-network
116+
117+
networks:
118+
kafka-network:
119+
driver: bridge

kafka_connect/generate-inventory.sh

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,6 @@ SSL_TRUSTSTORE_PASSWORD=${SSL_TRUSTSTORE_PASSWORD:-confluenttruststorepass}
116116
SSL_KEYSTORE_PASSWORD=${SSL_KEYSTORE_PASSWORD:-confluentkeystorestorepass}
117117
SSL_KEY_PASSWORD=${SSL_KEY_PASSWORD:-confluentkeystorestorepass}
118118
119-
# Kafka Connect Configuration (Node 1 as primary for connector config)
120-
CONNECT_PUBLIC_DNS=${CONNECT_NODE1_DNS}
121-
CONNECT_PUBLIC_IP=${CONNECT_NODE1_IP}
122-
123-
# Kafka Connect Node 2
124-
CONNECT_PUBLIC_DNS_NODE2=${CONNECT_NODE2_DNS}
125-
CONNECT_PUBLIC_IP_NODE2=${CONNECT_NODE2_IP}
126-
127119
# JWT Token for FastAPI Authentication
128120
JWT_TOKEN=${JWT_TOKEN:-token}
129121
EOF
@@ -157,17 +149,17 @@ if [ -d "${SCRIPT_DIR}/scripts" ]; then
157149
if [ -f "$script" ]; then
158150
# Check if script contains KAFKA_BOOTSTRAP_SERVERS or CONNECT_HOST
159151
if grep -q "KAFKA_BOOTSTRAP_SERVERS\|CONNECT_HOST" "$script" 2>/dev/null; then
160-
# Create backup
161-
cp "$script" "$script.bak"
162-
163152
# Update KAFKA_BOOTSTRAP_SERVERS and CONNECT_HOST
164153
sed -i "s|KAFKA_BOOTSTRAP_SERVERS=.*|KAFKA_BOOTSTRAP_SERVERS=\"${KAFKA_BOOTSTRAP_SERVERS}\"|g" "$script"
165-
sed -i "s|CONNECT_HOST=.*|CONNECT_HOST=\"${CONNECT_NODE1_DNS}\"|g" "$script"
154+
sed -i "s|CONNECT_HOST=.*|CONNECT_HOST=\"localhost:8083\"|g" "$script"
166155

167156
echo " ✅ Updated $(basename $script)"
168157
fi
169158
fi
170159
done
160+
161+
# Clean up backup files
162+
rm -f "${SCRIPT_DIR}/scripts"/*.sh.bak
171163
echo ""
172164
fi
173165

0 commit comments

Comments (0)