Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions addons/kafka/dataprotection/backup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash

# Back up Kafka topic metadata and messages using kafkactl + datasafed.
# Relies on common.sh (concatenated before this script by the ActionSet)
# for set -eo pipefail, $BROKERS, $PATH, and DP_save_backup_status_info.
#
# topics.txt format is like:
# (topic name) (partitions) (replication factor)
# topic1 1 1
# topic2 1 1
#
# We also ignore the __consumer_offsets internal topic, as consumer offsets
# won't be backed up.
echo "getting topics..."
# tail -n +2 drops the table header line before storing the topic list.
# NOTE(review): grep -v matches substrings, so any user topic whose name
# contains "__consumer_offsets" would also be skipped — acceptable here.
kafkactl get topics | tail -n +2 | grep -v __consumer_offsets | datasafed push - topics.txt
# -o compact yields one bare topic name per line, suitable for readarray.
readarray -t topics < <(kafkactl get topics -o compact | grep -v __consumer_offsets)

for topic in "${topics[@]}"; do
echo "backing up ${topic}..."
kafkactl consume "${topic}" --from-beginning --print-keys --print-timestamps --exit --print-headers -o json-raw | datasafed push - "data/${topic}.json"
done

# use datasafed to get backup size
# if we do not write into $DP_BACKUP_INFO_FILE, the backup job will get stuck
TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
DP_save_backup_status_info "$TOTAL_SIZE"
32 changes: 32 additions & 0 deletions addons/kafka/dataprotection/common.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash

set -eo pipefail

# Save backup status info file for syncing progress.
# timeFormat: %Y-%m-%dT%H:%M:%SZ
#
# Arguments:
#   $1 totalSize - backup size as reported by `datasafed stat`
#   $2 startTime - backup start time (optional)
#   $3 stopTime  - backup stop time (optional)
#   $4 timeZone  - time zone of the time range (optional)
#   $5 extras    - JSON fragment(s) placed inside the "extras" array (optional)
# Outputs:
#   Writes a single JSON object to ${DP_BACKUP_INFO_FILE}. If this file is
#   never written, the backup job gets stuck waiting for status.
function DP_save_backup_status_info() {
  local totalSize=$1
  local startTime=$2
  local stopTime=$3
  local timeZone=$4
  local extras=$5
  local timeZoneStr=""
  if [ -n "${timeZone}" ]; then
    timeZoneStr=$(printf ',"timeZone":"%s"' "${timeZone}")
  fi
  if [ -z "${stopTime}" ]; then
    printf '{"totalSize":"%s"}' "${totalSize}" > "${DP_BACKUP_INFO_FILE}"
  elif [ -z "${startTime}" ]; then
    printf '{"totalSize":"%s","extras":[%s],"timeRange":{"end":"%s"%s}}' "${totalSize}" "${extras}" "${stopTime}" "${timeZoneStr}" > "${DP_BACKUP_INFO_FILE}"
  else
    printf '{"totalSize":"%s","extras":[%s],"timeRange":{"start":"%s","end":"%s"%s}}' "${totalSize}" "${extras}" "${startTime}" "${stopTime}" "${timeZoneStr}" > "${DP_BACKUP_INFO_FILE}"
  fi
}

# Default to the standard Kafka plaintext listener port when the
# DataProtection environment does not provide one (unset or empty).
DP_DB_PORT=${DP_DB_PORT:-9092}

export BROKERS="$DP_DB_HOST:$DP_DB_PORT"
# Make the datasafed binary reachable by the backup/restore scripts.
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export DATASAFED_BACKEND_BASE_PATH="${DP_BACKUP_BASE_PATH}"
10 changes: 10 additions & 0 deletions addons/kafka/dataprotection/restore.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

# Rebuild every topic recorded in the backed-up topics.txt, then replay the
# saved messages for each topic from data/<topic>.json in the backup repo.
echo "getting topics..."
mapfile -t topic_entries < <(datasafed pull topics.txt -)
for entry in "${topic_entries[@]}"; do
  # Each entry is: <topic name> <partitions> <replication factor>
  read -r topic partitions replication <<< "$entry"
  echo "restoring ${topic}..."
  kafkactl create topic "$topic" --partitions "$partitions" --replication-factor "$replication"
  datasafed pull "data/${topic}.json" - | kafkactl produce "$topic" --input-format=json
done
2 changes: 2 additions & 0 deletions addons/kafka/scripts/kafka-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ kafka_env_vars=(
KAFKA_CFG_MAX_PARTITION_FETCH_BYTES
KAFKA_ENABLE_KRAFT
KAFKA_KRAFT_CLUSTER_ID
KAFKA_SKIP_KRAFT_STORAGE_INIT
KAFKA_ZOOKEEPER_PROTOCOL
KAFKA_ZOOKEEPER_PASSWORD
KAFKA_ZOOKEEPER_USER
Expand Down Expand Up @@ -107,6 +108,7 @@ export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="${KAFKA_CFG_INTER_BROKER_LISTENER_N
export KAFKA_CFG_MAX_REQUEST_SIZE="${KAFKA_CFG_MAX_REQUEST_SIZE:-}"
export KAFKA_CFG_MAX_PARTITION_FETCH_BYTES="${KAFKA_CFG_MAX_PARTITION_FETCH_BYTES:-}"
export KAFKA_ENABLE_KRAFT="${KAFKA_ENABLE_KRAFT:-yes}"
export KAFKA_SKIP_KRAFT_STORAGE_INIT="${KAFKA_SKIP_KRAFT_STORAGE_INIT:-false}"
export KAFKA_KRAFT_CLUSTER_ID="${KAFKA_KRAFT_CLUSTER_ID:-}"

# ZooKeeper connection settings
Expand Down
30 changes: 30 additions & 0 deletions addons/kafka/templates/actionset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# ActionSet "kafka-topics": Full backup and restore of Kafka topics.
# Backup runs dataprotection/backup.sh (topic list + per-topic message dump);
# restore runs dataprotection/restore.sh (re-create topics, replay messages).
# common.sh is concatenated before each script to supply shared env/functions.
# syncProgress is disabled because backup.sh writes the final status file
# itself via DP_save_backup_status_info.
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: ActionSet
metadata:
name: kafka-topics
labels:
{{- include "kafka.labels" . | nindent 4 }}
spec:
backupType: Full
backup:
backupData:
image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafkactl.repository }}:{{ .Values.images.kafkactl.tag }}
syncProgress:
enabled: false
intervalSeconds: 5
command:
- bash
- -c
- |
{{- .Files.Get "dataprotection/common.sh" | nindent 8 }}
{{- .Files.Get "dataprotection/backup.sh" | nindent 8 }}
restore:
postReady:
- job:
image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.kafkactl.repository }}:{{ .Values.images.kafkactl.tag }}
command:
- bash
- -c
- |
{{- .Files.Get "dataprotection/common.sh" | nindent 12 }}
{{- .Files.Get "dataprotection/restore.sh" | nindent 12 }}
24 changes: 24 additions & 0 deletions addons/kafka/templates/backuppolicytemplate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Default BackupPolicyTemplate for Kafka. Wires the "topics" backup method
# (kafka-topics ActionSet, no volume snapshots) to the broker/combine
# component definitions, with a disabled-by-default daily schedule at 18:00
# and a 7-day retention period.
apiVersion: apps.kubeblocks.io/v1alpha1
kind: BackupPolicyTemplate
metadata:
name: kafka-backup-policy-template
labels:
{{- include "kafka.labels" . | nindent 4 }}
annotations:
dataprotection.kubeblocks.io/is-default-policy-template: "true"
spec:
clusterDefinitionRef: kafka
backupPolicies:
- componentDefs:
- {{ include "kafka-broker2_8.componentDefName" . }}
- {{ include "kafka-broker3_2.componentDefName" . }}
- {{ include "kafka-combine.componentDefName" . }}
backupMethods:
- name: topics
snapshotVolumes: false
actionSetName: kafka-topics
schedules:
- backupMethod: topics
enabled: false
cronExpression: "0 18 * * *"
retentionPeriod: 7d
4 changes: 4 additions & 0 deletions addons/kafka/templates/cmpd-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ spec:
value: "CONTROLLER://:9093"
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
value: "CONTROLLER:PLAINTEXT"
# FIXME: why this config is needed for controller?
# kafka 3.8/3.9 controller failed to start without this
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
value: "CONTROLLER"
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: JMX_PORT
Expand Down
4 changes: 4 additions & 0 deletions addons/kafka/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ images:
jmxExporter:
repository: apecloud/jmx-exporter
tag: 0.18.0-debian-11-r20
kafkactl:
# mirrored from deviceinsight/kafkactl
repository: apecloud/kafkactl
tag: v5.15.0


## @param debugEnabled enables containers' debug logging
Expand Down
22 changes: 22 additions & 0 deletions addons/milvus/dataprotection/backup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash

# Run a full Milvus backup via milvus-backup. setStorageConfig and
# DP_save_backup_status_info come from common.sh, which the ActionSet
# concatenates before this script.

# if the script exits with a non-zero exit code, touch a file to indicate that the backup failed,
# the sync progress container will check this file and exit if it exists
function handle_exit() {
exit_code=$?
if [ $exit_code -ne 0 ]; then
echo "failed with exit code $exit_code"
touch "${DP_BACKUP_INFO_FILE}.exit"
exit $exit_code
fi
}

trap handle_exit EXIT
setStorageConfig

# BACKUP_NAME is derived from DP_BACKUP_NAME by setStorageConfig
# (dashes replaced with underscores).
./milvus-backup create -n "$BACKUP_NAME"

# use datasafed to get backup size
# if we do not write into $DP_BACKUP_INFO_FILE, the backup job will get stuck
TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
DP_save_backup_status_info "$TOTAL_SIZE"
91 changes: 91 additions & 0 deletions addons/milvus/dataprotection/common.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/bin/bash

set -eo pipefail

# Print the last whitespace-separated field of every line in the datasafed
# tool config ($TOOL_CONFIG) that matches the given key/pattern.
function getToolConfigValue() {
  local key=$1
  grep -- "$key" "$TOOL_CONFIG" | awk '{print $NF}'
}

# shellcheck disable=SC2034
# Populate milvus-backup's configs/backup.yaml with connection, Milvus
# storage, and backup-repo storage settings derived from the datasafed tool
# config and DataProtection environment variables. Also exports the
# datasafed PATH / backend base path and computes the global BACKUP_NAME.
function setStorageConfig() {
  TOOL_CONFIG=/etc/datasafed/datasafed.conf

  # Backup repository (datasafed backend) credentials and location.
  ACCESS_KEY_ID=$(getToolConfigValue access_key_id)
  SECRET_ACCESS_KEY=$(getToolConfigValue secret_access_key)
  ENDPOINT=$(getToolConfigValue endpoint)
  BUCKET=$(getToolConfigValue "root =")
  PROVIDER=$(getToolConfigValue provider)

  if [[ "$PROVIDER" == "Alibaba" ]]; then
    ENDPOINT="https://${ENDPOINT}"
  fi

  export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
  export DATASAFED_BACKEND_BASE_PATH="${DP_BACKUP_BASE_PATH}"

  # only underscores are allowed in backup name
  BACKUP_NAME=${DP_BACKUP_NAME//-/_}

  BACKUP_CONFIG=configs/backup.yaml
  MILVUS_CONFIG=/milvus/configs/user.yaml

  # Default to the standard Milvus port when not provided.
  if [[ -z $DP_DB_PORT ]]; then
    DP_DB_PORT=19530
  fi

  # connection config
  yq -i ".milvus.address = \"$DP_DB_HOST\"" "$BACKUP_CONFIG"
  yq -i ".milvus.port = $DP_DB_PORT" "$BACKUP_CONFIG"
  yq -i ".milvus.user = \"\"" "$BACKUP_CONFIG"
  yq -i ".milvus.password = \"\"" "$BACKUP_CONFIG"
  yq -i ".backup.gcPause.address = \"http://$DP_DB_HOST:9091\"" "$BACKUP_CONFIG"

  # milvus storage config
  yq -i ".minio.address = \"$MINIO_HOST\"" "$BACKUP_CONFIG"
  yq -i ".minio.port = \"$MINIO_PORT\"" "$BACKUP_CONFIG"
  yq -i ".minio.accessKeyID = \"$MINIO_ACCESS_KEY\"" "$BACKUP_CONFIG"
  yq -i ".minio.secretAccessKey = \"$MINIO_SECRET_KEY\"" "$BACKUP_CONFIG"

  yq -i ".minio.bucketName = \"$MINIO_BUCKET\"" "$BACKUP_CONFIG"
  if [[ $MINIO_PORT == "443" ]]; then
    yq -i ".minio.useSSL = true" "$BACKUP_CONFIG"
  fi
  yq -i ".minio.rootPath = \"$MINIO_ROOT_PATH\"" "$BACKUP_CONFIG"
  # TODO: is this right?
  yq -i ".minio.storageType = (load(\"$MILVUS_CONFIG\") | .minio.cloudProvider)" "$BACKUP_CONFIG"

  # backup storage config
  # Strip any URL scheme (http:// or https://) before splitting host:port.
  # BUGFIX: the previous "${ENDPOINT#http://}" missed the "https://" prefix
  # added for the Alibaba provider above, making the IFS=":" split yield
  # backupAddress="https" and a bogus backupPort.
  without_scheme=${ENDPOINT#*://}
  IFS=":" read -r -a parts <<< "$without_scheme"
  yq -i ".minio.backupAddress = \"${parts[0]}\"" "$BACKUP_CONFIG"
  # FIXME: will backupPort be empty?
  yq -i ".minio.backupPort = \"${parts[1]}\"" "$BACKUP_CONFIG"
  yq -i ".minio.backupAccessKeyID = \"$ACCESS_KEY_ID\"" "$BACKUP_CONFIG"
  yq -i ".minio.backupSecretAccessKey = \"$SECRET_ACCESS_KEY\"" "$BACKUP_CONFIG"
  yq -i ".minio.backupBucketName = \"$BUCKET\"" "$BACKUP_CONFIG"
  # eliminate the leading slash, or go-minio will return an empty list when listing
  BACKUP_ROOT_PATH=${DP_BACKUP_BASE_PATH#/}
  yq -i ".minio.backupRootPath = \"$BACKUP_ROOT_PATH\"" "$BACKUP_CONFIG"
}

# Persist backup status (size and optional time range) as a single JSON
# object into ${DP_BACKUP_INFO_FILE} so the controller can sync progress.
# timeFormat: %Y-%m-%dT%H:%M:%SZ
# Arguments: $1 totalSize, $2 startTime, $3 stopTime, $4 timeZone, $5 extras
function DP_save_backup_status_info() {
  local total_size=$1
  local start_time=$2
  local stop_time=$3
  local time_zone=$4
  local extra_infos=$5

  # Optional trailing ,"timeZone":"..." fragment for the timeRange object.
  local tz_field=""
  if [ -n "${time_zone}" ]; then
    tz_field=$(printf ',"timeZone":"%s"' "${time_zone}")
  fi

  # Build the payload first, then write it in one shot.
  local payload
  if [ -z "${stop_time}" ]; then
    payload=$(printf '{"totalSize":"%s"}' "${total_size}")
  elif [ -z "${start_time}" ]; then
    payload=$(printf '{"totalSize":"%s","extras":[%s],"timeRange":{"end":"%s"%s}}' "${total_size}" "${extra_infos}" "${stop_time}" "${tz_field}")
  else
    payload=$(printf '{"totalSize":"%s","extras":[%s],"timeRange":{"start":"%s","end":"%s"%s}}' "${total_size}" "${extra_infos}" "${start_time}" "${stop_time}" "${tz_field}")
  fi
  printf '%s' "${payload}" > "${DP_BACKUP_INFO_FILE}"
}
5 changes: 5 additions & 0 deletions addons/milvus/dataprotection/restore.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

# Restore a Milvus backup. setStorageConfig (from common.sh, which the
# ActionSet concatenates before this script) fills configs/backup.yaml
# and derives BACKUP_NAME from DP_BACKUP_NAME.
setStorageConfig

./milvus-backup restore -n "$BACKUP_NAME"
33 changes: 33 additions & 0 deletions addons/milvus/templates/actionset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# ActionSet "milvus-full": Full backup and restore via the milvus-backup
# tool, with common.sh concatenated before each script for shared setup.
# NOTE(review): the image tag "v0.5.9-yq" is hardcoded in two places while
# values.yaml only defines images.milvusBackup.repository — consider adding
# images.milvusBackup.tag and templating it here.
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: ActionSet
metadata:
name: milvus-full
labels:
{{- include "milvus.labels" . | nindent 4 }}
spec:
backupType: Full
backup:
backupData:
image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.milvusBackup.repository }}:v0.5.9-yq
# runOnTargetPodNode is needed to let backup controller mount volumes for backup job
runOnTargetPodNode: true
syncProgress:
enabled: false
intervalSeconds: 5
command:
- bash
- -c
- |
{{- .Files.Get "dataprotection/common.sh" | nindent 8 }}
{{- .Files.Get "dataprotection/backup.sh" | nindent 8 }}
restore:
postReady:
- job:
image: {{ .Values.images.registry | default "docker.io" }}/{{ .Values.images.milvusBackup.repository }}:v0.5.9-yq
runOnTargetPodNode: true
command:
- bash
- -c
- |
{{- .Files.Get "dataprotection/common.sh" | nindent 12 }}
{{- .Files.Get "dataprotection/restore.sh" | nindent 12 }}
18 changes: 16 additions & 2 deletions addons/milvus/templates/backuppolicytemplate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,29 @@ metadata:
spec:
clusterDefinitionRef: milvus
backupPolicies:
- componentDefRef: milvus
- componentDefs:
- milvus-standalone
backupMethods:
- name: volume-snapshot
snapshotVolumes: true
targetVolumes:
volumes:
- data
- componentDefs:
- milvus-proxy
backupMethods:
- name: full
snapshotVolumes: false
actionSetName: milvus-full
targetVolumes:
volumes:
- milvus-config
volumeMounts:
- mountPath: /milvus/configs/
name: milvus-config
readOnly: true
schedules:
- backupMethod: volume-snapshot
enabled: false
retentionPeriod: 7d
cronExpression: "0 18 * * 0"
cronExpression: "0 18 * * 0"
6 changes: 6 additions & 0 deletions addons/milvus/tools/Dockerfile.backup
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# build with
# docker buildx build --platform linux/amd64,linux/arm64 --tag apecloud/milvus-backup:v0.5.9-yq -f Dockerfile.backup .

FROM milvusdb/milvus-backup:v0.5.9

# 'apk add --no-cache' fetches a fresh package index by itself, so a separate
# 'apk update' is redundant and would only bloat the layer with a stale
# cached index.
RUN apk add --no-cache bash yq
2 changes: 2 additions & 0 deletions addons/milvus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ images:
registry: ""
repository: apecloud/os-shell
tag: 11-debian-11-r90
milvusBackup:
repository: apecloud/milvus-backup

livenessProbe:
enabled: true
Expand Down
Loading