Skip to content

Commit 94adffa

Browse files
gaborgsomogyiMarcelo Vanzin
authored andcommitted
[SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
## What changes were proposed in this pull request? `Krb5LoginModule` supports debug parameter which is not yet supported from Spark side. This configuration makes it easier to debug authentication issues against Kafka. In this PR `Krb5LoginModule` debug flag controlled by either `sun.security.krb5.debug` or `com.ibm.security.krb5.Krb5Debug`. Additionally found some hardcoded values like `ssl.truststore.location`, etc... which could be error prone if Kafka changes it so in such cases Kafka define used. ## How was this patch tested? Existing + additional unit tests + on cluster. Closes apache#24204 from gaborgsomogyi/SPARK-27270. Authored-by: Gabor Somogyi <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 5d8aee5 commit 94adffa

File tree

2 files changed

+42
-22
lines changed

2 files changed

+42
-22
lines changed

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
2828
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
2929
import org.apache.kafka.clients.CommonClientConfigs
3030
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
31-
import org.apache.kafka.common.config.SaslConfigs
31+
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
3232
import org.apache.kafka.common.security.JaasContext
3333
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
3434
import org.apache.kafka.common.security.scram.ScramLoginModule
@@ -136,29 +136,30 @@ private[spark] object KafkaTokenUtil extends Logging {
136136

137137
private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
138138
sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
139-
properties.put("ssl.truststore.location", truststoreLocation)
139+
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation)
140140
}
141141
sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
142-
properties.put("ssl.truststore.password", truststorePassword)
142+
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword)
143143
}
144144
}
145145

146146
private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
147147
sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
148-
properties.put("ssl.keystore.location", keystoreLocation)
148+
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation)
149149
}
150150
sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
151-
properties.put("ssl.keystore.password", keystorePassword)
151+
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword)
152152
}
153153
sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
154-
properties.put("ssl.key.password", keyPassword)
154+
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword)
155155
}
156156
}
157157

158158
private def getKeytabJaasParams(sparkConf: SparkConf): String = {
159159
val params =
160160
s"""
161161
|${getKrb5LoginModuleName} required
162+
| debug=${isGlobalKrbDebugEnabled()}
162163
| useKeyTab=true
163164
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
164165
| keyTab="${sparkConf.get(KEYTAB).get}"
@@ -175,6 +176,7 @@ private[spark] object KafkaTokenUtil extends Logging {
175176
val params =
176177
s"""
177178
|${getKrb5LoginModuleName} required
179+
| debug=${isGlobalKrbDebugEnabled()}
178180
| useTicketCache=true
179181
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
180182
""".stripMargin.replace("\n", "")
@@ -194,6 +196,16 @@ private[spark] object KafkaTokenUtil extends Logging {
194196
}
195197
}
196198

199+
private def isGlobalKrbDebugEnabled(): Boolean = {
200+
if (System.getProperty("java.vendor").contains("IBM")) {
201+
val debug = System.getenv("com.ibm.security.krb5.Krb5Debug")
202+
debug != null && debug.equalsIgnoreCase("all")
203+
} else {
204+
val debug = System.getenv("sun.security.krb5.debug")
205+
debug != null && debug.equalsIgnoreCase("true")
206+
}
207+
}
208+
197209
private def printToken(token: DelegationToken): Unit = {
198210
if (log.isDebugEnabled) {
199211
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction
2121

2222
import org.apache.hadoop.security.UserGroupInformation
2323
import org.apache.kafka.clients.CommonClientConfigs
24-
import org.apache.kafka.common.config.SaslConfigs
24+
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
2525
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
2626

2727
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -83,11 +83,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
8383
=== bootStrapServers)
8484
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
8585
=== SASL_PLAINTEXT.name)
86-
assert(!adminClientProperties.containsKey("ssl.truststore.location"))
87-
assert(!adminClientProperties.containsKey("ssl.truststore.password"))
88-
assert(!adminClientProperties.containsKey("ssl.keystore.location"))
89-
assert(!adminClientProperties.containsKey("ssl.keystore.password"))
90-
assert(!adminClientProperties.containsKey("ssl.key.password"))
86+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
87+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
88+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
89+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
90+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
9191
}
9292

9393
test("createAdminClientProperties with SASL_SSL protocol should include truststore config") {
@@ -105,11 +105,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
105105
=== bootStrapServers)
106106
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
107107
=== SASL_SSL.name)
108-
assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
109-
assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
110-
assert(!adminClientProperties.containsKey("ssl.keystore.location"))
111-
assert(!adminClientProperties.containsKey("ssl.keystore.password"))
112-
assert(!adminClientProperties.containsKey("ssl.key.password"))
108+
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
109+
=== trustStoreLocation)
110+
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
111+
=== trustStorePassword)
112+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
113+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
114+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
113115
}
114116

115117
test("createAdminClientProperties with SSL protocol should include keystore and truststore " +
@@ -128,11 +130,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
128130
=== bootStrapServers)
129131
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
130132
=== SSL.name)
131-
assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
132-
assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
133-
assert(adminClientProperties.get("ssl.keystore.location") === keyStoreLocation)
134-
assert(adminClientProperties.get("ssl.keystore.password") === keyStorePassword)
135-
assert(adminClientProperties.get("ssl.key.password") === keyPassword)
133+
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
134+
=== trustStoreLocation)
135+
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
136+
=== trustStorePassword)
137+
assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation)
138+
assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword)
139+
assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword)
136140
}
137141

138142
test("createAdminClientProperties with global config should not set dynamic jaas config") {
@@ -165,7 +169,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
165169
assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
166170
val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
167171
assert(saslJaasConfig.contains("Krb5LoginModule required"))
172+
assert(saslJaasConfig.contains(s"debug="))
168173
assert(saslJaasConfig.contains("useKeyTab=true"))
174+
assert(saslJaasConfig.contains(s"""keyTab="$keytab""""))
175+
assert(saslJaasConfig.contains(s"""principal="$principal""""))
169176
}
170177

171178
test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") {
@@ -181,6 +188,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
181188
assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
182189
val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
183190
assert(saslJaasConfig.contains("Krb5LoginModule required"))
191+
assert(saslJaasConfig.contains(s"debug="))
184192
assert(saslJaasConfig.contains("useTicketCache=true"))
185193
}
186194

0 commit comments

Comments
 (0)