Commit 5b5a36e

jose-torres authored and cloud-fan committed
Roll forward "[SPARK-23096][SS] Migrate rate source to V2"

## What changes were proposed in this pull request?

Roll forward c68ec4e (apache#20688).

There are two minor test changes required:

* An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException.
* The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils.

## How was this patch tested?

Existing tests.

Author: Jose Torres <[email protected]>
Author: jerryshao <[email protected]>

Closes apache#20922 from jose-torres/ratefix.
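For the second test change, the fix amounts to setting the active session explicitly during test setup instead of waiting for query analysis to do it. A minimal sketch of what such logic might look like (the trait and helper names below are hypothetical; only `SparkSession.setActiveSession` and `clearActiveSession` are real Spark APIs, and the commit places the actual logic in SQLTestUtils):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: eagerly set the active session before test code runs,
// instead of relying on it being set when a query is first analyzed.
trait EagerActiveSession {
  protected def spark: SparkSession

  protected def withActiveSession[T](body: => T): T = {
    SparkSession.setActiveSession(spark) // real Spark API
    try body finally SparkSession.clearActiveSession()
  }
}
```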
1 parent b02e76c commit 5b5a36e

File tree

9 files changed: +524 -663 lines changed


sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Lines changed: 1 addition & 2 deletions

@@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
-org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
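The service file above is what maps a short source name to its provider class, so after this change `format("rate")` resolves to RateStreamProvider. A minimal sketch of reading the rate source by name (the master, app name, and option values here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: resolve the rate source via its registered short name.
object RateSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("rate-source-demo")
      .getOrCreate()

    val stream = spark.readStream
      .format("rate")                  // resolved through DataSourceRegister
      .option("rowsPerSecond", "6")    // illustrative option values
      .option("numPartitions", "2")
      .load()

    // The rate source emits a timestamp and a monotonically increasing value.
    stream.printSchema()
  }
}
```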

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 4 additions & 2 deletions

@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -566,6 +566,7 @@ object DataSource extends Logging {
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
+    val rate = classOf[RateStreamProvider].getCanonicalName

     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -587,7 +588,8 @@
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
+      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
+      "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
   }

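The new map entry keeps existing queries working: a stream that names the deleted provider class still resolves to the new one. A minimal sketch of both resolution paths (the object and method names here are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustration: both names resolve to RateStreamProvider after this change.
object LegacyNameExample {
  def loadBothWays(spark: SparkSession): (DataFrame, DataFrame) = {
    // Short name, via the DataSourceRegister service file.
    val byShortName = spark.readStream.format("rate").load()
    // Old class name, via DataSource.backwardCompatibilityMap.
    val byLegacyName = spark.readStream
      .format("org.apache.spark.sql.execution.streaming.RateSourceProvider")
      .load()
    (byShortName, byLegacyName)
  }
}
```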

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala

Lines changed: 0 additions & 262 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala

Lines changed: 19 additions & 6 deletions

@@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
@@ -40,8 +40,8 @@ class RateStreamContinuousReader(options: DataSourceOptions)

   val creationTime = System.currentTimeMillis()

-  val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
-  val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+  val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
   val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
@@ -57,12 +57,12 @@ class RateStreamContinuousReader(options: DataSourceOptions)
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }

-  override def readSchema(): StructType = RateSourceProvider.SCHEMA
+  override def readSchema(): StructType = RateStreamProvider.SCHEMA

   private var offset: Offset = _

   override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
-    this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
+    this.offset = offset.orElse(createInitialOffset(numPartitions, creationTime))
   }

   override def getStartOffset(): Offset = offset
@@ -98,6 +98,19 @@ class RateStreamContinuousReader(options: DataSourceOptions)
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}

+  private def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement the starting value
+        // by the increment that will later be applied. The first row output in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ValueRunTimeMsPair(
+            (i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+
 }

 case class RateStreamContinuousDataReaderFactory(
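The comment in the added method is worth unpacking: each partition emits values in steps of numPartitions, and the stored offset is exclusive, so partition i starts at i - numPartitions and its first emitted value is (i - numPartitions) + numPartitions = i. A self-contained check of that arithmetic (this is a standalone re-statement for illustration, not the Spark class):

```scala
// Standalone re-statement of the initial-offset arithmetic for illustration.
object InitialOffsetCheck {
  def main(args: Array[String]): Unit = {
    val numPartitions = 5
    // Exclusive starting values, as in createInitialOffset.
    val startValues = Range(0, numPartitions).map(i => i -> (i - numPartitions).toLong).toMap
    // First emitted value per partition = start + increment (numPartitions).
    val firstEmitted = startValues.map { case (i, start) => i -> (start + numPartitions) }
    assert(firstEmitted.forall { case (i, v) => v == i })
    println(firstEmitted.toSeq.sorted.mkString(", "))
    // Prints: (0,0), (1,1), (2,2), (3,3), (4,4)
  }
}
```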
