
Commit 25431d7

HeartSaVioR authored and zsxwing committed
[SPARK-29953][SS] Don't clean up source files for FileStreamSource if the files belong to the output of FileStreamSink
### What changes were proposed in this pull request?

This patch prevents the cleanup operation in FileStreamSource if the source files belong to the output of FileStreamSink. This is needed because the output of FileStreamSink can be read by multiple Spark queries, and those queries read the files based on the metadata log, which won't reflect the cleanup.

To simplify the logic, the patch only handles the case where the source path (without a glob pattern) refers to the output directory of FileStreamSink, by checking whether FileStreamSource uses the metadata directory to list the source files.

### Why are the changes needed?

Without this patch, if end users turn on the cleanup option with a path that is the output of FileStreamSink, the metadata log and the available files may get out of sync, which may break other queries reading the path.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT.

Closes apache#26590 from HeartSaVioR/SPARK-29953.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
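For illustration, a minimal sketch of the conflicting setup this patch guards against. The paths, the `events` DataFrame, and `outputSchema` are hypothetical placeholders; only the `cleanSource` option comes from the actual file source options.

```scala
// Query A: a streaming writer whose FileStreamSink records every written
// file in the _spark_metadata log under the output directory.
val writerQuery = events.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints/writer")  // hypothetical path
  .start("/data/sink-output")                               // hypothetical path

// Query B: a file source reading the same directory with cleanup enabled.
// Before this patch, completed files could be removed even though the sink's
// metadata log still referenced them; with this patch the source fails fast
// with UnsupportedOperationException instead.
val readerQuery = spark.readStream
  .format("parquet")
  .schema(outputSchema)                  // hypothetical schema of the sink output
  .option("cleanSource", "delete")
  .load("/data/sink-output")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/reader")  // hypothetical path
  .start()
```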
1 parent 755d889 commit 25431d7

File tree: 3 files changed (+81, -21 lines)

docs/structured-streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -551,7 +551,7 @@ Here are the details of all the sources in Spark.
 When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
 Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
 NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
-NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.<br/>
+NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
 NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
 <br/><br/>
 For file-format-specific options, see the related methods in <code>DataStreamReader</code>
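To make the documented options concrete, here is a hedged reader configuration matching the guide's archive example; the source and archive paths come from the prose above, everything else is a placeholder.

```scala
// Archive completed source files instead of deleting them. sourceArchiveDir
// must be at least two directory levels deep so archived files are never
// listed again as new input; e.g. /a/b/dataset.txt moves to
// /archived/here/a/b/dataset.txt.
val archivedInput = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")
  .load("/a/b")
```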

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 14 additions & 3 deletions
@@ -206,6 +206,17 @@ class FileStreamSource(
       CaseInsensitiveMap(options), None).allFiles()
   }
 
+  private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
+    case Some(true) =>
+      if (sourceCleaner.isDefined) {
+        throw new UnsupportedOperationException("Clean up source files is not supported when" +
+          " reading from the output directory of FileStreamSink.")
+      }
+      sourceHasMetadata = Some(true)
+    case _ =>
+      sourceHasMetadata = newValue
+  }
+
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
@@ -216,7 +227,7 @@ class FileStreamSource(
     sourceHasMetadata match {
       case None =>
         if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
-          sourceHasMetadata = Some(true)
+          setSourceHasMetadata(Some(true))
           allFiles = allFilesUsingMetadataLogFileIndex()
         } else {
           allFiles = allFilesUsingInMemoryFileIndex()
@@ -228,10 +239,10 @@ class FileStreamSource(
             // metadata log and data files are only generated after the previous
             // `FileStreamSink.hasMetadata` check
             if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
-              sourceHasMetadata = Some(true)
+              setSourceHasMetadata(Some(true))
               allFiles = allFilesUsingMetadataLogFileIndex()
             } else {
-              sourceHasMetadata = Some(false)
+              setSourceHasMetadata(Some(false))
               // `allFiles` have already been fetched using InMemoryFileIndex in this round
             }
           }
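The new setter funnels both `FileStreamSink.hasMetadata` call sites through one guard, so the rejection happens as soon as the source learns it is reading a sink's output. Below is a simplified, self-contained model of that pattern; the real class carries much more state, and `cleanerEnabled` stands in for `sourceCleaner.isDefined`.

```scala
// Simplified model: once the source detects FileStreamSink metadata under the
// path (Some(true)), any configured cleanup is rejected up front rather than
// silently diverging from the sink's metadata log.
class MetadataAwareSource(cleanerEnabled: Boolean) {
  private var sourceHasMetadata: Option[Boolean] = None

  def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
    case Some(true) =>
      if (cleanerEnabled) {
        throw new UnsupportedOperationException(
          "Clean up source files is not supported when reading from the output " +
            "directory of FileStreamSink.")
      }
      sourceHasMetadata = Some(true)
    case _ =>
      sourceHasMetadata = newValue
  }

  def hasMetadata: Option[Boolean] = sourceHasMetadata
}
```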

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 66 additions & 17 deletions
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Progressable
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
@@ -149,6 +150,20 @@ abstract class FileStreamSourceTest
     }
   }
 
+  case class AddFilesToFileStreamSinkLog(
+      fs: FileSystem,
+      srcDir: Path,
+      sinkLog: FileStreamSinkLog,
+      batchId: Int)(
+      pathFilter: Path => Boolean) extends ExternalAction {
+    override def runAction(): Unit = {
+      val statuses = fs.listStatus(srcDir, new PathFilter {
+        override def accept(path: Path): Boolean = pathFilter(path)
+      })
+      sinkLog.add(batchId, statuses.map(SinkFileStatus(_)))
+    }
+  }
+
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   def createFileStream(
       format: String,
@@ -1617,14 +1632,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("remove completed files when remove option is enabled") {
-    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
-      assert(!files.exists(_.startsWith(fileName)))
-    }
-
-    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
-      assert(files.exists(_.startsWith(fileName)))
-    }
-
     withTempDirs { case (src, tmp) =>
       withSQLConf(
         SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
@@ -1642,28 +1649,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           CheckAnswer("keep1"),
           AssertOnQuery("input file removed") { _: StreamExecution =>
             // it doesn't rename any file yet
-            assertFileIsNotRemoved(src.list(), "keep1")
+            assertFileIsNotRemoved(src, "keep1")
             true
           },
           AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
           CheckAnswer("keep1", "keep2"),
           AssertOnQuery("input file removed") { _: StreamExecution =>
-            val files = src.list()
-
             // it renames input file for first batch, but not for second batch yet
-            assertFileIsRemoved(files, "keep1")
-            assertFileIsNotRemoved(files, "ke ep2 %")
+            assertFileIsRemoved(src, "keep1")
+            assertFileIsNotRemoved(src, "ke ep2 %")
 
             true
           },
           AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
           CheckAnswer("keep1", "keep2", "keep3"),
           AssertOnQuery("input file renamed") { _: StreamExecution =>
-            val files = src.list()
-
             // it renames input file for second batch, but not third batch yet
-            assertFileIsRemoved(files, "ke ep2 %")
-            assertFileIsNotRemoved(files, "keep3")
+            assertFileIsRemoved(src, "ke ep2 %")
+            assertFileIsNotRemoved(src, "keep3")
 
             true
           }
@@ -1739,6 +1742,44 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  Seq("delete", "archive").foreach { cleanOption =>
+    test(s"Throw UnsupportedOperationException on configuring $cleanOption when source path" +
+      " refers the output dir of FileStreamSink") {
+      withThreeTempDirs { case (src, tmp, archiveDir) =>
+        withSQLConf(
+          SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+          // Force deleting the old logs
+          SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+        ) {
+          val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+            "cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+          val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+          val filtered = fileStream.filter($"value" contains "keep")
+
+          // create FileStreamSinkLog under source directory
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            new File(src, FileStreamSink.metadataDir).getCanonicalPath)
+          val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+          val srcPath = new Path(src.getCanonicalPath)
+          val fileSystem = srcPath.getFileSystem(hadoopConf)
+
+          // Here we will just check whether the source file is removed or not, as we cover
+          // functionality test of "archive" in other UT.
+          testStream(filtered)(
+            AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+            AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path =>
+              path.getName.startsWith("keep1")
+            },
+            ExpectFailure[UnsupportedOperationException](
+              t => assert(t.getMessage.startsWith("Clean up source files is not supported")),
+              isFatalError = false)
+          )
+        }
+      }
+    }
+  }
+
   class FakeFileSystem(scheme: String) extends FileSystem {
     override def exists(f: Path): Boolean = true
 
@@ -1797,6 +1838,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  private def assertFileIsRemoved(sourceDir: File, fileName: String): Unit = {
+    assert(!sourceDir.list().exists(_.startsWith(fileName)))
+  }
+
+  private def assertFileIsNotRemoved(sourceDir: File, fileName: String): Unit = {
+    assert(sourceDir.list().exists(_.startsWith(fileName)))
+  }
+
   private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = {
     assert(sourceDir.exists())
     assert(sourceDir.list().exists(_.startsWith(filePrefix)))
