
Commit bd7510b

Authored by HeartSaVioR, committed by Marcelo Vanzin
[SPARK-30281][SS] Consider partitioned/recursive option while verifying archive path on FileStreamSource
### What changes were proposed in this pull request?

This patch renews the verification logic of the archive path for FileStreamSource, as we found the logic doesn't take the partitioned/recursive options into account.

Before the patch, it only requires the archive path to have a depth greater than 2 (two subdirectories from the root), leveraging the fact that FileStreamSource normally reads files whose parent directory matches the pattern, or which themselves match the pattern. Given that the 'archive' operation moves files to the base archive path while retaining their full path, the archive path tends to be safe when its depth is greater than 2, meaning FileStreamSource doesn't re-read archived files as new source files.

With the partitioned/recursive options that assumption no longer holds, as FileStreamSource can read files at any depth of subdirectory under the source pattern. To deal with this correctly, we have to renew the verification logic, which may not be intuitive or simple, but works for all cases. The new verification logic prevents both cases:

1) The archive path matches the source pattern as a "prefix" (depth of archive path > depth of source pattern), e.g.

* source pattern: `/hello*/spar?`
* archive path: `/hello/spark/structured/streaming`

Any file under the archive path will match the source pattern when the recursive option is enabled.

2) The source pattern matches the archive path as a "prefix" (depth of source pattern > depth of archive path), e.g.

* source pattern: `/hello*/spar?/structured/hello2*`
* archive path: `/hello/spark/structured`

Some archived files will not match the source pattern, e.g. file path `/hello/spark/structured/hello2`, whose final archived path is `/hello/spark/structured/hello/spark/structured/hello2`. But other archived files will still match, e.g. file path `/hello2/spark/structured/hello2`, whose final archived path is `/hello/spark/structured/hello2/spark/structured/hello2`, which matches the source pattern when recursive is enabled.

Implicitly this also prevents the archive path from matching the source pattern as a full match (same depth).

We want to prevent any source file from being archived and then added as a new source file again, so the patch takes the most restrictive approach to rule out the possible cases.

### Why are the changes needed?

Without this patch, there's a chance archived files are included as new source files when the partitioned/recursive option is enabled, as the current condition doesn't take these options into account.

### Does this PR introduce any user-facing change?

Only for Spark 3.0.0-preview (only preview 1 for now, but possibly preview 2 as well) - end users are required to provide an archive path satisfying slightly more complicated conditions, instead of simply a depth greater than 2.

### How was this patch tested?

New UT.

Closes apache#26920 from HeartSaVioR/SPARK-30281.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
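To make case 2) above concrete, here is a tiny standalone sketch (not Spark's actual implementation; `archivedPath` is a hypothetical helper) of how archiving retains the source file's full path as a suffix under the base archive directory:

```scala
// Hypothetical helper illustrating the archive move described above:
// the destination keeps the source file's full path as a suffix under
// the base archive directory. Not Spark's actual code.
object ArchiveMoveSketch {
  def archivedPath(baseArchiveDir: String, sourceFile: String): String =
    baseArchiveDir.stripSuffix("/") + sourceFile

  def main(args: Array[String]): Unit = {
    // A file matching the source pattern /hello*/spar?/structured/hello2*
    // lands back under a path that still matches the pattern recursively.
    println(archivedPath("/hello/spark/structured", "/hello2/spark/structured/hello2"))
    // /hello/spark/structured/hello2/spark/structured/hello2
  }
}
```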
1 parent 0a72dba commit bd7510b

File tree

3 files changed: +80 additions, -22 deletions

docs/structured-streaming-programming-guide.md

Lines changed: 2 additions & 1 deletion
@@ -548,7 +548,8 @@ Here are the details of all the sources in Spark.
     "s3a://a/b/c/dataset.txt"<br/>
     <code>cleanSource</code>: option to clean up completed files after processing.<br/>
     Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
-    When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
+    When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
+    For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
     Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
     NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
     NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
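As a usage sketch of the options documented above (illustrative only; the format, source pattern, and archive directory are assumptions, and a live `spark` session is presumed):

```scala
// Illustrative only: a file stream that archives completed files.
// "/archived/here" shares no matching component with "/hello?/spark/*"
// at their common depth, so it is a valid sourceArchiveDir under the rule above.
val stream = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")
  .load("/hello?/spark/*")
```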

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 58 additions & 15 deletions
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit._
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -389,20 +389,63 @@ object FileStreamSource {
       s"on a different file system than the source files. source path: $sourcePath" +
       s" / base archive path: $baseArchivePath")

-    /**
-     * FileStreamSource reads the files which one of below conditions is met:
-     * 1) file itself is matched with source path
-     * 2) parent directory is matched with source path
-     *
-     * Checking with glob pattern is costly, so set this requirement to eliminate the cases
-     * where the archive path can be matched with source path. For example, when file is moved
-     * to archive directory, destination path will retain input file's path as suffix, so
-     * destination path can't be matched with source path if archive directory's depth is longer
-     * than 2, as neither file nor parent directory of destination path can be matched with
-     * source path.
-     */
-    require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " +
-      "subdirectories from root directory. e.g. '/data/archive'")
+    require(!isBaseArchivePathMatchedAgainstSourcePattern, "Base archive path cannot be set to" +
+      " the path where archived path can possibly match with source pattern. Ensure the base " +
+      "archive path doesn't match with source pattern in depth, where the depth is minimum of" +
+      " depth on both paths.")
+  }
+
+  private def getAncestorEnsuringDepth(path: Path, depth: Int): Path = {
+    var newPath = path
+    while (newPath.depth() > depth) {
+      newPath = newPath.getParent
+    }
+    newPath
+  }
+
+  private def isBaseArchivePathMatchedAgainstSourcePattern: Boolean = {
+    // We should disallow end users to set base archive path which path matches against source
+    // pattern to avoid checking each source file. There're couple of cases which allow
+    // FileStreamSource to read any depth of subdirectory under the source pattern, so we should
+    // consider all three cases 1) both has same depth 2) base archive path is longer than source
+    // pattern 3) source pattern is longer than base archive path. To handle all cases, we take
+    // min of depth for both paths, and check the match.
+
+    val minDepth = math.min(sourcePath.depth(), baseArchivePath.depth())
+
+    val sourcePathMinDepth = getAncestorEnsuringDepth(sourcePath, minDepth)
+    val baseArchivePathMinDepth = getAncestorEnsuringDepth(baseArchivePath, minDepth)
+
+    val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePathMinDepth)
+
+    var matched = true
+
+    // pathToCompare should have same depth as sourceGlobFilters.length
+    var pathToCompare = baseArchivePathMinDepth
+    var index = 0
+    do {
+      // GlobFilter only matches against its name, not full path so it's safe to compare
+      if (!sourceGlobFilters(index).accept(pathToCompare)) {
+        matched = false
+      } else {
+        pathToCompare = pathToCompare.getParent
+        index += 1
+      }
+    } while (matched && !pathToCompare.isRoot)
+
+    matched
+  }
+
+  private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+    val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+    var currentPath = sourcePath
+    while (!currentPath.isRoot) {
+      filters += new GlobFilter(currentPath.getName)
+      currentPath = currentPath.getParent
+    }
+
+    filters.toList
   }

   override def clean(entry: FileEntry): Unit = {
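The verification in the diff above relies on Hadoop's `GlobFilter`. As a rough standalone sketch of the same min-depth, component-wise matching idea (assumptions: a simplified glob supporting only `*` and `?`, plain string paths, no Hadoop dependency; `ArchivePathCheck` is a hypothetical name):

```scala
// Standalone sketch (not Spark code): truncate both paths to the shallower
// depth, then match each archive-path component against the corresponding
// glob component of the source pattern.
object ArchivePathCheck {
  // Translate a single glob component to a regex; only '*' and '?' supported here.
  private def globToRegex(glob: String): String =
    glob.flatMap {
      case '*' => ".*"
      case '?' => "."
      case c if "\\.[](){}+-^$|".contains(c) => "\\" + c
      case c => c.toString
    }

  private def components(path: String): Seq[String] =
    path.split("/").filter(_.nonEmpty).toSeq

  /** True if the base archive path could collide with the source pattern. */
  def conflicts(sourcePattern: String, baseArchivePath: String): Boolean = {
    val src = components(sourcePattern)
    val arc = components(baseArchivePath)
    val minDepth = math.min(src.length, arc.length)
    // All components up to the common depth must match for a conflict.
    src.take(minDepth).zip(arc.take(minDepth)).forall {
      case (glob, name) => name.matches(globToRegex(glob))
    }
  }
}
```

Under this sketch, both counterexamples from the commit message are rejected, while a disjoint path such as `/archived/here` passes.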

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 20 additions & 6 deletions
@@ -1814,15 +1814,29 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError
   }

-  test("SourceFileArchiver - base archive path depth <= 2") {
+  test("SourceFileArchiver - fail when base archive path matches source pattern") {
     val fakeFileSystem = new FakeFileSystem("fake")

-    val sourcePatternPath = new Path("/hello*/h{e,f}ll?")
-    val baseArchiveDirPath = new Path("/hello")
-
-    intercept[IllegalArgumentException] {
-      new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath)
+    def assertThrowIllegalArgumentException(sourcePatttern: Path, baseArchivePath: Path): Unit = {
+      intercept[IllegalArgumentException] {
+        new SourceFileArchiver(fakeFileSystem, sourcePatttern, fakeFileSystem, baseArchivePath)
+      }
     }
+
+    // 1) prefix of base archive path matches source pattern (baseArchiveDirPath has more depths)
+    val sourcePatternPath = new Path("/hello*/spar?")
+    val baseArchiveDirPath = new Path("/hello/spark/structured/streaming")
+    assertThrowIllegalArgumentException(sourcePatternPath, baseArchiveDirPath)
+
+    // 2) prefix of source pattern matches base archive path (source pattern has more depths)
+    val sourcePatternPath2 = new Path("/hello*/spar?/structured/streaming")
+    val baseArchiveDirPath2 = new Path("/hello/spark/structured")
+    assertThrowIllegalArgumentException(sourcePatternPath2, baseArchiveDirPath2)
+
+    // 3) source pattern matches base archive path (both have same depth)
+    val sourcePatternPath3 = new Path("/hello*/spar?/structured/*")
+    val baseArchiveDirPath3 = new Path("/hello/spark/structured/streaming")
+    assertThrowIllegalArgumentException(sourcePatternPath3, baseArchiveDirPath3)
   }

   test("SourceFileArchiver - different filesystems between source and archive") {
