Skip to content

Commit d47c219

Browse files
gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient configurations.
## What changes were proposed in this pull request? At the moment Kafka delegation tokens are fetched through `AdminClient` but there is no possibility to add custom configuration parameters. In [options](https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html#kafka-specific-configurations) there is already a possibility to add custom configurations. In this PR I've added similar this possibility to `AdminClient`. ## How was this patch tested? Existing + added unit tests. ``` cd docs/ SKIP_API=1 jekyll build ``` Manual webpage check. Closes apache#24875 from gaborgsomogyi/SPARK-28055. Authored-by: Gabor Somogyi <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent d1ef6be commit d47c219

File tree

6 files changed

+50
-8
lines changed

6 files changed

+50
-8
lines changed

docs/structured-streaming-kafka-integration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,11 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
818818
</tr>
819819
</table>
820820

821+
#### Kafka Specific Configurations
822+
823+
Kafka's own configurations can be set with `kafka.` prefix, e.g, `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
824+
For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
825+
821826
#### Caveats
822827

823828
- Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
2323

2424
import org.apache.spark.SparkConf
2525
import org.apache.spark.internal.Logging
26+
import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
2627

2728
private[spark] case class KafkaTokenClusterConf(
2829
identifier: String,
@@ -35,19 +36,21 @@ private[spark] case class KafkaTokenClusterConf(
3536
keyStoreLocation: Option[String],
3637
keyStorePassword: Option[String],
3738
keyPassword: Option[String],
38-
tokenMechanism: String) {
39+
tokenMechanism: String,
40+
specifiedKafkaParams: Map[String, String]) {
3941
override def toString: String = s"KafkaTokenClusterConf{" +
4042
s"identifier=$identifier, " +
4143
s"authBootstrapServers=$authBootstrapServers, " +
4244
s"targetServersRegex=$targetServersRegex, " +
4345
s"securityProtocol=$securityProtocol, " +
4446
s"kerberosServiceName=$kerberosServiceName, " +
4547
s"trustStoreLocation=$trustStoreLocation, " +
46-
s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
48+
s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
4749
s"keyStoreLocation=$keyStoreLocation, " +
48-
s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
49-
s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
50-
s"tokenMechanism=$tokenMechanism}"
50+
s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
51+
s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
52+
s"tokenMechanism=$tokenMechanism, " +
53+
s"specifiedKafkaParams=${KafkaRedactionUtil.redactParams(specifiedKafkaParams.toSeq)}}"
5154
}
5255

5356
private [kafka010] object KafkaTokenSparkConf extends Logging {
@@ -59,6 +62,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
5962
def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = {
6063
val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier."
6164
val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap
65+
val configKafkaPrefix = s"${configPrefix}kafka."
66+
val sparkClusterKafkaConf = sparkConf.getAllWithPrefix(configKafkaPrefix).toMap
6267
val result = KafkaTokenClusterConf(
6368
identifier,
6469
sparkClusterConf
@@ -76,7 +81,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
7681
sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
7782
sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
7883
sparkClusterConf.getOrElse("sasl.token.mechanism",
79-
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
84+
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM),
85+
sparkClusterKafkaConf
8086
)
8187
logDebug(s"getClusterConfig($identifier): $result")
8288
result

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ private[spark] object KafkaTokenUtil extends Logging {
134134
}
135135
}
136136

137+
logDebug("AdminClient params before specified params: " +
138+
s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
139+
140+
clusterConf.specifiedKafkaParams.foreach { param =>
141+
adminClientProperties.setProperty(param._1, param._2)
142+
}
143+
144+
logDebug("AdminClient params after specified params: " +
145+
s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
146+
137147
adminClientProperties
138148
}
139149

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
107107

108108
protected def createClusterConf(
109109
identifier: String,
110-
securityProtocol: String): KafkaTokenClusterConf = {
110+
securityProtocol: String,
111+
specifiedKafkaParams: Map[String, String] = Map.empty): KafkaTokenClusterConf = {
111112
KafkaTokenClusterConf(
112113
identifier,
113114
bootStrapServers,
@@ -119,6 +120,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
119120
Some(keyStoreLocation),
120121
Some(keyStorePassword),
121122
Some(keyPassword),
122-
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
123+
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM,
124+
specifiedKafkaParams)
123125
}
124126
}

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,16 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
9696
assert(clusterConfig.tokenMechanism === tokenMechanism)
9797
}
9898

99+
test("getClusterConfig should return specified kafka params") {
100+
sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers)
101+
sparkConf.set(s"spark.kafka.clusters.$identifier1.kafka.customKey", "customValue")
102+
103+
val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1)
104+
assert(clusterConfig.identifier === identifier1)
105+
assert(clusterConfig.authBootstrapServers === authBootStrapServers)
106+
assert(clusterConfig.specifiedKafkaParams === Map("customKey" -> "customValue"))
107+
}
108+
99109
test("getAllClusterConfigs should return empty list when nothing configured") {
100110
assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
101111
}

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,15 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
155155
assert(saslJaasConfig.contains("useTicketCache=true"))
156156
}
157157

158+
test("createAdminClientProperties with specified params should include it") {
159+
val clusterConf = createClusterConf(identifier1, SASL_SSL.name,
160+
Map("customKey" -> "customValue"))
161+
162+
val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
163+
164+
assert(adminClientProperties.get("customKey") === "customValue")
165+
}
166+
158167
test("isGlobalJaasConfigurationProvided without global config should return false") {
159168
assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
160169
}

0 commit comments

Comments (0)