@@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
@@ -40,8 +40,8 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   val creationTime = System.currentTimeMillis()
 
-  val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
-  val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+  val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
   val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
 
   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
@@ -57,12 +57,12 @@ class RateStreamContinuousReader(options: DataSourceOptions)
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }
 
-  override def readSchema(): StructType = RateSourceProvider.SCHEMA
+  override def readSchema(): StructType = RateStreamProvider.SCHEMA
 
   private var offset: Offset = _
 
   override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
-    this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
+    this.offset = offset.orElse(createInitialOffset(numPartitions, creationTime))
   }
 
   override def getStartOffset(): Offset = offset
@@ -98,6 +98,19 @@ class RateStreamContinuousReader(options: DataSourceOptions)
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}
 
+  private def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement the starting value
+        // by the increment that will later be applied. The first row output in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ValueRunTimeMsPair(
+            (i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+
 }
 
 case class RateStreamContinuousDataReaderFactory(
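
The exclusive-start arithmetic in the new createInitialOffset helper can be checked in isolation: each partition advances its value by numPartitions per row, so seeding partition i at (i - numPartitions) makes the first value it actually emits equal to i. A minimal sketch of this, in plain Scala with no Spark dependencies (the object name InitialOffsetDemo is purely illustrative):

object InitialOffsetDemo extends App {
  val numPartitions = 5
  // Same seeding as createInitialOffset: partition i starts at (i - numPartitions).
  val initialValues: Map[Int, Long] =
    Range(0, numPartitions).map(i => i -> (i - numPartitions).toLong).toMap
  initialValues.toSeq.sorted.foreach { case (i, start) =>
    // The starting offset is exclusive, so the increment is applied before
    // the first row is emitted.
    val firstEmitted = start + numPartitions
    println(s"partition $i: start=$start, first emitted value=$firstEmitted")
    assert(firstEmitted == i)
  }
}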