Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 9962390

Browse files
dongjoon-hyun authored and zsxwing committed
[SPARK-22781][SS] Support creating streaming dataset with ORC files
## What changes were proposed in this pull request? Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document. **BEFORE** ```scala scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start() <console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start() ``` **AFTER** ```scala scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start() res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper678b3746 scala> ------------------------------------------- Batch: 0 ------------------------------------------- +---+ | a| +---+ | 1| +---+ ``` ## How was this patch tested? Pass the newly added test cases. Author: Dongjoon Hyun <[email protected]> Closes apache#19975 from dongjoon-hyun/SPARK-22781.
1 parent 13268a5 commit 9962390

File tree

5 files changed

+148
-1
lines changed

5 files changed

+148
-1
lines changed

docs/structured-streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
493493
#### Input Sources
494494
There are a few built-in sources.
495495

496-
- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
496+
- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
497497

498498
- **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.
499499

python/pyspark/sql/streaming.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,23 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
490490
else:
491491
raise TypeError("path can be only a single string")
492492

493+
@since(2.3)
def orc(self, path):
    """Loads an ORC file stream, returning the result as a :class:`DataFrame`.

    .. note:: Evolving.

    :param path: string path of the directory to stream ORC files from.

    >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
    >>> orc_sdf.isStreaming
    True
    >>> orc_sdf.schema == sdf_schema
    True
    """
    if isinstance(path, basestring):
        # Delegate to the JVM DataStreamReader.orc and wrap the result.
        return self._df(self._jreader.orc(path))
    else:
        raise TypeError("path can be only a single string")
493510
@since(2.0)
494511
def parquet(self, path):
495512
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
298298
*/
299299
def csv(path: String): DataFrame = format("csv").load(path)
300300

301+
/**
 * Loads an ORC file stream, returning the result as a `DataFrame`.
 *
 * You can set the following ORC-specific option(s) for reading ORC files:
 * <ul>
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 * </ul>
 *
 * @param path input path of the directory to stream ORC files from
 * @since 2.3.0
 */
// Single-expression form for consistency with the sibling csv()/text() methods above.
def orc(path: String): DataFrame = format("orc").load(path)
301316
/**
302317
* Loads a Parquet file stream, returning the result as a `DataFrame`.
303318
*

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ class FileStreamSinkSuite extends StreamTest {
305305
testFormat(Some("parquet"))
306306
}
307307

308+
test("orc") {
  // The file sink should accept ORC as an output format, like parquet/text below.
  testFormat(Some("orc"))
}
308312
test("text") {
309313
testFormat(Some("text"))
310314
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,28 @@ abstract class FileStreamSourceTest
8787
}
8888
}
8989

90+
/** Stream-test action that adds a batch of ORC files to the source directory. */
case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
  override def addData(source: FileStreamSource): Unit = {
    AddOrcFileData.writeToFile(data, src, tmp)
  }
}

object AddOrcFileData {
  /** Convenience factory: wraps a sequence of strings as a single-column DataFrame. */
  def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = {
    AddOrcFileData(seq.toDS().toDF(), src, tmp)
  }

  /** Write orc files in a temp dir, and move the individual files to the 'src' dir */
  def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
    // Write to a scratch dir first so the files appear in `src` atomically via rename.
    val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
    df.write.orc(tmpDir.getCanonicalPath)
    src.mkdirs()
    tmpDir.listFiles().foreach { f =>
      // NOTE(review): renameTo's boolean result is ignored; a cross-filesystem tmp dir
      // would make this silently drop files — acceptable for a test-only helper.
      f.renameTo(new File(src, f.getName))
    }
  }
}
90112
case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
91113
override def addData(source: FileStreamSource): Unit = {
92114
AddParquetFileData.writeToFile(data, src, tmp)
@@ -249,6 +271,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
249271
}
250272
}
251273

274+
// =============== ORC file stream schema tests ================
275+
276+
test("FileStreamSource schema: orc, existing files, no schema") {
  withTempDir { src =>
    val path = src.getCanonicalPath
    Seq("a", "b", "c").toDS().as("userColumn").toDF().write
      .mode(org.apache.spark.sql.SaveMode.Overwrite)
      .orc(path)

    // With inference disabled, building the source without a user schema must fail.
    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
      intercept[IllegalArgumentException] {
        createFileStreamSourceAndGetSchema(
          format = Some("orc"), path = Some(path), schema = None)
      }
    }

    // With inference enabled, the schema is derived from the existing ORC files.
    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
      val inferred = createFileStreamSourceAndGetSchema(
        format = Some("orc"), path = Some(path), schema = None)
      assert(inferred === new StructType().add("value", StringType))
    }
  }
}
299+
test("FileStreamSource schema: orc, existing files, schema") {
  withTempPath { src =>
    // Seed the directory with data whose column name differs from the user schema.
    Seq("a", "b", "c").toDS().as("oldUserColumn").toDF()
      .write.orc(new File(src, "1").getCanonicalPath)
    // An explicitly supplied schema must win over anything inferable from the files.
    val expected = new StructType().add("userColumn", StringType)
    val actual = createFileStreamSourceAndGetSchema(
      format = Some("orc"), path = Some(src.getCanonicalPath), schema = Some(expected))
    assert(actual === expected)
  }
}
252310
// =============== Parquet file stream schema tests ================
253311

254312
test("FileStreamSource schema: parquet, existing files, no schema") {
@@ -508,6 +566,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
508566
}
509567
}
510568

569+
// =============== ORC file stream tests ================
570+
571+
test("read from orc files") {
  withTempDirs { case (src, tmp) =>
    // Stream ORC files and keep only rows whose value contains "keep".
    val stream = createFileStream("orc", src.getCanonicalPath, Some(valueSchema))
    val kept = stream.filter($"value".contains("keep"))

    testStream(kept)(
      AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
      CheckAnswer("keep2", "keep3"),
      // Data added while stopped must be picked up on restart.
      StopStream,
      AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
      StartStream(),
      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
      AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
    )
  }
}
589+
test("read from orc files with changing schema") {
  withTempDirs { case (src, tmp) =>
    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

      // Seed one file so the source has something to infer a schema from.
      AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)

      val inferredStream = createFileStream("orc", src.getCanonicalPath)

      // Only column "k" should have been inferred.
      assert(inferredStream.schema === StructType(Seq(StructField("k", StringType))))

      // Data with an extra column written between DF creation and stream start
      // must not alter the already-inferred schema.
      AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)

      testStream(inferredStream)(
        // The pre-start file's extra column v is not picked up.
        AddOrcFileData(Seq("value2").toDF("k"), src, tmp),
        CheckAnswer("value0", "value1", "value2"),

        // Column k is read; the extra column v is ignored.
        AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
        CheckAnswer("value0", "value1", "value2", "value3"),

        // Rows lacking the required column k surface as null.
        AddOrcFileData(Seq("value5").toDF("v"), src, tmp),
        CheckAnswer("value0", "value1", "value2", "value3", null)
      )
    }
  }
}
511622
// =============== Parquet file stream tests ================
512623

513624
test("read from parquet files") {

0 commit comments

Comments
 (0)