
Commit 2512a1d

Anastasios Zouzias authored and cody koeninger committed
[SPARK-26121][STRUCTURED STREAMING] Allow users to define prefix of Kafka's consumer group (group.id)
## What changes were proposed in this pull request?

Allow the Spark Structured Streaming user to specify the prefix of the consumer group (group.id), instead of forcing consumer group ids of the form `spark-kafka-source-*`.

## How was this patch tested?

Unit tests provided by Spark (backwards-compatible change, i.e., the user can optionally use the functionality).

`mvn test -pl external/kafka-0-10`

Closes apache#23103 from zouzias/SPARK-26121.

Authored-by: Anastasios Zouzias <[email protected]>
Signed-off-by: cody koeninger <[email protected]>
1 parent 1bb60ab commit 2512a1d

File tree

2 files changed: +38 -17 lines changed

docs/structured-streaming-kafka-integration.md

Lines changed: 23 additions & 14 deletions
@@ -123,7 +123,7 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 </div>
 </div>
 
-### Creating a Kafka Source for Batch Queries
+### Creating a Kafka Source for Batch Queries
 If you have a use case that is better suited to batch processing,
 you can create a Dataset/DataFrame for a defined range of offsets.
 
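For context only (not part of this diff), a minimal batch read over a fixed offset range, as the documentation section above describes, might look like the sketch below; it assumes an active `SparkSession` named `spark` (as in `spark-shell`), and the broker addresses and topic name are placeholders.

```scala
// Sketch: batch query against Kafka over a bounded range of offsets.
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")  // placeholder brokers
  .option("subscribe", "topic1")                                  // placeholder topic
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```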
@@ -374,17 +374,24 @@ The following configurations are optional:
 <td>streaming and batch</td>
 <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
 </tr>
+<tr>
+  <td>groupIdPrefix</td>
+  <td>string</td>
+  <td>spark-kafka-source</td>
+  <td>streaming and batch</td>
+  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
+</tr>
 </table>
 
 ## Writing Data to Kafka
 
-Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
 Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
 or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
 to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
-Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
+Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
 if writing the query is successful, then you can assume that the query output was written at least once. A possible
-solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
+solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
 that can be used to perform de-duplication when reading.
 
 The Dataframe being written to Kafka should have the following columns in schema:
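To illustrate the new option added in the table above (not part of the diff), a streaming read that sets `groupIdPrefix` might look like the following sketch; the servers, topic, and prefix are placeholder values, and `spark` is assumed to be an active `SparkSession`.

```scala
// Sketch: streaming read with a user-defined consumer group prefix.
// The generated group.id becomes "<groupIdPrefix>-<uuid>-<metadataPath hash>"
// instead of the default "spark-kafka-source-<uuid>-<metadataPath hash>".
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("groupIdPrefix", "my-prefix")  // placeholder prefix
  .load()
```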
@@ -405,8 +412,8 @@ The Dataframe being written to Kafka should have the following columns in schema
 </table>
 \* The topic column is required if the "topic" configuration option is not specified.<br>
 
-The value column is the only required option. If a key column is not specified then
-a ```null``` valued key column will be automatically added (see Kafka semantics on
+The value column is the only required option. If a key column is not specified then
+a ```null``` valued key column will be automatically added (see Kafka semantics on
 how ```null``` valued key values are handled). If a topic column exists then its value
 is used as the topic when writing the given row to Kafka, unless the "topic" configuration
 option is set i.e., the "topic" configuration option overrides the topic column.
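A minimal write sketch (not part of this diff) for the case described above where only a value column is supplied and the key therefore defaults to null; it assumes `df` is an existing batch DataFrame with a `value` column, and the broker and topic names are placeholders.

```scala
// Sketch: write only a value column; the resulting Kafka records get a null key.
df.selectExpr("CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
```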
@@ -568,31 +575,33 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
 .save()
-
+
 {% endhighlight %}
 </div>
 </div>
 
 
 ## Kafka Specific Configurations
 
-Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
-`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see
+Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
+`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see
 [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
 parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
 for parameters related to writing data.
 
 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
 
-- **group.id**: Kafka source will create a unique group id for each query automatically.
+- **group.id**: Kafka source will create a unique group id for each query automatically. The user can
+set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
+is "spark-kafka-source".
 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
-where to start instead. Structured Streaming manages which offsets are consumed internally, rather
-than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
+where to start instead. Structured Streaming manages which offsets are consumed internally, rather
+than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
 topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
 streaming query is started, and that resuming will always pick up from where the query left off.
-- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
+- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
 DataFrame operations to explicitly deserialize the keys.
-- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
+- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
 Use DataFrame operations to explicitly deserialize the values.
 - **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
 DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
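As an illustration of the paragraph above (not part of this diff), arbitrary Kafka consumer parameters can be passed with the `kafka.` prefix, while keys and values are deserialized explicitly with DataFrame operations; the specific config shown below (`fetch.max.bytes`) is just an example of a pass-through consumer setting.

```scala
// Sketch: "kafka."-prefixed options are forwarded to the underlying Kafka consumer;
// key and value arrive as byte arrays and are deserialized via DataFrame operations.
val parsed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("kafka.fetch.max.bytes", "52428800")  // example pass-through consumer config
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```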

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 15 additions & 3 deletions
@@ -77,7 +77,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -119,7 +119,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -159,7 +159,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Each running query should use its own group id. Otherwise, the query may be only assigned
     // partial data since Kafka will assign partitions to multiple consumers having the same group
     // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
@@ -538,6 +538,18 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
     .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(
+      parameters: Map[String, String],
+      metadataPath: String): String = {
+    val groupIdPrefix = parameters
+      .getOrElse("groupIdPrefix", "spark-kafka-source")
+    s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
+  }
+
   /** Class to conveniently update Kafka config params, while logging the changes */
   private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
     private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
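To make the generated id scheme concrete, here is a standalone sketch that mirrors the `streamingUniqueGroupId` helper added above (it is not the Spark object itself, just an illustration of the same logic):

```scala
import java.util.UUID

// Mirrors the helper in the diff: optional "groupIdPrefix" parameter, defaulting
// to "spark-kafka-source", followed by a random UUID and the metadata path hash.
def uniqueGroupId(parameters: Map[String, String], metadataPath: String): String = {
  val prefix = parameters.getOrElse("groupIdPrefix", "spark-kafka-source")
  s"$prefix-${UUID.randomUUID}-${metadataPath.hashCode}"
}

// Example: uniqueGroupId(Map("groupIdPrefix" -> "my-prefix"), "/checkpoints/query-1")
// yields something like "my-prefix-<random-uuid>-<hash>".
```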
