
Commit 900ce55

zsxwing authored and tdas committed
[SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource
## What changes were proposed in this pull request?

When starting a stream over a large backlog of files with `maxFilesPerTrigger` set, users often want to process the most recent files first. This keeps latency low for recent data while historical data is backfilled gradually. This PR adds a new option, `latestFirst`, to control that behavior: when it is true, `FileStreamSource` sorts files by modification time from latest to oldest and takes the first `maxFilesPerTrigger` files as a new batch.

## How was this patch tested?

The added test.

Author: Shixiong Zhu <[email protected]>

Closes apache#16251 from zsxwing/newest-first.

(cherry picked from commit 68a6dc9)
Signed-off-by: Tathagata Das <[email protected]>
1 parent e430915 commit 900ce55
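As a usage note: a minimal sketch of how an application would enable the behavior added here, via the standard `DataStreamReader` options API. The `spark` session and the input path are hypothetical placeholders, not part of this commit.

```scala
// Hypothetical usage sketch: assumes an active SparkSession named `spark`
// (e.g. in spark-shell) and a placeholder input directory.
val stream = spark.readStream
  .format("text")
  .option("latestFirst", "true")       // added by this commit: newest files first
  .option("maxFilesPerTrigger", "10")  // cap each micro-batch at 10 files
  .load("/data/events")                // placeholder path
```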

File tree

3 files changed: +71 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala

Lines changed: 14 additions & 0 deletions
@@ -53,4 +53,18 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
   /** Options as specified by the user, in a case-insensitive map, without "path" set. */
   val optionMapWithoutPath: Map[String, String] =
     parameters.filterKeys(_ != "path")
+
+  /**
+   * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
+   * trigger, it will first process the latest files.
+   */
+  val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
+    try {
+      str.toBoolean
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new IllegalArgumentException(
+          s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
+    }
+  }.getOrElse(false)
 }
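A side note on the parsing above: Scala's `String.toBoolean` accepts only `"true"` or `"false"` (case-insensitively) and throws `IllegalArgumentException` otherwise, which the option code rethrows with a clearer message. A standalone sketch of that behavior, with a hypothetical `parseLatestFirst` helper mirroring the hunk:

```scala
// Standalone sketch (plain Scala, no Spark) of the option-parsing logic above.
object LatestFirstParsingSketch extends App {
  def parseLatestFirst(params: Map[String, String]): Boolean =
    params.get("latestFirst").map { str =>
      try str.toBoolean catch {
        case _: IllegalArgumentException =>
          throw new IllegalArgumentException(
            s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
      }
    }.getOrElse(false)

  assert(!parseLatestFirst(Map.empty))                   // default is oldest-first
  assert(parseLatestFirst(Map("latestFirst" -> "TRUE"))) // toBoolean is case-insensitive
}
```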

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 10 additions & 1 deletion
@@ -62,6 +62,15 @@ class FileStreamSource(
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
 
+  private val fileSortOrder = if (sourceOptions.latestFirst) {
+    logWarning(
+      """'latestFirst' is true. New files will be processed first.
+        |It may affect the watermark value""".stripMargin)
+    implicitly[Ordering[Long]].reverse
+  } else {
+    implicitly[Ordering[Long]]
+  }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
@@ -155,7 +164,7 @@ class FileStreamSource(
     val startTime = System.nanoTime
     val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
     val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
+    val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime
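The key trick in this hunk is choosing an `Ordering[Long]` up front and threading it through `sortBy` as an explicit argument. A minimal sketch outside Spark (the sample timestamps are made up) showing that a reversed ordering flips the sort without touching the key function:

```scala
// Plain-Scala sketch of the ordering trick used in FileStreamSource.
object FileSortOrderSketch extends App {
  val modTimes = Seq(300L, 100L, 200L)  // hypothetical modification times

  val oldestFirst = modTimes.sortBy(t => t)(implicitly[Ordering[Long]])
  val newestFirst = modTimes.sortBy(t => t)(implicitly[Ordering[Long]].reverse)

  assert(oldestFirst == Seq(100L, 200L, 300L))
  assert(newestFirst == Seq(300L, 200L, 100L))
}
```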

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 47 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import java.io.File
 
 import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
@@ -1059,6 +1060,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
     SerializedOffset(str.trim)
   }
+
+  test("FileStreamSource - latestFirst") {
+    withTempDir { src =>
+      // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
+      val f1 = stringToFile(new File(src, "1.txt"), "1")
+      val f2 = stringToFile(new File(src, "2.txt"), "2")
+      f2.setLastModified(f1.lastModified + 1000)
+
+      def runTwoBatchesAndVerifyResults(
+          latestFirst: Boolean,
+          firstBatch: String,
+          secondBatch: String): Unit = {
+        val fileStream = createFileStream(
+          "text",
+          src.getCanonicalPath,
+          options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
+        val clock = new StreamManualClock()
+        testStream(fileStream)(
+          StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+          AssertOnQuery { _ =>
+            // Block until the first batch finishes.
+            eventually(timeout(streamingTimeout)) {
+              assert(clock.isStreamWaitingAt(0))
+            }
+            true
+          },
+          CheckLastBatch(firstBatch),
+          AdvanceManualClock(10),
+          AssertOnQuery { _ =>
+            // Block until the second batch finishes.
+            eventually(timeout(streamingTimeout)) {
+              assert(clock.isStreamWaitingAt(10))
+            }
+            true
+          },
+          CheckLastBatch(secondBatch)
+        )
+      }
+
+      // Read oldest files first, so the first batch is "1", and the second batch is "2".
+      runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
+
+      // Read latest files first, so the first batch is "2", and the second batch is "1".
+      runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
