Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion addons/kafka/dataprotection/backup.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
#!/bin/bash

# if the script exits with a non-zero exit code, touch a file to indicate that the backup failed,
# the sync progress container will check this file and exit if it exists
function handle_exit() {
exit_code=$?
if [ $exit_code -ne 0 ]; then
echo "failed with exit code $exit_code"
touch "${DP_BACKUP_INFO_FILE}.exit"
exit $exit_code
fi
}

trap handle_exit EXIT

# topics.txt format is like:
# (topic name) (partitions) (replication factor)
# topic1 1 1
# topic2 1 1
#
# We also ignores the __consumer_offsets topic as offsets won't be backuped up.
echo "getting topics..."
kafkactl get topics | tail -n +2 | grep -v __consumer_offsets | datasafed push - topics.txt
topic_list=$(kafkactl get topics | tail -n +2)
if [[ -z $topic_list ]]; then
echo "nothing to backup"
exit 1
fi
echo $topic_list | grep -v __consumer_offsets | datasafed push - topics.txt
readarray -t topics < <(kafkactl get topics -o compact | grep -v __consumer_offsets)

for topic in "${topics[@]}"; do
Expand Down
12 changes: 12 additions & 0 deletions addons/kafka/dataprotection/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,15 @@ function DP_save_backup_status_info() {
export BROKERS="$DP_DB_HOST:$DP_DB_PORT"
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export DATASAFED_BACKEND_BASE_PATH=${DP_BACKUP_BASE_PATH}

if [[ $KB_KAFKA_SASL_ENABLE == "true" ]]; then
echo "using sasl auth.."
if [[ $KB_KAFKA_SASL_MECHANISMS != *"PLAIN"* ]]; then
echo "unsupported KB_KAFKA_SASL_MECHANISMS: $KB_KAFKA_SASL_MECHANISMS"
exit 1
fi
export SASL_ENABLED="true"
export SASL_MECHANISM="plaintext"
export SASL_USERNAME=$KAFKA_ADMIN_USER
export SASL_PASSWORD=$KAFKA_ADMIN_PASSWORD
fi