
Commit 3ae4f07

ScrapCodes authored and steveloughran committed
[SPARK-17159][STREAM] Significant speed up for running spark streaming against Object store.
## What changes were proposed in this pull request?

Original work by Steve Loughran, based on apache#17745. This is a minimal patch of changes to FileInputDStream to reduce file-status requests when querying files. Each call to get a file's status costs 3+ HTTP calls against an object store. This patch eliminates those calls by reusing the FileStatus objects returned by directory listings. This is a minor optimisation when working with filesystems, but a significant one when working with object stores.

## How was this patch tested?

Tests included. Existing tests pass.

Closes apache#22339 from ScrapCodes/PR_17745.

Lead-authored-by: Prashant Sharma <[email protected]>
Co-authored-by: Steve Loughran <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 95ae209 commit 3ae4f07
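For readers skimming past the diff, here is a rough standalone sketch of the idea, assuming a Hadoop `FileSystem`; the `ModTimeScanSketch` object and its command-line argument are illustrative, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ModTimeScanSketch {
  def main(args: Array[String]): Unit = {
    val dir = new Path(args(0)) // directory to scan, e.g. an s3a:// path
    val fs = dir.getFileSystem(new Configuration())

    // Before: one extra getFileStatus round trip per listed file, just to
    // read a modification time; each probe is 3+ HTTP calls on an object store.
    val before: Array[Long] = fs.listStatus(dir).map { status =>
      fs.getFileStatus(status.getPath).getModificationTime
    }

    // After: the FileStatus entries returned by listStatus already carry
    // the modification times, so no further round trips are needed.
    val after: Array[Long] = fs.listStatus(dir).map(_.getModificationTime)

    assert(before.sameElements(after))
  }
}
```

The patch applies the same shift inside `FileInputDStream`: filter the listing's own `FileStatus` entries instead of re-probing each path.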

File tree: 3 files changed (+118, -51 lines)


streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 27 additions & 30 deletions
@@ -17,19 +17,19 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{IOException, ObjectInputStream}
+import java.io.{FileNotFoundException, IOException, ObjectInputStream}
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
 
-  // Read-through cache of file mod times, used to speed up mod time lookups
-  @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
-
   // Timestamp of the last round of finding files
   @transient private var lastNewFileFindingTime = 0L
 
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    * a union RDD out of them. Note that this maintains the list of files that were processed
    * in the latest modification time in the previous call to this method. This is because the
    * modification time returned by the FileStatus API seems to return times only at the
-   * granularity of seconds. And new files may have the same modification time as the
+   * granularity of seconds in HDFS. And new files may have the same modification time as the
    * latest modification time in the previous call to this method yet was not reported in
    * the previous call.
    */
@@ -174,8 +171,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug("Cleared files are:\n" +
         oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
     }
-    // Delete file mod times that weren't accessed in the last round of getting new files
-    fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
 
   /**
@@ -197,29 +192,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+        .filter(_.isDirectory)
+        .map(_.getPath)
       val newFiles = directories.flatMap(dir =>
-        fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+        fs.listStatus(dir)
+          .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+          .map(_.getPath.toString))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
-      logInfo("Finding new files took " + timeTaken + " ms")
-      logDebug("# cached file times = " + fileToModTime.size)
+      logDebug(s"Finding new files took $timeTaken ms")
       if (timeTaken > slideDuration.milliseconds) {
         logWarning(
-          "Time taken to find new files exceeds the batch size. " +
+          s"Time taken to find new files $timeTaken exceeds the batch size. " +
             "Consider increasing the batch size or reducing the number of " +
-            "files in the monitored directory."
+            "files in the monitored directories."
         )
       }
       newFiles
     } catch {
+      case e: FileNotFoundException =>
+        logWarning(s"No directory to scan: $directoryPath: $e")
+        Array.empty
       case e: Exception =>
-        logWarning("Error finding new files", e)
+        logWarning(s"Error finding new files under $directoryPath", e)
         reset()
         Array.empty
     }
@@ -242,16 +237,24 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
    * Hence they can get selected as new files again. To prevent this, files whose mod time is more
    * than current batch time are not considered.
+   * @param fileStatus file status
+   * @param currentTime time of the batch
+   * @param modTimeIgnoreThreshold the ignore threshold
+   * @return true if the file has been modified within the batch window
    */
-  private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
+  private def isNewFile(
+      fileStatus: FileStatus,
+      currentTime: Long,
+      modTimeIgnoreThreshold: Long): Boolean = {
+    val path = fileStatus.getPath
     val pathStr = path.toString
     // Reject file if it does not satisfy filter
    if (!filter(path)) {
       logDebug(s"$pathStr rejected by filter")
       return false
     }
     // Reject file if it was created before the ignore time
-    val modTime = getFileModTime(path)
+    val modTime = fileStatus.getModificationTime()
     if (modTime <= modTimeIgnoreThreshold) {
       // Use <= instead of < to avoid SPARK-4518
       logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@@ -293,11 +296,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
-  /** Get file mod time from cache or fetch it from the file system */
-  private def getFileModTime(path: Path) = {
-    fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
-  }
-
   private def directoryPath: Path = {
     if (_path == null) _path = new Path(directory)
     _path
@@ -319,7 +317,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
     batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
-    fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
 
   /**
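One detail worth calling out in the rewritten scan above: Hadoop's `FileSystem.globStatus` returns `null`, not an empty array, when the pattern matches nothing, which is why the patch wraps it in `Option(...)`. A minimal standalone sketch of that guard (the `GlobGuard` object and its argument handling are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object GlobGuard {
  // globStatus yields null (not an empty array) when nothing matches the
  // pattern, so normalise before filtering, exactly as the patch does.
  def globDirectories(fs: FileSystem, pattern: Path): Array[Path] =
    Option(fs.globStatus(pattern))
      .getOrElse(Array.empty[FileStatus])
      .filter(_.isDirectory)
      .map(_.getPath)

  def main(args: Array[String]): Unit = {
    val pattern = new Path(args(0)) // e.g. /data/streaming/sub*
    val fs = pattern.getFileSystem(new Configuration())
    globDirectories(fs, pattern).foreach(println)
  }
}
```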

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 78 additions & 20 deletions
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.scalatest.BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   test("binary records stream") {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
           assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
         }
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
 
@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   test("file input stream - wildcard") {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
       val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
 
@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // not enough to trigger a batch
         clock.advance(batchDuration.milliseconds / 2)
 
-        def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
+        def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
           val file = new File(testSubDir1, data.toString)
           Files.write(data + "\n", file, StandardCharsets.UTF_8)
           assert(file.setLastModified(clock.getTimeMillis()))
           assert(file.lastModified === clock.getTimeMillis())
-          logInfo("Created file " + file)
+          logInfo(s"Created file $file")
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
           clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,85 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
         // Over time, create files in the temp directory 1
         val input1 = Seq(1, 2, 3, 4, 5)
-        input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+        input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
 
         // Over time, create files in the temp directory 1
         val input2 = Seq(6, 7, 8, 9, 10)
-        input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+        input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
 
         // Verify that all the files have been read
         val expectedOutput = (input1 ++ input2).map(_.toString).toSet
         assert(outputQueue.asScala.flatten.toSet === expectedOutput)
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
+
+  test("Modified files are correctly detected.") {
+    withTempDir { testDir =>
+      val batchDuration = Seconds(2)
+      val durationMs = batchDuration.milliseconds
+      val testPath = new Path(testDir.toURI)
+      val streamDir = new Path(testPath, "streaming")
+      val streamGlobPath = new Path(streamDir, "sub*")
+      val generatedDir = new Path(testPath, "generated")
+      val generatedSubDir = new Path(generatedDir, "subdir")
+      val renamedSubDir = new Path(streamDir, "subdir")
+
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val sparkContext = ssc.sparkContext
+        val hc = sparkContext.hadoopConfiguration
+        val fs = FileSystem.get(testPath.toUri, hc)
+
+        fs.delete(testPath, true)
+        fs.mkdirs(testPath)
+        fs.mkdirs(streamDir)
+        fs.mkdirs(generatedSubDir)
+
+        def write(path: Path, text: String): Unit = {
+          val out = fs.create(path, true)
+          IOUtils.write(text, out)
+          out.close()
+        }
+
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val existingFile = new Path(generatedSubDir, "existing")
+        write(existingFile, "existing\n")
+        val status = fs.getFileStatus(existingFile)
+        clock.setTime(status.getModificationTime + durationMs)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
+        outputStream.register()
+
+        ssc.start()
+        clock.advance(durationMs)
+        eventually(eventuallyTimeout) {
+          assert(1 === batchCounter.getNumCompletedBatches)
+        }
+        // create and rename the file
+        // put a file into the generated directory
+        val textPath = new Path(generatedSubDir, "renamed.txt")
+        write(textPath, "renamed\n")
+        val now = clock.getTimeMillis()
+        val modTime = now + durationMs / 2
+        fs.setTimes(textPath, modTime, modTime)
+        val textFilestatus = fs.getFileStatus(existingFile)
+        assert(textFilestatus.getModificationTime < now + durationMs)
+
+        // rename the directory under the path being scanned
+        fs.rename(generatedSubDir, renamedSubDir)
+
+        // move forward one window
+        clock.advance(durationMs)
+        // await the next scan completing
+        eventually(eventuallyTimeout) {
+          assert(2 === batchCounter.getNumCompletedBatches)
+        }
+        // verify that the "renamed" file is found, but not the "existing" one which is out of
+        // the window
+        assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
+      }
     }
   }
 
@@ -416,10 +478,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +526,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
         assert(outputQueue.asScala.flatten.toSet === expectedOutput)
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
 }
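The new test stages files under `generated/subdir` and then renames the directory into the monitored `streaming/sub*` glob, a publish-by-rename pattern that makes files visible to the scan in one step. A standalone sketch of that pattern against the Hadoop API (the `PublishByRename` object, paths, and file contents are illustrative); note that unlike HDFS, many object stores implement rename as a non-atomic copy:

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PublishByRename {
  def main(args: Array[String]): Unit = {
    val base = new Path(args(0)) // e.g. a temp dir
    val fs = base.getFileSystem(new Configuration())

    val staging = new Path(base, "generated/subdir")
    val monitored = new Path(base, "streaming/subdir")
    fs.mkdirs(staging)
    fs.mkdirs(monitored.getParent)

    // Write the file where the stream cannot see it yet.
    val out = fs.create(new Path(staging, "data.txt"), true)
    out.write("payload\n".getBytes(StandardCharsets.UTF_8))
    out.close()

    // Publish: a single rename makes the whole directory visible to a
    // stream monitoring streaming/sub*.
    fs.rename(staging, monitored)
  }
}
```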

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala

Lines changed: 13 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{IOException, ObjectInputStream}
+import java.io.{File, IOException, ObjectInputStream}
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       verifyOutput[W](output.toSeq, expectedOutput, useSet)
     }
   }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   * (originally from `SqlTestUtils`.)
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
 }
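The new `withTempDir` helper is a loan pattern: create the resource, lend it to `f`, always clean up. A self-contained sketch of equivalent behaviour using only the JDK (the real helper delegates to Spark's `Utils`; the `TempDirLoan` object and sample file are illustrative):

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object TempDirLoan {
  // Same shape as the new helper: create, lend to f, always clean up.
  def withTempDir(f: File => Unit): Unit = {
    val dir = Files.createTempDirectory("spark-test").toFile.getCanonicalFile
    def delete(file: File): Unit = {
      if (file.isDirectory) {
        Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(delete)
      }
      file.delete()
    }
    try f(dir) finally delete(dir)
  }

  def main(args: Array[String]): Unit = {
    withTempDir { dir =>
      val sample = new File(dir, "sample.txt")
      Files.write(sample.toPath, "hello\n".getBytes(StandardCharsets.UTF_8))
      assert(sample.exists())
    } // the directory and its contents are gone here
  }
}
```

Because cleanup lives in one place, the three tests that previously carried their own `var testDir`/`try`/`finally` scaffolding shrink to a single `withTempDir { testDir => ... }` block each.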
