
Commit 4fbf748

[SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behind a profile

## What changes were proposed in this pull request?

Put Kafka 0.8 support behind a kafka-0-8 profile.

## How was this patch tested?

Existing tests, but, until PR builder and Jenkins configs are updated the effect here is to not build or test Kafka 0.8 support at all.

Author: Sean Owen <[email protected]>

Closes apache#19134 from srowen/SPARK-21893.

1 parent a1d98c6 commit 4fbf748

File tree: 22 files changed (+127, -258 lines)


dev/create-release/release-build.sh

Lines changed: 20 additions & 12 deletions

@@ -80,8 +80,17 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
 BASE_DIR=$(pwd)
 
 MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver"
-PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+
+# Hive-specific profiles for some builds
+HIVE_PROFILES="-Phive -Phive-thriftserver"
+# Profiles for publishing snapshots and release to Maven Central
+PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+# Profiles for building binary releases
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+# Scala 2.11 only profiles for some builds
+SCALA_2_11_PROFILES="-Pkafka-0-8"
+# Scala 2.12 only profiles for some builds
+SCALA_2_12_PROFILES="-Pscala-2.12"
 
 rm -rf spark
 git clone https://git-wip-us.apache.org/repos/asf/spark.git

@@ -235,10 +244,9 @@ if [[ "$1" == "package" ]]; then
 
   # We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
   # share the same Zinc server.
-  FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
-  make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" "withr" &
-  make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" &
-  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
+  make_binary_release "hadoop2.6" "-Phadoop-2.6 $HIVE_PROFILES $SCALA_2_11_PROFILES $BASE_RELEASE_PROFILES" "3035" "withr" &
+  make_binary_release "hadoop2.7" "-Phadoop-2.7 $HIVE_PROFILES $SCALA_2_11_PROFILES $BASE_RELEASE_PROFILES" "3036" "withpip" &
+  make_binary_release "without-hadoop" "-Phadoop-provided $SCALA_2_11_PROFILES $BASE_RELEASE_PROFILES" "3038" &
   wait
   rm -rf spark-$SPARK_VERSION-bin-*/
 

@@ -304,10 +312,10 @@ if [[ "$1" == "publish-snapshot" ]]; then
   # Generate random point for Zinc
   export ZINC_PORT=$(python -S -c "import random; print random.randrange(3030,4030)")
 
-  $MVN -DzincPort=$ZINC_PORT --settings $tmp_settings -DskipTests $PUBLISH_PROFILES deploy
+  $MVN -DzincPort=$ZINC_PORT --settings $tmp_settings -DskipTests $SCALA_2_11_PROFILES $PUBLISH_PROFILES deploy
   #./dev/change-scala-version.sh 2.12
-  #$MVN -DzincPort=$ZINC_PORT -Pscala-2.12 --settings $tmp_settings \
-  #  -DskipTests $PUBLISH_PROFILES clean deploy
+  #$MVN -DzincPort=$ZINC_PORT --settings $tmp_settings \
+  #  -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean deploy
 
   # Clean-up Zinc nailgun process
   /usr/sbin/lsof -P |grep $ZINC_PORT | grep LISTEN | awk '{ print $2; }' | xargs kill

@@ -340,11 +348,11 @@ if [[ "$1" == "publish-release" ]]; then
   # Generate random point for Zinc
   export ZINC_PORT=$(python -S -c "import random; print random.randrange(3030,4030)")
 
-  $MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -DskipTests $PUBLISH_PROFILES clean install
+  $MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -DskipTests $SCALA_2_11_PROFILES $PUBLISH_PROFILES clean install
 
   #./dev/change-scala-version.sh 2.12
-  #$MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -Pscala-2.12 \
-  #  -DskipTests $PUBLISH_PROFILES clean install
+  #$MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo \
+  #  -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean install
 
   # Clean-up Zinc nailgun process
   /usr/sbin/lsof -P |grep $ZINC_PORT | grep LISTEN | awk '{ print $2; }' | xargs kill

dev/mima

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pmesos -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"

dev/scalastyle

Lines changed: 1 addition & 0 deletions

@@ -23,6 +23,7 @@ ERRORS=$(echo -e "q\n" \
     | build/sbt \
         -Pkinesis-asl \
         -Pmesos \
+        -Pkafka-0-8 \
         -Pyarn \
         -Phive \
         -Phive-thriftserver \

dev/sparktestsupport/modules.py

Lines changed: 6 additions & 0 deletions

@@ -249,6 +249,12 @@ def __hash__(self):
         "external/kafka-0-8",
         "external/kafka-0-8-assembly",
     ],
+    build_profile_flags=[
+        "-Pkafka-0-8",
+    ],
+    environ={
+        "ENABLE_KAFKA_0_8_TESTS": "1"
+    },
     sbt_test_goals=[
         "streaming-kafka-0-8/test",
     ]

dev/test-dependencies.sh

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
 
 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
   hadoop-2.6

docs/building-spark.md

Lines changed: 9 additions & 0 deletions

@@ -90,6 +90,15 @@ like ZooKeeper and Hadoop itself.
 ## Building with Mesos support
 
     ./build/mvn -Pmesos -DskipTests clean package
+
+## Building with Kafka 0.8 support
+
+Kafka 0.8 support must be explicitly enabled with the `kafka-0-8` profile.
+Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
+
+    ./build/mvn -Pkafka-0-8 -DskipTests clean package
+
+Kafka 0.10 support is still automatically built.
 
 ## Building submodules individually
 
docs/streaming-kafka-0-8-integration.md

Lines changed: 10 additions & 13 deletions

@@ -2,6 +2,9 @@
 layout: global
 title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)
 ---
+
+**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.**
+
 Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this - the old approach using Receivers and Kafka's high-level API, and a new approach (introduced in Spark 1.3) without using Receivers. They have different programming models, performance characteristics, and semantics guarantees, so read on for more details. Both approaches are considered stable APIs as of the current version of Spark.
 
 ## Approach 1: Receiver-based Approach

@@ -28,8 +31,7 @@ Next, we discuss how to use this approach in your streaming application.
 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
 
-You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
-and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
+You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$).
 </div>
 <div data-lang="java" markdown="1">
 import org.apache.spark.streaming.kafka.*;
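
For illustration, a minimal Scala sketch of the `createStream` variant described above, with explicit key/value types and decoder classes; the application name, ZooKeeper quorum, consumer group, and topic are placeholder values.

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("KafkaReceiverSketch")
    val streamingContext = new StreamingContext(conf, Seconds(10))

    // The receiver-based approach uses Kafka's high-level consumer, which needs
    // a ZooKeeper quorum and a consumer group id (placeholder values here).
    val kafkaParams = Map(
      "zookeeper.connect" -> "zkhost1:2181,zkhost2:2181",
      "group.id" -> "example-consumer-group")

    // Topic name -> number of receiver threads reading that topic.
    val topics = Map("example-topic" -> 1)

    // Explicit key/value types and decoder classes; here both key and value are strings.
    val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

    kafkaStream.map(_._2).count().print()
    streamingContext.start()
    streamingContext.awaitTermination()
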
@@ -38,8 +40,7 @@ Next, we discuss how to use this approach in your streaming application.
 KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
 
-You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
-and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html).
 
 </div>
 <div data-lang="python" markdown="1">

@@ -48,8 +49,7 @@ Next, we discuss how to use this approach in your streaming application.
 kafkaStream = KafkaUtils.createStream(streamingContext, \
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
 
-By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
-and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/kafka_wordcount.py).
+By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils).
 </div>
 </div>
 

@@ -71,7 +71,7 @@ Next, we discuss how to use this approach in your streaming application.
 ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
 
 Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kafka-0-8-assembly` from the
-[Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.
+[Maven repository](https://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
 This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

@@ -105,8 +105,7 @@ Next, we discuss how to use this approach in your streaming application.
     streamingContext, [map of Kafka parameters], [set of topics to consume])
 
 You can also pass a `messageHandler` to `createDirectStream` to access `MessageAndMetadata` that contains metadata about the current message and transform it to any desired type.
-See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
-and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
+See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$).
 </div>
 <div data-lang="java" markdown="1">
 import org.apache.spark.streaming.kafka.*;
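
As a hedged Scala sketch of the `messageHandler` variant mentioned above (broker list, topic, and starting offsets are placeholders; `streamingContext` is an existing `StreamingContext`):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder

    import org.apache.spark.streaming.kafka.KafkaUtils

    // The direct approach talks to the brokers, not ZooKeeper (placeholder broker list).
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // Explicit starting offsets: begin at offset 0 of partition 0 of a placeholder topic.
    val fromOffsets = Map(TopicAndPartition("example-topic", 0) -> 0L)

    // The messageHandler sees the full MessageAndMetadata and may return any type;
    // this one keeps the topic, partition, offset, and decoded message value.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
      (mmd.topic, mmd.partition, mmd.offset, mmd.message())

    val stream = KafkaUtils.createDirectStream[
        String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
      streamingContext, kafkaParams, fromOffsets, messageHandler)
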
@@ -117,17 +116,15 @@ Next, we discuss how to use this approach in your streaming application.
     [map of Kafka parameters], [set of topics to consume]);
 
 You can also pass a `messageHandler` to `createDirectStream` to access `MessageAndMetadata` that contains metadata about the current message and transform it to any desired type.
-See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
-and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html).
 
 </div>
 <div data-lang="python" markdown="1">
 from pyspark.streaming.kafka import KafkaUtils
 directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
 
 You can also pass a `messageHandler` to `createDirectStream` to access `KafkaMessageAndMetadata` that contains metadata about the current message and transform it to any desired type.
-By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
-and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/direct_kafka_wordcount.py).
+By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils).
 </div>
 </div>

docs/streaming-kafka-integration.md

Lines changed: 6 additions & 5 deletions

@@ -3,10 +3,11 @@ layout: global
 title: Spark Streaming + Kafka Integration Guide
 ---
 
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Please read the [Kafka documentation](http://kafka.apache.org/documentation.html) thoroughly before starting an integration using Spark.
+[Apache Kafka](https://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Please read the [Kafka documentation](https://kafka.apache.org/documentation.html) thoroughly before starting an integration using Spark.
 
-The Kafka project introduced a new consumer api between versions 0.8 and 0.10, so there are 2 separate corresponding Spark Streaming packages available. Please choose the correct package for your brokers and desired features; note that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers.
+The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are 2 separate corresponding Spark Streaming packages available. Please choose the correct package for your brokers and desired features; note that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers.
 
+**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.**
 
 <table class="table">
 <tr><th></th><th><a href="streaming-kafka-0-8-integration.html">spark-streaming-kafka-0-8</a></th><th><a href="streaming-kafka-0-10-integration.html">spark-streaming-kafka-0-10</a></th></tr>

@@ -16,9 +17,9 @@ The Kafka project introduced a new consumer api between versions 0.8 and 0.10, s
 <td>0.10.0 or higher</td>
 </tr>
 <tr>
-<td>Api Stability</td>
+<td>API Maturity</td>
+<td>Deprecated</td>
 <td>Stable</td>
-<td>Experimental</td>
 </tr>
 <tr>
 <td>Language Support</td>

@@ -41,7 +42,7 @@ The Kafka project introduced a new consumer api between versions 0.8 and 0.10, s
 <td>Yes</td>
 </tr>
 <tr>
-<td>Offset Commit Api</td>
+<td>Offset Commit API</td>
 <td>No</td>
 <td>Yes</td>
 </tr>
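
As a hedged illustration of choosing one of the two packages above in an sbt build definition (the Spark version shown is a placeholder and should match your deployment):

    // build.sbt sketch: pick exactly one of the two integrations.
    // 0.8 integration: works with 0.8.2.1+ brokers, deprecated as of Spark 2.3.0.
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.3.0"

    // 0.10 integration: requires 0.10.0 or higher brokers.
    // libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
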

docs/streaming-programming-guide.md

Lines changed: 3 additions & 3 deletions

@@ -401,14 +401,14 @@ some of the common ones are as follows.
 
 <table class="table">
 <tr><th>Source</th><th>Artifact</th></tr>
-<tr><td> Kafka </td><td> spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Kafka </td><td> spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] </td></tr>
 <tr><td></td><td></td></tr>
 </table>
 
 For an up-to-date list, please refer to the
-[Maven repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
+[Maven repository](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 for the full list of supported sources and artifacts.
 
 ***

@@ -1899,7 +1899,7 @@ To run a Spark Streaming applications, you need to have the following.
 if your application uses [advanced sources](#advanced-sources) (e.g. Kafka, Flume),
 then you will have to package the extra artifact they link to, along with their dependencies,
 in the JAR that is used to deploy the application. For example, an application using `KafkaUtils`
-will have to include `spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and all its
+will have to include `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and all its
 transitive dependencies in the application JAR.
 
 - *Configuring sufficient memory for the executors* - Since the received data must be stored in
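
For example, a hedged build.sbt sketch of the packaging rule described above: the core Spark artifacts are provided by the cluster at runtime, while the Kafka artifact and its transitive dependencies must be bundled into the application JAR (the version is a placeholder):

    // build.sbt sketch
    val sparkVersion = "2.3.0"  // placeholder; match your Spark deployment

    libraryDependencies ++= Seq(
      // Already on the cluster at runtime, so not bundled into the application JAR.
      "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
      "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
      // Advanced source: must be bundled, together with its transitive dependencies.
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
    )
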

examples/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -86,7 +86,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
