
Commit f510761

gengliangwang authored and cloud-fan committed
[SPARK-28089][SQL] File source v2: support reading output of file streaming Sink
## What changes were proposed in this pull request?

File source V1 supports reading the output of FileStreamSink as a batch query (apache#11897). We should support this in file source V2 as well.

When reading with paths, we first check whether there is a metadata log written by FileStreamSink. If there is, we use `MetadataLogFileIndex` for listing files; otherwise, we use `InMemoryFileIndex`.

## How was this patch tested?

Unit test

Closes apache#24900 from gengliangwang/FileStreamV2.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent b276788 commit f510761
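
The scenario this change enables looks roughly like the following. This is a minimal, illustrative sketch rather than code from the patch; the `rate` source, the paths, and the session setup are assumptions made for the example.

```scala
// Illustrative sketch (not from the patch): write with the file streaming sink, then
// read the same directory back as a batch query. With this change, the batch read goes
// through the sink's metadata log for file source V2 as well, so only files committed
// by the sink are picked up. Paths and the "rate" source are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-file-sink-output").getOrCreate()
import spark.implicits._

// Streaming write: the parquet file sink tracks committed files in <output>/_spark_metadata.
val query = spark.readStream
  .format("rate")
  .load()
  .select($"value")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/sink-demo/checkpoint")
  .start("/tmp/sink-demo/output")

query.awaitTermination(5000) // let a few micro-batches commit

// Batch read of the sink's output directory: file listing is resolved through the
// sink's metadata log rather than a plain directory listing.
val batchDf = spark.read.parquet("/tmp/sink-demo/output")
batchDf.show()

query.stop()
spark.stop()
```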

File tree

5 files changed (+223 / -161 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 31 additions & 16 deletions
@@ -20,11 +20,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -44,23 +45,37 @@ abstract class FileTable(
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = true)
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(
-      sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+      // We are reading from the results of a streaming query. We will load files from
+      // the metadata log instead of listing them using HDFS APIs.
+      new MetadataLogFileIndex(sparkSession, new Path(paths.head),
+        options.asScala.toMap, userSpecifiedSchema)
+    } else {
+      // This is a non-streaming file based datasource.
+      val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
+        checkEmptyGlobPath = true, checkFilesExist = true)
+      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+      new InMemoryFileIndex(
+        sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    }
   }
 
-  lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
-    val partitionSchema = fileIndex.partitionSchema
-    val resolver = sparkSession.sessionState.conf.resolver
-    StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
-  }.orElse {
-    inferSchema(fileIndex.allFiles())
-  }.getOrElse {
-    throw new AnalysisException(
-      s"Unable to infer schema for $formatName. It must be specified manually.")
-  }.asNullable
+  lazy val dataSchema: StructType = {
+    val schema = userSpecifiedSchema.map { schema =>
+      val partitionSchema = fileIndex.partitionSchema
+      val resolver = sparkSession.sessionState.conf.resolver
+      StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
+    }.orElse {
+      inferSchema(fileIndex.allFiles())
+    }.getOrElse {
+      throw new AnalysisException(
+        s"Unable to infer schema for $formatName. It must be specified manually.")
+    }
+    fileIndex match {
+      case _: MetadataLogFileIndex => schema
+      case _ => schema.asNullable
+    }
+  }
 
   override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
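
For context, the branch taken above hinges on `FileStreamSink.hasMetadata`. The following is a simplified sketch of that check, under the assumption (true for the streaming file sink) that the sink records its log in a `_spark_metadata` directory under the output path; `looksLikeFileStreamSinkOutput` is a hypothetical name for illustration, not the real method.

```scala
// Simplified sketch (not the real FileStreamSink.hasMetadata implementation): a directory
// counts as the output of a file streaming sink when a single root path is given and it
// contains the sink's metadata log directory, conventionally named "_spark_metadata".
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def looksLikeFileStreamSinkOutput(paths: Seq[String], hadoopConf: Configuration): Boolean = {
  paths match {
    case Seq(singlePath) =>
      val metadataPath = new Path(singlePath, "_spark_metadata")
      val fs = metadataPath.getFileSystem(hadoopConf)
      fs.exists(metadataPath)
    case _ =>
      // Multiple paths (or glob patterns) are treated as a plain batch read.
      false
  }
}
```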

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 120 additions & 69 deletions
@@ -25,17 +25,19 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class FileStreamSinkSuite extends StreamTest {
+abstract class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
   override def beforeAll(): Unit = {
@@ -51,6 +53,8 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  protected def checkQueryExecution(df: DataFrame): Unit
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -121,78 +125,36 @@
 
     var query: StreamingQuery = null
 
-    // TODO: test file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      try {
-        query =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-            .partitionBy("id")
-            .option("checkpointLocation", checkpointDir)
-            .format("parquet")
-            .start(outputDir)
-
-        inputData.addData(1, 2, 3)
-        failAfter(streamingTimeout) {
-          query.processAllAvailable()
-        }
+    try {
+      query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .writeStream
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .format("parquet")
+          .start(outputDir)
 
-        val outputDf = spark.read.parquet(outputDir)
-        val expectedSchema = new StructType()
-          .add(StructField("value", IntegerType, nullable = false))
-          .add(StructField("id", IntegerType))
-        assert(outputDf.schema === expectedSchema)
-
-        // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
-        // been inferred
-        val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-          case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
-        }
-        assert(hadoopdFsRelations.size === 1)
-        assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
-        assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
-        assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
-
-        // Verify the data is correctly read
-        checkDatasetUnorderly(
-          outputDf.as[(Int, Int)],
-          (1000, 1), (2000, 2), (3000, 3))
-
-        /** Check some condition on the partitions of the FileScanRDD generated by a DF */
-        def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
-          val getFileScanRDD = df.queryExecution.executedPlan.collect {
-            case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
-              scan.inputRDDs().head.asInstanceOf[FileScanRDD]
-          }.headOption.getOrElse {
-            fail(s"No FileScan in query\n${df.queryExecution}")
-          }
-          func(getFileScanRDD.filePartitions)
-        }
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
 
-        // Read without pruning
-        checkFileScanPartitions(outputDf) { partitions =>
-          // There should be as many distinct partition values as there are distinct ids
-          assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
-        }
+      val outputDf = spark.read.parquet(outputDir)
+      val expectedSchema = new StructType()
+        .add(StructField("value", IntegerType, nullable = false))
+        .add(StructField("id", IntegerType))
+      assert(outputDf.schema === expectedSchema)
 
-        // Read with pruning, should read only files in partition dir id=1
-        checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
-        }
+      // Verify the data is correctly read
+      checkDatasetUnorderly(
+        outputDf.as[(Int, Int)],
+        (1000, 1), (2000, 2), (3000, 3))
 
-        // Read with pruning, should read only files in partition dir id=1 and id=2
-        checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
-        }
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
+      checkQueryExecution(outputDf)
+    } finally {
+      if (query != null) {
+        query.stop()
       }
     }
   }
@@ -512,3 +474,92 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 }
+
+class FileStreamSinkV1Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val hadoopdFsRelations = df.queryExecution.analyzed.collect {
+      case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
+    }
+    assert(hadoopdFsRelations.size === 1)
+    assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
+    assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
+    assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val getFileScanRDD = df.queryExecution.executedPlan.collect {
+        case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+          scan.inputRDDs().head.asInstanceOf[FileScanRDD]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(getFileScanRDD.filePartitions)
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1
+    checkFileScanPartitions(df.filter("id = 1")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1 and id=2
+    checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
+    }
+  }
+}
+
+class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val table = df.queryExecution.analyzed.collect {
+      case DataSourceV2Relation(table: FileTable, _, _) => table
+    }
+    assert(table.size === 1)
+    assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
+    assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
+    assert(table.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val fileScan = df.queryExecution.executedPlan.collect {
+        case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+          batch.scan.asInstanceOf[FileScan]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+    // TODO: test partition pruning when file source V2 supports it.
+  }
+}
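
The two concrete suites above differ only in which code path they force. Outside of the test harness, the same toggle can be applied per session; the snippet below is a hypothetical spark-shell illustration (it assumes a `spark` session in scope and reuses the config constants shown in the diff), not part of the patch.

```scala
// Illustrative only: mirror FileStreamSinkV2Suite's setup in a session by clearing the
// V1 fallback lists, so file reads and writes go through the V2 implementations.
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
spark.conf.set(SQLConf.USE_V1_SOURCE_WRITER_LIST.key, "")

// And the reverse, mirroring FileStreamSinkV1Suite: keep the listed formats on V1.
spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "csv,json,orc,text,parquet")
spark.conf.set(SQLConf.USE_V1_SOURCE_WRITER_LIST.key, "csv,json,orc,text,parquet")
```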

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 6 additions & 5 deletions
@@ -218,11 +218,12 @@ class StreamSuite extends StreamTest {
       }
     }
 
-    // TODO: fix file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
-      assertDF(df)
-      assertDF(df)
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    Seq("", "parquet").foreach { useV1SourceReader =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReader) {
+        assertDF(df)
+        assertDF(df)
+      }
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala

Lines changed: 24 additions & 26 deletions
@@ -197,32 +197,30 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   }
 
   test("deduplicate with file sink") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      withTempDir { output =>
-        withTempDir { checkpointDir =>
-          val outputPath = output.getAbsolutePath
-          val inputData = MemoryStream[String]
-          val result = inputData.toDS().dropDuplicates()
-          val q = result.writeStream
-            .format("parquet")
-            .outputMode(Append)
-            .option("checkpointLocation", checkpointDir.getPath)
-            .start(outputPath)
-          try {
-            inputData.addData("a")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("a") // Dropped
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("b")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
-          } finally {
-            q.stop()
-          }
+    withTempDir { output =>
+      withTempDir { checkpointDir =>
+        val outputPath = output.getAbsolutePath
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+        val q = result.writeStream
+          .format("parquet")
+          .outputMode(Append)
+          .option("checkpointLocation", checkpointDir.getPath)
+          .start(outputPath)
+        try {
+          inputData.addData("a")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("a") // Dropped
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("b")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
+        } finally {
+          q.stop()
         }
       }
     }

0 commit comments
