
Commit 2119e51

zsxwing authored and cloud-fan committed
[SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748
## What changes were proposed in this pull request?

Revert SPARK-24863 (apache#21819) and SPARK-24748 (apache#21721) as per discussion in apache#21721. We will revisit them when the data source v2 APIs are out.

## How was this patch tested?

Jenkins

Closes apache#22334 from zsxwing/revert-SPARK-24863-SPARK-24748.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent ca861fe commit 2119e51

11 files changed, 22 additions and 379 deletions


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala

Lines changed: 8 additions & 25 deletions
@@ -29,11 +29,6 @@ import org.json4s.jackson.Serialization
   */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  implicit val ordering = new Ordering[TopicPartition] {
-    override def compare(x: TopicPartition, y: TopicPartition): Int = {
-      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-    }
-  }
 
   /**
    * Read TopicPartitions from json string
@@ -56,7 +51,7 @@
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = HashMap.empty[String, List[Int]]
+    val result = new HashMap[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -85,31 +80,19 @@
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val result = new HashMap[String, HashMap[Int, Long]]()
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
     val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
     partitions.foreach { tp =>
       val off = partitionOffsets(tp)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
     }
     Serialization.write(result)
   }
-
-  /**
-   * Write per-topic partition lag as json string
-   */
-  def partitionLags(
-      latestOffsets: Map[TopicPartition, Long],
-      processedOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
-    val partitions = latestOffsets.keySet.toSeq.sorted
-    partitions.foreach { tp =>
-      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
-      parts += tp.partition -> lag
-      result += tp.topic -> parts
-    }
-    Serialization.write(Map("lag" -> result))
-  }
 }
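For orientation, partitionOffsets builds a topic -> (partition -> offset) map and hands it straight to json4s, so the checkpointed offset JSON is one nested object per topic. A minimal standalone sketch of that shape, with a hypothetical topic and offsets (it deliberately avoids the private JsonUtils object):

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object PartitionOffsetsJsonSketch {
  // Same json4s setup the Kafka source keeps after this revert.
  implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    // topic -> (partition -> offset); the values are made up for the example.
    val result = Map("topicA" -> Map(0 -> 23L, 1 -> -1L))
    println(Serialization.write(result))  // {"topicA":{"0":23,"1":-1}}
  }
}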

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala

Lines changed: 3 additions & 27 deletions
@@ -22,7 +22,6 @@ import java.io._
 import java.nio.charset.StandardCharsets
 
 import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +32,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
 
@@ -61,8 +60,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     options: DataSourceOptions,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
+    failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
@@ -156,13 +154,6 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     KafkaMicroBatchReaderFactory
   }
 
-  // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
-  // a parameter.
-  override def getCustomMetrics(): CustomMetrics = {
-    KafkaCustomMetrics(
-      kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
-  }
-
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -384,18 +375,3 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
     }
   }
 }
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
-    latestOffsets: Map[TopicPartition, Long],
-    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
-  override def json(): String = {
-    JsonUtils.partitionLags(latestOffsets, processedOffsets)
-  }
-
-  override def toString: String = json()
-}
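For context, the deleted KafkaCustomMetrics reported per topic-partition lag: the latest available offset minus the latest processed offset, serialized under a top-level "lag" key by the also-deleted JsonUtils.partitionLags. A rough standalone sketch of just that arithmetic, with a hypothetical topic and the 45-message lag the removed test expected:

import org.apache.kafka.common.TopicPartition

object LagSketch {
  // Mirrors the removed computation: lag = latest available offset - latest processed offset.
  def lags(
      latestOffsets: Map[TopicPartition, Long],
      processedOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    latestOffsets.map { case (tp, latest) =>
      tp -> (latest - processedOffsets.getOrElse(tp, 0L))
    }
  }

  def main(args: Array[String]): Unit = {
    val tp0 = new TopicPartition("topicA", 0)
    val tp1 = new TopicPartition("topicA", 1)
    // 50 messages available per partition, 5 processed so far (hypothetical numbers).
    println(lags(Map(tp0 -> 50L, tp1 -> 50L), Map(tp0 -> 5L, tp1 -> 5L)))
    // Map(topicA-0 -> 45, topicA-1 -> 45)
  }
}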

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 0 additions & 37 deletions
@@ -30,8 +30,6 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
@@ -958,41 +956,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
-  test("custom lag metrics") {
-    import testImplicits._
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 2)
-    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
-    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("subscribe", topic)
-      .option("startingOffsets", s"earliest")
-      .option("maxOffsetsPerTrigger", 10)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-
-    implicit val formats = DefaultFormats
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-    testStream(mapped)(
-      StartStream(trigger = OneTimeTrigger),
-      AssertOnQuery { query =>
-        query.awaitTermination()
-        val source = query.lastProgress.sources(0)
-        // masOffsetsPerTrigger is 10, and there are two partitions containing 50 events each
-        // so 5 events should be processed from each partition and a lag of 45 events
-        val custom = parse(source.customMetrics)
-          .extract[Map[String, Map[String, Map[String, Long]]]]
-        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
-      }
-    )
-  }
-
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java

Lines changed: 0 additions & 47 deletions
This file was deleted.
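The three deleted Java interfaces are not shown inline. Judging only from how the rest of this diff used them, their shape was roughly the following; this is a hedged Scala rendering for orientation, not the original Java sources:

// Inferred from usage in ProgressReporter and the Kafka reader; names as in the deleted files.
trait CustomMetrics {
  // ProgressReporter called json() and validated the result by parsing it.
  def json(): String
}

trait SupportsCustomReaderMetrics {
  // ProgressReporter pattern-matched sources against this and called both hooks.
  def getCustomMetrics(): CustomMetrics
  def onInvalidMetrics(ex: Exception): Unit
}

trait SupportsCustomWriterMetrics {
  // Same pair of hooks, matched against the streaming write support.
  def getCustomMetrics(): CustomMetrics
  def onInvalidMetrics(ex: Exception): Unit
}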

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 4 additions & 54 deletions
@@ -22,20 +22,14 @@ import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
-import org.apache.spark.sql.sources.v2.CustomMetrics
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
@@ -162,51 +156,19 @@ trait ProgressReporter extends Logging {
     }
     logDebug(s"Execution stats: $executionStats")
 
-    // extracts and validates custom metrics from readers and writers
-    def extractMetrics(
-        getMetrics: () => Option[CustomMetrics],
-        onInvalidMetrics: (Exception) => Unit): Option[String] = {
-      try {
-        getMetrics().map(m => {
-          val json = m.json()
-          parse(json)
-          json
-        })
-      } catch {
-        case ex: Exception if NonFatal(ex) =>
-          onInvalidMetrics(ex)
-          None
-      }
-    }
-
     val sourceProgress = sources.distinct.map { source =>
-      val customReaderMetrics = source match {
-        case s: SupportsCustomReaderMetrics =>
-          extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-        case _ => None
-      }
-
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
        endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec,
-        customReaderMetrics.orNull
+        processedRowsPerSecond = numRecords / processingTimeSec
       )
     }
 
-    val customWriterMetrics = extractWriteSupport() match {
-      case Some(s: SupportsCustomWriterMetrics) =>
-        extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-      case _ => None
-    }
-
-    val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)
+    val sinkProgress = new SinkProgress(sink.toString)
 
     val newProgress = new StreamingQueryProgress(
       id = id,
@@ -235,18 +197,6 @@
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
-  /** Extract writer from the executed query plan. */
-  private def extractWriteSupport(): Option[StreamingWriteSupport] = {
-    if (lastExecution == null) return None
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
-        p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport
-    }.headOption match {
-      case Some(w: MicroBatchWritSupport) => Some(w.writeSupport)
-      case _ => None
-    }
-  }
-
   /** Extract statistics about stateful operators from the executed query plan. */
   private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
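After the revert, per-source and sink progress expose only the standard fields; there is no customMetrics entry left to read. A minimal sketch of inspecting a query's last progress under that assumption (query is a hypothetical, already-started StreamingQuery):

import org.apache.spark.sql.streaming.StreamingQuery

object ProgressSketch {
  def printLastProgress(query: StreamingQuery): Unit = {
    val progress = query.lastProgress  // null until the first progress is reported
    if (progress != null) {
      val source = progress.sources(0)
      println(source.description)
      println(source.numInputRows)            // standard per-source fields remain
      println(source.processedRowsPerSecond)
      println(progress.sink.description)      // SinkProgress is back to description only
    }
  }
}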
