@@ -103,6 +103,13 @@ kafka_get_version() {
103103
104104# #######################
105105# Returns true if ZooKeeper is supported as metadata storage
106+ # Globals:
107+ # None
108+ # Arguments:
109+ # None
110+ # Returns:
111+ # true/false
112+ # ########################
106113kafka_is_zookeeper_supported () {
107114 major_version=" $( get_sematic_version " $( kafka_get_version) " 1) "
108115 if [[ " $major_version " -lt " 4" ]]; then
@@ -943,7 +950,9 @@ kafka_initialize() {
943950 cp -Lr " $KAFKA_MOUNTED_CONF_DIR " /* " $KAFKA_CONF_DIR "
944951 fi
945952 # Copy truststore to cert directory
946- for cert_var in KAFKA_TLS_TRUSTSTORE_FILE KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE; do
953+ local -a certs_vars=(" KAFKA_TLS_TRUSTSTORE_FILE" )
954+ kafka_is_zookeeper_supported && certs_vars+=(" KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" )
955+ for cert_var in " ${certs_vars[@]} " ; do
947956 # Only copy if the file exists and it is in a different location than KAFKA_CERTS_DIR (to avoid copying to the same location)
948957 if [[ -f " ${! cert_var} " ]] && ! [[ " ${! cert_var} " =~ $KAFKA_CERTS_DIR ]]; then
949958 info " Copying truststore ${! cert_var} to ${KAFKA_CERTS_DIR} "
@@ -995,41 +1004,43 @@ kafka_initialize() {
9951004 kafka_server_conf_set sasl.enabled.mechanisms " $KAFKA_CFG_SASL_ENABLED_MECHANISMS "
9961005 fi
9971006 # Settings for each Kafka Listener are configured individually
998- read -r -a protocol_maps <<< " $(tr ',' ' ' <<<" $KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP " )"
999- for protocol_map in " ${protocol_maps[@]} " ; do
1000- read -r -a map <<< " $(tr ':' ' ' <<<" $protocol_map " )"
1001- # Obtain the listener and protocol from protocol map string, e.g. CONTROLLER:PLAINTEXT
1002- listener=" ${map[0]} "
1003- protocol=" ${map[1]} "
1004- listener_lower=" $( echo " $listener " | tr ' [:upper:]' ' [:lower:]' ) "
1007+ if ! is_empty_value " ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:- } " ; then
1008+ read -r -a protocol_maps <<< " $(tr ',' ' ' <<<" $KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP " )"
1009+ for protocol_map in " ${protocol_maps[@]} " ; do
1010+ read -r -a map <<< " $(tr ':' ' ' <<<" $protocol_map " )"
1011+ # Obtain the listener and protocol from protocol map string, e.g. CONTROLLER:PLAINTEXT
1012+ listener=" ${map[0]} "
1013+ protocol=" ${map[1]} "
1014+ listener_lower=" $( echo " $listener " | tr ' [:upper:]' ' [:lower:]' ) "
10051015
1006- if [[ " $protocol " = " SSL" || " $protocol " = " SASL_SSL" ]]; then
1007- listener_upper=" $( echo " $listener " | tr ' [:lower:]' ' [:upper:]' ) "
1008- env_name=" KAFKA_TLS_${listener_upper} _CLIENT_AUTH"
1009- [[ -n " ${! env_name:- } " ]] && kafka_server_conf_set " listener.name.${listener_lower} .ssl.client.auth" " ${! env_name} "
1010- fi
1011- if [[ " $protocol " = " SASL_PLAINTEXT" || " $protocol " = " SASL_SSL" ]]; then
1012- local role=" "
1013- if [[ " $listener " = " ${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:- INTERNAL} " ]]; then
1014- kafka_server_conf_set sasl.mechanism.inter.broker.protocol " $KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL "
1015- role=" inter-broker"
1016- elif [[ " ${KAFKA_CFG_CONTROLLER_LISTENER_NAMES:- CONTROLLER} " =~ $listener ]]; then
1017- kafka_server_conf_set sasl.mechanism.controller.protocol " $KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL "
1018- kafka_server_conf_set " listener.name.${listener_lower} .sasl.enabled.mechanisms" " $KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL "
1019- role=" controller"
1020- fi
1021- # If KAFKA_CLIENT_LISTENER_NAME is found in the listeners list, configure the producer/consumer accordingly
1022- if [[ " $listener " = " ${KAFKA_CLIENT_LISTENER_NAME:- CLIENT} " ]]; then
1023- kafka_configure_consumer_producer_jaas
1024- kafka_producer_consumer_conf_set security.protocol " $protocol "
1025- kafka_producer_consumer_conf_set sasl.mechanism " ${KAFKA_CLIENT_SASL_MECHANISM:- $(kafka_client_sasl_mechanism)} "
1016+ if [[ " $protocol " = " SSL" || " $protocol " = " SASL_SSL" ]]; then
1017+ listener_upper=" $( echo " $listener " | tr ' [:lower:]' ' [:upper:]' ) "
1018+ env_name=" KAFKA_TLS_${listener_upper} _CLIENT_AUTH"
1019+ [[ -n " ${! env_name:- } " ]] && kafka_server_conf_set " listener.name.${listener_lower} .ssl.client.auth" " ${! env_name} "
10261020 fi
1027- # Configure inline listener jaas configuration, omitted if mounted JAAS conf file detected
1028- if [[ ! -f " ${KAFKA_CONF_DIR} /kafka_jaas.conf" ]]; then
1029- kafka_configure_server_jaas " $listener_lower " " ${role:- } "
1021+ if [[ " $protocol " = " SASL_PLAINTEXT" || " $protocol " = " SASL_SSL" ]]; then
1022+ local role=" "
1023+ if [[ " $listener " = " ${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:- INTERNAL} " ]]; then
1024+ kafka_server_conf_set sasl.mechanism.inter.broker.protocol " $KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL "
1025+ role=" inter-broker"
1026+ elif [[ " ${KAFKA_CFG_CONTROLLER_LISTENER_NAMES:- CONTROLLER} " =~ $listener ]]; then
1027+ kafka_server_conf_set sasl.mechanism.controller.protocol " $KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL "
1028+ kafka_server_conf_set " listener.name.${listener_lower} .sasl.enabled.mechanisms" " $KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL "
1029+ role=" controller"
1030+ fi
1031+ # If KAFKA_CLIENT_LISTENER_NAME is found in the listeners list, configure the producer/consumer accordingly
1032+ if [[ " $listener " = " ${KAFKA_CLIENT_LISTENER_NAME:- CLIENT} " ]]; then
1033+ kafka_configure_consumer_producer_jaas
1034+ kafka_producer_consumer_conf_set security.protocol " $protocol "
1035+ kafka_producer_consumer_conf_set sasl.mechanism " ${KAFKA_CLIENT_SASL_MECHANISM:- $(kafka_client_sasl_mechanism)} "
1036+ fi
1037+ # Configure inline listener jaas configuration, omitted if mounted JAAS conf file detected
1038+ if [[ ! -f " ${KAFKA_CONF_DIR} /kafka_jaas.conf" ]]; then
1039+ kafka_configure_server_jaas " $listener_lower " " ${role:- } "
1040+ fi
10301041 fi
1031- fi
1032- done
1042+ done
1043+ fi
10331044 # Configure Kafka using environment variables
10341045 # This is executed at the end, to allow users to override properties set by the initialization logic
10351046 kafka_configure_from_environment_variables
0 commit comments