
Commit 6ef7a5b

[SPARK-21167][SS] Decode the path generated by File sink to handle special characters
## What changes were proposed in this pull request?

Decode the path generated by File sink to handle special characters.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <[email protected]>

Closes apache#18381 from zsxwing/SPARK-21167.

(cherry picked from commit d66b143)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 198e3a0 commit 6ef7a5b
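Why the decode matters, in a minimal sketch (not part of the commit; the path and partition value below are illustrative): the file sink's metadata log records each written file's path as a URI string, so a space coming from a partition value is stored percent-encoded as `%20`. Rebuilding a Hadoop `Path` directly from that string keeps the literal `%20`, while routing it through `java.net.URI` first, as this change does, decodes it back to the real path.

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

// Illustrative round trip: a partition value containing a space
// ("value=hello world") ends up percent-encoded in the sink log.
object PathDecodingSketch extends App {
  val original = new Path("file:/tmp/out/value=hello world/part-00000.parquet")

  // The log stores the path as a URI string, which encodes the space as %20.
  val logged = original.toUri.toString
  println(logged)                     // ...value=hello%20world/part-00000.parquet

  // Previous behaviour: "%20" survives as literal characters, so the rebuilt
  // path no longer matches the file that was actually written.
  println(new Path(logged))           // ...value=hello%20world/part-00000.parquet

  // Behaviour after this commit: java.net.URI decodes the escape first.
  println(new Path(new URI(logged)))  // ...value=hello world/part-00000.parquet
}
```

The added unit test exercises the same scenario end to end by partitioning the streaming output on a string column and writing the value "hello world".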

File tree: 2 files changed (+33, −1 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

Lines changed: 4 additions & 1 deletion
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.streaming

+import java.net.URI
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
@@ -47,7 +49,8 @@ case class SinkFileStatus(
     action: String) {

   def toFileStatus: FileStatus = {
-    new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
+    new FileStatus(
+      size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -64,6 +64,35 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }

+  test("SPARK-21167: encode and decode path correctly") {
+    val inputData = MemoryStream[String]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    val query = ds.map(s => (s, s.length))
+      .toDF("value", "len")
+      .writeStream
+      .partitionBy("value")
+      .option("checkpointLocation", checkpointDir)
+      .format("parquet")
+      .start(outputDir)
+
+    try {
+      // The output is partitioned by "value", so the value will appear in the file path.
+      // This is to test if we handle spaces in the path correctly.
+      inputData.addData("hello world")
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+      val outputDf = spark.read.parquet(outputDir)
+      checkDatasetUnorderly(outputDf.as[(Int, String)], ("hello world".length, "hello world"))
+    } finally {
+      query.stop()
+    }
+  }
+
   test("partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()
