Skip to content

Commit 24c03b8

Browse files
authored
feat: kafka backup topics (#2260)
1 parent 4bbf648 commit 24c03b8

File tree

9 files changed

+122
-0
lines changed

9 files changed

+122
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/bash

# Back up all user Kafka topics with kafkactl and store them via datasafed.
#
# topics.txt format is like:
# (topic name) (partitions) (replication factor)
# topic1 1 1
# topic2 1 1
#
# The internal __consumer_offsets topic is ignored, as consumer offsets
# won't be backed up.
#
# Requires common.sh to be prepended (provides set -eo pipefail, BROKERS,
# PATH with datasafed, and DP_save_backup_status_info).

echo "getting topics..."
# NOTE: 'grep -v' exits non-zero when it filters out *every* line; with
# 'set -eo pipefail' that would abort the backup of a cluster that has no
# user topics before the status file is written (and the job would then be
# stuck), so tolerate an empty result with '|| true'.
# The patterns are anchored so topics that merely *contain* the string
# "__consumer_offsets" are not skipped by accident.
kafkactl get topics | tail -n +2 | { grep -v '^__consumer_offsets[[:space:]]' || true; } | datasafed push - topics.txt
readarray -t topics < <(kafkactl get topics -o compact | { grep -v '^__consumer_offsets$' || true; })

for topic in "${topics[@]}"; do
  echo "backing up ${topic}..."
  kafkactl consume "${topic}" --from-beginning --print-keys --print-timestamps --exit --print-headers -o json-raw | datasafed push - "data/${topic}.json"
done

# use datasafed to get backup size
# if we do not write into $DP_BACKUP_INFO_FILE, the backup job will get stuck
TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
DP_save_backup_status_info "$TOTAL_SIZE"
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/bin/bash

set -eo pipefail

# Persist the backup status info file so the data-protection controller can
# sync progress. Time format: %Y-%m-%dT%H:%M:%SZ
#
# Arguments:
#   $1 - totalSize  (required)
#   $2 - startTime  (optional; only emitted together with stopTime)
#   $3 - stopTime   (optional; when empty, only totalSize is written)
#   $4 - timeZone   (optional; appended inside timeRange when non-empty)
#   $5 - extras     (optional; raw JSON placed inside the "extras" array)
# Outputs:
#   writes a single JSON object to ${DP_BACKUP_INFO_FILE}
function DP_save_backup_status_info() {
  local totalSize=$1 startTime=$2 stopTime=$3 timeZone=$4 extras=$5
  local timeZoneStr=""
  local info

  if [ -n "${timeZone}" ]; then
    timeZoneStr=$(printf ',"timeZone":"%s"' "${timeZone}")
  fi

  # Pick the JSON shape based on which boundaries of the time range are known.
  if [ -z "${stopTime}" ]; then
    info=$(printf '{"totalSize":"%s"}' "${totalSize}")
  elif [ -z "${startTime}" ]; then
    info=$(printf '{"totalSize":"%s","extras":[%s],"timeRange":{"end":"%s"%s}}' "${totalSize}" "${extras}" "${stopTime}" "${timeZoneStr}")
  else
    info=$(printf '{"totalSize":"%s","extras":[%s],"timeRange":{"start":"%s","end":"%s"%s}}' "${totalSize}" "${extras}" "${startTime}" "${stopTime}" "${timeZoneStr}")
  fi

  printf '%s' "${info}" > "${DP_BACKUP_INFO_FILE}"
}

export BROKERS="$DP_DB_HOST:$DP_DB_PORT"
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export DATASAFED_BACKEND_BASE_PATH=${DP_BACKUP_BASE_PATH}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/bin/bash

# Restore Kafka topics from a backup taken by backup.sh:
#   - topics.txt holds "<topic> <partitions> <replication factor>" per line
#   - data/<topic>.json holds the consumed messages of each topic
#
# Requires common.sh to be prepended (provides BROKERS, PATH with datasafed,
# and the datasafed backend configuration).

echo "getting topics..."
readarray -t lines < <(datasafed pull topics.txt -)
for line in "${lines[@]}"; do
  read -r topic partitions replication <<< "$line"
  # Skip blank or malformed lines so we never try to create a nameless topic
  # or pass an empty partition/replication count to kafkactl.
  if [ -z "$topic" ] || [ -z "$partitions" ] || [ -z "$replication" ]; then
    continue
  fi
  echo "restoring ${topic}..."
  kafkactl create topic "$topic" --partitions "$partitions" --replication-factor "$replication"
  datasafed pull "data/${topic}.json" - | kafkactl produce "$topic" --input-format=json
done

addons/kafka/scripts/kafka-env.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ kafka_env_vars=(
4242
KAFKA_CFG_MAX_PARTITION_FETCH_BYTES
4343
KAFKA_ENABLE_KRAFT
4444
KAFKA_KRAFT_CLUSTER_ID
45+
KAFKA_SKIP_KRAFT_STORAGE_INIT
4546
KAFKA_ZOOKEEPER_PROTOCOL
4647
KAFKA_ZOOKEEPER_PASSWORD
4748
KAFKA_ZOOKEEPER_USER
@@ -107,6 +108,7 @@ export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="${KAFKA_CFG_INTER_BROKER_LISTENER_N
107108
export KAFKA_CFG_MAX_REQUEST_SIZE="${KAFKA_CFG_MAX_REQUEST_SIZE:-}"
108109
export KAFKA_CFG_MAX_PARTITION_FETCH_BYTES="${KAFKA_CFG_MAX_PARTITION_FETCH_BYTES:-}"
109110
export KAFKA_ENABLE_KRAFT="${KAFKA_ENABLE_KRAFT:-yes}"
111+
export KAFKA_SKIP_KRAFT_STORAGE_INIT="${KAFKA_SKIP_KRAFT_STORAGE_INIT:-false}"
110112
export KAFKA_KRAFT_CLUSTER_ID="${KAFKA_KRAFT_CLUSTER_ID:-}"
111113

112114
# ZooKeeper connection settings
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# ActionSet describing how to back up and restore Kafka topics with kafkactl.
# The backup/restore logic lives in the dataprotection/*.sh scripts, which are
# inlined into the container command via .Files.Get.
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: ActionSet
metadata:
  name: kafka-topics
  labels:
    {{- include "kafka.labels" . | nindent 4 }}
spec:
  backupType: Full
  backup:
    backupData:
      image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafkactl.repository }}:{{ .Values.images.kafkactl.tag }}
      # Progress syncing is disabled; the scripts write the final status file
      # themselves via DP_save_backup_status_info.
      syncProgress:
        enabled: false
        intervalSeconds: 5
      command:
        - bash
        - -c
        - |
          {{- .Files.Get "dataprotection/common.sh" | nindent 8 }}
          {{- .Files.Get "dataprotection/backup.sh" | nindent 8 }}
  restore:
    # Restore runs as a postReady job: topics are recreated and re-produced
    # only after the target cluster is up.
    postReady:
      - job:
          image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafkactl.repository }}:{{ .Values.images.kafkactl.tag }}
          command:
            - bash
            - -c
            - |
              {{- .Files.Get "dataprotection/common.sh" | nindent 12 }}
              {{- .Files.Get "dataprotection/restore.sh" | nindent 12 }}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# BackupPolicyTemplate wiring the "topics" backup method (kafka-topics
# ActionSet) to the Kafka component definitions.
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: BackupPolicyTemplate
metadata:
  name: kafka-backup-policy-template
  labels:
    {{- include "kafka.labels" . | nindent 4 }}
spec:
  serviceKind: Kafka
  # Component definitions this policy applies to (matched by regexp pattern).
  compDefs:
    - {{ include "kafka2-broker.cmpdRegexpPattern" . }}
    - {{ include "kafka-broker.cmpdRegexpPattern" . }}
    - {{ include "kafka-combine.cmpdRegexpPattern" . }}
  backupMethods:
    - name: topics
      # Topic data is streamed out with kafkactl, not volume-snapshotted.
      snapshotVolumes: false
      actionSetName: kafka-topics
  schedules:
    - backupMethod: topics
      # Disabled by default; when enabled, runs daily at 18:00.
      enabled: false
      cronExpression: "0 18 * * *"
      retentionPeriod: 7d

addons/kafka/templates/cmpd-broker.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ spec:
1111
description: Kafka broker component definition
1212
serviceKind: kafka
1313
serviceVersion: {{ .Values.defaultServiceVersion.broker }}
14+
podManagementPolicy: Parallel
1415
services:
1516
- name: advertised-listener
1617
serviceName: advertised-listener

addons/kafka/templates/cmpd-controller.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ spec:
1111
description: Kafka controller that act as controllers (kraft) only server.
1212
serviceKind: kafka-controller
1313
serviceVersion: {{ .Values.defaultServiceVersion.controller }}
14+
podManagementPolicy: Parallel
1415
vars:
1516
- name: CLUSTER_UID
1617
valueFrom:
@@ -96,6 +97,10 @@ spec:
9697
value: "CONTROLLER://:9093"
9798
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
9899
value: "CONTROLLER:PLAINTEXT"
100+
# FIXME: why this config is needed for controller?
101+
# kafka 3.8/3.9 controller failed to start without this
102+
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
103+
value: "CONTROLLER"
99104
- name: ALLOW_PLAINTEXT_LISTENER
100105
value: "yes"
101106
- name: JMX_PORT

addons/kafka/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ images:
4141
jmxExporter:
4242
repository: apecloud/jmx-exporter
4343
tag: 0.18.0-debian-11-r20
44+
kafkactl:
45+
# mirrored from deviceinsight/kafkactl
46+
repository: apecloud/kafkactl
47+
tag: v5.15.0
4448

4549
## @param define default serviceVersion of each Component
4650
defaultServiceVersion:

0 commit comments

Comments
 (0)