Skip to content

Commit 98a2939

Browse files
authored
chore: pick kafka/milvus backup from release-1.0 (#2275)
1 parent 71b049f commit 98a2939

File tree

15 files changed

+302
-2
lines changed

15 files changed

+302
-2
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/bash
2+
3+
# topics.txt format is like:
4+
# (topic name) (partitions) (replication factor)
5+
# topic1 1 1
6+
# topic2 1 1
7+
#
8+
# We also ignores the __consumer_offsets topic as offsets won't be backuped up.
9+
echo "getting topics..."
10+
kafkactl get topics | tail -n +2 | grep -v __consumer_offsets | datasafed push - topics.txt
11+
readarray -t topics < <(kafkactl get topics -o compact | grep -v __consumer_offsets)
12+
13+
for topic in "${topics[@]}"; do
14+
echo "backing up ${topic}..."
15+
kafkactl consume "${topic}" --from-beginning --print-keys --print-timestamps --exit --print-headers -o json-raw | datasafed push - "data/${topic}.json"
16+
done
17+
18+
# use datasafed to get backup size
19+
# if we do not write into $DP_BACKUP_INFO_FILE, the backup job will stuck
20+
TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
21+
DP_save_backup_status_info "$TOTAL_SIZE"
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#!/bin/bash

set -eo pipefail

# Write the backup status info file used by the data-protection framework to
# sync progress. Time values use the format %Y-%m-%dT%H:%M:%SZ.
# $1 total size, $2 start time, $3 stop time, $4 time zone, $5 extra infos.
DP_save_backup_status_info() {
  local total_size=$1
  local start_ts=$2
  local stop_ts=$3
  local tz=$4
  local extra_infos=$5
  local tz_fragment=""
  if [ -n "${tz}" ]; then
    tz_fragment=$(printf ',"timeZone":"%s"' "${tz}")
  fi
  if [ -z "${stop_ts}" ]; then
    # No time range known: record the size only.
    printf '{"totalSize":"%s"}' "${total_size}" > "${DP_BACKUP_INFO_FILE}"
    return
  fi
  if [ -z "${start_ts}" ]; then
    # Only the end of the range is known.
    printf '{"totalSize":"%s","extras":[%s],"timeRange":{"end":"%s"%s}}' "${total_size}" "${extra_infos}" "${stop_ts}" "${tz_fragment}" > "${DP_BACKUP_INFO_FILE}"
    return
  fi
  printf '{"totalSize":"%s","extras":[%s],"timeRange":{"start":"%s","end":"%s"%s}}' "${total_size}" "${extra_infos}" "${start_ts}" "${stop_ts}" "${tz_fragment}" > "${DP_BACKUP_INFO_FILE}"
}
25+
26+
# Default the Kafka port when the data-protection framework did not supply one.
DP_DB_PORT=${DP_DB_PORT:-9092}

# Broker address for kafkactl, plus datasafed on PATH with its backend base path.
export BROKERS="${DP_DB_HOST}:${DP_DB_PORT}"
export PATH="${PATH}:${DP_DATASAFED_BIN_PATH}"
export DATASAFED_BACKEND_BASE_PATH="${DP_BACKUP_BASE_PATH}"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/bin/bash
2+
3+
echo "getting topics..."
4+
readarray -t lines < <(datasafed pull topics.txt -)
5+
for line in "${lines[@]}"; do
6+
read -r topic partitions replication <<< "$line"
7+
echo "restoring ${topic}..."
8+
kafkactl create topic "$topic" --partitions "$partitions" --replication-factor "$replication"
9+
datasafed pull "data/${topic}.json" - | kafkactl produce "$topic" --input-format=json
10+
done

addons/kafka/scripts/kafka-env.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ kafka_env_vars=(
4242
KAFKA_CFG_MAX_PARTITION_FETCH_BYTES
4343
KAFKA_ENABLE_KRAFT
4444
KAFKA_KRAFT_CLUSTER_ID
45+
KAFKA_SKIP_KRAFT_STORAGE_INIT
4546
KAFKA_ZOOKEEPER_PROTOCOL
4647
KAFKA_ZOOKEEPER_PASSWORD
4748
KAFKA_ZOOKEEPER_USER
@@ -107,6 +108,7 @@ export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="${KAFKA_CFG_INTER_BROKER_LISTENER_N
107108
export KAFKA_CFG_MAX_REQUEST_SIZE="${KAFKA_CFG_MAX_REQUEST_SIZE:-}"
108109
export KAFKA_CFG_MAX_PARTITION_FETCH_BYTES="${KAFKA_CFG_MAX_PARTITION_FETCH_BYTES:-}"
109110
export KAFKA_ENABLE_KRAFT="${KAFKA_ENABLE_KRAFT:-yes}"
111+
export KAFKA_SKIP_KRAFT_STORAGE_INIT="${KAFKA_SKIP_KRAFT_STORAGE_INIT:-false}"
110112
export KAFKA_KRAFT_CLUSTER_ID="${KAFKA_KRAFT_CLUSTER_ID:-}"
111113

112114
# ZooKeeper connection settings
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# ActionSet describing how to back up and restore Kafka topics with kafkactl.
# Backup and restore both run common.sh first (helper functions / env setup),
# then the step-specific script, concatenated into a single bash -c payload.
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: ActionSet
metadata:
  name: kafka-topics
  labels:
    {{- include "kafka.labels" . | nindent 4 }}
spec:
  backupType: Full
  backup:
    backupData:
      image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafkactl.repository }}:{{ .Values.images.kafkactl.tag }}
      syncProgress:
        enabled: false
        intervalSeconds: 5
      command:
        - bash
        - -c
        - |
          {{- .Files.Get "dataprotection/common.sh" | nindent 8 }}
          {{- .Files.Get "dataprotection/backup.sh" | nindent 8 }}
  restore:
    # postReady: topics are recreated only after the new cluster is up.
    postReady:
      - job:
          image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafkactl.repository }}:{{ .Values.images.kafkactl.tag }}
          command:
            - bash
            - -c
            - |
              {{- .Files.Get "dataprotection/common.sh" | nindent 12 }}
              {{- .Files.Get "dataprotection/restore.sh" | nindent 12 }}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Default backup policy for Kafka clusters: a "topics" backup method backed by
# the kafka-topics ActionSet, with a daily schedule (disabled by default) and
# a 7-day retention period.
apiVersion: apps.kubeblocks.io/v1alpha1
kind: BackupPolicyTemplate
metadata:
  name: kafka-backup-policy-template
  labels:
    {{- include "kafka.labels" . | nindent 4 }}
  annotations:
    dataprotection.kubeblocks.io/is-default-policy-template: "true"
spec:
  clusterDefinitionRef: kafka
  backupPolicies:
    - componentDefs:
        - {{ include "kafka-broker2_8.componentDefName" . }}
        - {{ include "kafka-broker3_2.componentDefName" . }}
        - {{ include "kafka-combine.componentDefName" . }}
      backupMethods:
        # Logical topic-level backup; no volume snapshots are taken.
        - name: topics
          snapshotVolumes: false
          actionSetName: kafka-topics
      schedules:
        - backupMethod: topics
          enabled: false
          cronExpression: "0 18 * * *"
          retentionPeriod: 7d

addons/kafka/templates/cmpd-controller.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ spec:
7171
value: "CONTROLLER://:9093"
7272
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
7373
value: "CONTROLLER:PLAINTEXT"
74+
# FIXME: why this config is needed for controller?
75+
# kafka 3.8/3.9 controller failed to start without this
76+
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
77+
value: "CONTROLLER"
7478
- name: ALLOW_PLAINTEXT_LISTENER
7579
value: "yes"
7680
- name: JMX_PORT

addons/kafka/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ images:
4747
jmxExporter:
4848
repository: apecloud/jmx-exporter
4949
tag: 0.18.0-debian-11-r20
50+
kafkactl:
51+
# mirrored from deviceinsight/kafkactl
52+
repository: apecloud/kafkactl
53+
tag: v5.15.0
5054

5155

5256
## @param debugEnabled enables containers' debug logging
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/bin/bash
2+
3+
# if the script exits with a non-zero exit code, touch a file to indicate that the backup failed,
4+
# the sync progress container will check this file and exit if it exists
5+
function handle_exit() {
6+
exit_code=$?
7+
if [ $exit_code -ne 0 ]; then
8+
echo "failed with exit code $exit_code"
9+
touch "${DP_BACKUP_INFO_FILE}.exit"
10+
exit $exit_code
11+
fi
12+
}
13+
14+
trap handle_exit EXIT
15+
setStorageConfig
16+
17+
./milvus-backup create -n "$BACKUP_NAME"
18+
19+
# use datasafed to get backup size
20+
# if we do not write into $DP_BACKUP_INFO_FILE, the backup job will stuck
21+
TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
22+
DP_save_backup_status_info "$TOTAL_SIZE"
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#!/bin/bash

set -eo pipefail

# Look up a value in the datasafed tool config: print the last
# whitespace-separated field of every line matching the given key pattern.
getToolConfigValue() {
  local key=$1
  grep "$key" "$TOOL_CONFIG" | awk '{print $NF}'
}
9+
10+
# shellcheck disable=SC2034
# Build the milvus-backup config (configs/backup.yaml) from the environment:
# wires up the Milvus connection, the cluster's own MinIO storage, and the
# backup-target storage parsed out of the datasafed config. Also exports PATH
# and DATASAFED_BACKEND_BASE_PATH for datasafed, and computes BACKUP_NAME.
function setStorageConfig() {
  TOOL_CONFIG=/etc/datasafed/datasafed.conf

  ACCESS_KEY_ID=$(getToolConfigValue access_key_id)
  SECRET_ACCESS_KEY=$(getToolConfigValue secret_access_key)
  ENDPOINT=$(getToolConfigValue endpoint)
  BUCKET=$(getToolConfigValue "root =")
  PROVIDER=$(getToolConfigValue provider)

  if [[ "$PROVIDER" == "Alibaba" ]]; then
    ENDPOINT="https://${ENDPOINT}"
  fi

  export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
  export DATASAFED_BACKEND_BASE_PATH=${DP_BACKUP_BASE_PATH}

  # only underscores are allowed in backup name
  BACKUP_NAME=${DP_BACKUP_NAME//-/_}

  BACKUP_CONFIG=configs/backup.yaml
  MILVUS_CONFIG=/milvus/configs/user.yaml

  if [[ -z $DP_DB_PORT ]]; then
    DP_DB_PORT=19530
  fi

  # connection config
  yq -i ".milvus.address = \"$DP_DB_HOST\"" "$BACKUP_CONFIG"
  yq -i ".milvus.port = $DP_DB_PORT" "$BACKUP_CONFIG"
  yq -i ".milvus.user = \"\"" "$BACKUP_CONFIG"
  yq -i ".milvus.password = \"\"" "$BACKUP_CONFIG"
  yq -i ".backup.gcPause.address = \"http://$DP_DB_HOST:9091\"" "$BACKUP_CONFIG"

  # milvus storage config
  yq -i ".minio.address = \"$MINIO_HOST\"" "$BACKUP_CONFIG"
  yq -i ".minio.port = \"$MINIO_PORT\"" "$BACKUP_CONFIG"
  yq -i ".minio.accessKeyID = \"$MINIO_ACCESS_KEY\"" "$BACKUP_CONFIG"
  yq -i ".minio.secretAccessKey = \"$MINIO_SECRET_KEY\"" "$BACKUP_CONFIG"

  yq -i ".minio.bucketName = \"$MINIO_BUCKET\"" "$BACKUP_CONFIG"
  if [[ $MINIO_PORT == "443" ]]; then
    yq -i ".minio.useSSL = true" "$BACKUP_CONFIG"
  fi
  yq -i ".minio.rootPath = \"$MINIO_ROOT_PATH\"" "$BACKUP_CONFIG"
  # TODO: is this right?
  yq -i ".minio.storageType = (load(\"$MILVUS_CONFIG\") | .minio.cloudProvider)" "$BACKUP_CONFIG"

  # backup storage config
  # Strip any URL scheme before splitting host:port. The previous pattern
  # "${ENDPOINT#http://}" only removed "http://", so the "https://" prefix
  # added above for the Alibaba provider was parsed as host "https".
  without_scheme=${ENDPOINT#*://}
  IFS=":" read -r -a parts <<< "$without_scheme"
  yq -i ".minio.backupAddress = \"${parts[0]}\"" "$BACKUP_CONFIG"
  # FIXME: will backupPort be empty?
  yq -i ".minio.backupPort = \"${parts[1]}\"" "$BACKUP_CONFIG"
  yq -i ".minio.backupAccessKeyID = \"$ACCESS_KEY_ID\"" "$BACKUP_CONFIG"
  yq -i ".minio.backupSecretAccessKey = \"$SECRET_ACCESS_KEY\"" "$BACKUP_CONFIG"
  yq -i ".minio.backupBucketName = \"$BUCKET\"" "$BACKUP_CONFIG"
  # eliminate the leading slash, or go-minio will return an empty list when listing
  BACKUP_ROOT_PATH=${DP_BACKUP_BASE_PATH#/}
  yq -i ".minio.backupRootPath = \"$BACKUP_ROOT_PATH\"" "$BACKUP_CONFIG"
}
71+
72+
# Save backup status info file for syncing progress.
73+
# timeFormat: %Y-%m-%dT%H:%M:%SZ
74+
function DP_save_backup_status_info() {
75+
local totalSize=$1
76+
local startTime=$2
77+
local stopTime=$3
78+
local timeZone=$4
79+
local extras=$5
80+
local timeZoneStr=""
81+
if [ -n "${timeZone}" ]; then
82+
timeZoneStr=$(printf ',"timeZone":"%s"' "${timeZone}")
83+
fi
84+
if [ -z "${stopTime}" ]; then
85+
printf '{"totalSize":"%s"}' "${totalSize}" > "${DP_BACKUP_INFO_FILE}"
86+
elif [ -z "${startTime}" ]; then
87+
printf '{"totalSize":"%s","extras":[%s],"timeRange":{"end":"%s"%s}}' "${totalSize}" "${extras}" "${stopTime}" "${timeZoneStr}" > "${DP_BACKUP_INFO_FILE}"
88+
else
89+
printf '{"totalSize":"%s","extras":[%s],"timeRange":{"start":"%s","end":"%s"%s}}' "${totalSize}" "${extras}" "${startTime}" "${stopTime}" "${timeZoneStr}" > "${DP_BACKUP_INFO_FILE}"
90+
fi
91+
}

0 commit comments

Comments
 (0)