
Commit 67855b9

Author: Robert Kruszewski (committed)
Merge branch 'master' into rk/more-upstream
2 parents 75c75f4 + 9ea8d3d, commit 67855b9

File tree: 75 files changed (+3710, -385 lines)


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions
@@ -338,6 +338,12 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val IGNORE_MISSING_FILES = ConfigBuilder("spark.files.ignoreMissingFiles")
+    .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
+      "encountering missing files and the contents that have been read will still be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
     .stringConf
     .createOptional
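
Not part of the diff above: a minimal sketch of how the new key could be enabled from application code. Only the "spark.files.ignoreMissingFiles" key comes from this commit; the surrounding setup is illustrative.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative app setup; the flag defaults to false.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("ignore-missing-files-example")
      .set("spark.files.ignoreMissingFiles", "true")
    val sc = new SparkContext(conf)

The same flag can also be passed on the command line, e.g. spark-submit --conf spark.files.ignoreMissingFiles=true.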

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 32 additions & 11 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -134,6 +135,8 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
@@ -197,17 +200,24 @@
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
-    val inputSplits = if (ignoreEmptySplits) {
-      allInputSplits.filter(_.getLength > 0)
-    } else {
-      allInputSplits
-    }
-    val array = new Array[Partition](inputSplits.size)
-    for (i <- 0 until inputSplits.size) {
-      array(i) = new HadoopPartition(id, i, inputSplits(i))
+    try {
+      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+      val inputSplits = if (ignoreEmptySplits) {
+        allInputSplits.filter(_.getLength > 0)
+      } else {
+        allInputSplits
+      }
+      val array = new Array[Partition](inputSplits.size)
+      for (i <- 0 until inputSplits.size) {
+        array(i) = new HadoopPartition(id, i, inputSplits(i))
+      }
+      array
+    } catch {
+      case e: InvalidInputException if ignoreMissingFiles =>
+        logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+          s" partitions returned from this path.", e)
+        Array.empty[Partition]
     }
-    array
   }
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -256,6 +266,12 @@
       try {
         inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
       } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+          finished = true
+          null
+        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
         case e: IOException if ignoreCorruptFiles =>
           logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
          finished = true
@@ -276,6 +292,11 @@
       try {
         finished = !reader.next(key, value)
       } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+          finished = true
+        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
         case e: IOException if ignoreCorruptFiles =>
           logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
          finished = true
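
For context, a small usage sketch of the behaviour these catch blocks enable; paths are illustrative, and the same scenario is exercised by the FileSuite test added further down in this commit.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("missing-files-demo")
      .set("spark.files.ignoreMissingFiles", "true")
    val sc = new SparkContext(conf)

    // With the flag enabled, a path that matches no files yields an empty RDD instead of
    // HadoopRDD.getPartitions throwing InvalidInputException, and files deleted between
    // planning and execution are skipped with a warning rather than failing the job.
    val rdd = sc.textFile("/tmp/path/that/does/not/exist")
    assert(rdd.collect().isEmpty)

    sc.stop()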

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 33 additions & 12 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileInputFormat, FileSplit, InvalidInputException}
 import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 
 import org.apache.spark._
@@ -90,6 +90,8 @@ class NewHadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   def getConf: Configuration = {
@@ -124,17 +126,25 @@
         configurable.setConf(_conf)
       case _ =>
     }
-    val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
-    val rawSplits = if (ignoreEmptySplits) {
-      allRowSplits.filter(_.getLength > 0)
-    } else {
-      allRowSplits
-    }
-    val result = new Array[Partition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    try {
+      val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
+      val rawSplits = if (ignoreEmptySplits) {
+        allRowSplits.filter(_.getLength > 0)
+      } else {
+        allRowSplits
+      }
+      val result = new Array[Partition](rawSplits.size)
+      for (i <- 0 until rawSplits.size) {
+        result(i) =
+          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+      }
+      result
+    } catch {
+      case e: InvalidInputException if ignoreMissingFiles =>
+        logWarning(s"${_conf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+          s" partitions returned from this path.", e)
+        Array.empty[Partition]
     }
-    result
   }
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -189,6 +199,12 @@
       _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
       _reader
     } catch {
+      case e: FileNotFoundException if ignoreMissingFiles =>
+        logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+        finished = true
+        null
+      // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+      case e: FileNotFoundException if !ignoreMissingFiles => throw e
       case e: IOException if ignoreCorruptFiles =>
         logWarning(
           s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
@@ -213,6 +229,11 @@
     try {
       finished = !reader.nextKeyValue
     } catch {
+      case e: FileNotFoundException if ignoreMissingFiles =>
+        logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+        finished = true
+      // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+      case e: FileNotFoundException if !ignoreMissingFiles => throw e
       case e: IOException if ignoreCorruptFiles =>
        logWarning(
          s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",

core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -210,7 +210,7 @@ private[scheduler] class BlacklistTracker (
         updateNextExpiryTime()
         killBlacklistedExecutor(exec)
 
-        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
+        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
         blacklistedExecsOnNode += exec
       }
     }
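
The one-character fix above matters because nodeToBlacklistedExecs is meant to group blacklisted executors per host; keying getOrElseUpdate by the executor id would create a separate one-element set for every executor. A standalone illustration of the intended grouping (plain Scala, not Spark code):

    import scala.collection.mutable.{HashMap, HashSet}

    val nodeToBlacklistedExecs = HashMap.empty[String, HashSet[String]]
    // Keyed by host, repeated updates accumulate executor ids under the same node entry.
    nodeToBlacklistedExecs.getOrElseUpdate("hostA", HashSet[String]()) += "1"
    nodeToBlacklistedExecs.getOrElseUpdate("hostA", HashSet[String]()) += "2"
    assert(nodeToBlacklistedExecs("hostA") == HashSet("1", "2"))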

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 18 additions & 15 deletions
@@ -1092,17 +1092,16 @@ class DAGScheduler(
       // the stage as completed here in case there are no tasks to run
       markStageAsFinished(stage, None)
 
-      val debugString = stage match {
+      stage match {
         case stage: ShuffleMapStage =>
-          s"Stage ${stage} is actually done; " +
-            s"(available: ${stage.isAvailable}," +
-            s"available outputs: ${stage.numAvailableOutputs}," +
-            s"partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; " +
+            s"(available: ${stage.isAvailable}," +
+            s"available outputs: ${stage.numAvailableOutputs}," +
+            s"partitions: ${stage.numPartitions})")
+          markMapStageJobsAsFinished(stage)
         case stage : ResultStage =>
-          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
       }
-      logDebug(debugString)
-
       submitWaitingChildStages(stage)
     }
   }
@@ -1307,13 +1306,7 @@ class DAGScheduler(
               shuffleStage.findMissingPartitions().mkString(", "))
             submitStage(shuffleStage)
           } else {
-            // Mark any map-stage jobs waiting on this stage as finished
-            if (shuffleStage.mapStageJobs.nonEmpty) {
-              val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
-              for (job <- shuffleStage.mapStageJobs) {
-                markMapStageJobAsFinished(job, stats)
-              }
-            }
+            markMapStageJobsAsFinished(shuffleStage)
             submitWaitingChildStages(shuffleStage)
           }
         }
@@ -1433,6 +1426,16 @@
     }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+    // Mark any map-stage jobs waiting on this stage as finished
+    if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+      val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+      for (job <- shuffleStage.mapStageJobs) {
+        markMapStageJobAsFinished(job, stats)
+      }
+    }
+  }
+
   /**
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.

core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

Lines changed: 5 additions & 4 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.util.collection
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
 
 /**
@@ -41,7 +42,7 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
   protected def forceSpill(): Boolean
 
   // Number of elements read from input since last spill
-  protected def elementsRead: Long = _elementsRead
+  protected def elementsRead: Int = _elementsRead
 
   // Called by subclasses every time a record is read
   // It's used for checking spilling frequency
@@ -54,15 +55,15 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
 
   // Force this collection to spill when there are this many elements in memory
   // For testing only
-  private[this] val numElementsForceSpillThreshold: Long =
-    SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MaxValue)
+  private[this] val numElementsForceSpillThreshold: Int =
+    SparkEnv.get.conf.get(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD)
 
   // Threshold for this collection's size in bytes before we start tracking its memory usage
   // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
   @volatile private[this] var myMemoryThreshold = initialMemoryThreshold
 
   // Number of elements read from input since last spill
-  private[this] var _elementsRead = 0L
+  private[this] var _elementsRead = 0
 
   // Number of bytes spilled in total
   @volatile private[this] var _memoryBytesSpilled = 0L
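
An aside on the threshold change: assuming the new SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD entry keeps the same key string as the getLong call it replaces, a test could still lower the limit like this (value is illustrative; as the comment in Spillable notes, the setting is for testing only):

    import org.apache.spark.SparkConf

    // Force a spill after 1000 records instead of the effectively unlimited default.
    val conf = new SparkConf()
      .set("spark.shuffle.spill.numElementsForceSpillThreshold", "1000")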

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 68 additions & 1 deletion
@@ -23,6 +23,7 @@ import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
 import org.apache.spark.internal.config._
-import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
+import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -596,4 +597,70 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
+
+  test("spark.files.ignoreMissingFiles should work both HadoopRDD and NewHadoopRDD") {
+    // "file not found" can happen both when getPartitions or compute in HadoopRDD/NewHadoopRDD,
+    // We test both cases here.
+
+    val deletedPath = new Path(tempDir.getAbsolutePath, "test-data-1")
+    val fs = deletedPath.getFileSystem(new Configuration())
+    fs.delete(deletedPath, true)
+    intercept[FileNotFoundException](fs.open(deletedPath))
+
+    def collectRDDAndDeleteFileBeforeCompute(newApi: Boolean): Array[_] = {
+      val dataPath = new Path(tempDir.getAbsolutePath, "test-data-2")
+      val writer = new OutputStreamWriter(new FileOutputStream(new File(dataPath.toString)))
+      writer.write("hello\n")
+      writer.write("world\n")
+      writer.close()
+      val rdd = if (newApi) {
+        sc.newAPIHadoopFile(dataPath.toString, classOf[NewTextInputFormat],
+          classOf[LongWritable], classOf[Text])
+      } else {
+        sc.textFile(dataPath.toString)
+      }
+      rdd.partitions
+      fs.delete(dataPath, true)
+      // Exception happens when initialize record reader in HadoopRDD/NewHadoopRDD.compute
+      // because partitions' info already cached.
+      rdd.collect()
+    }
+
+    // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=false by default.
+    sc = new SparkContext("local", "test")
+    intercept[org.apache.hadoop.mapred.InvalidInputException] {
+      // Exception happens when HadoopRDD.getPartitions
+      sc.textFile(deletedPath.toString).collect()
+    }
+
+    var e = intercept[SparkException] {
+      collectRDDAndDeleteFileBeforeCompute(false)
+    }
+    assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+    intercept[org.apache.hadoop.mapreduce.lib.input.InvalidInputException] {
+      // Exception happens when NewHadoopRDD.getPartitions
+      sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+        classOf[LongWritable], classOf[Text]).collect
+    }
+
+    e = intercept[SparkException] {
+      collectRDDAndDeleteFileBeforeCompute(true)
+    }
+    assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+    sc.stop()
+
+    // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=true.
+    val conf = new SparkConf().set(IGNORE_MISSING_FILES, true)
+    sc = new SparkContext("local", "test", conf)
+    assert(sc.textFile(deletedPath.toString).collect().isEmpty)
+
+    assert(collectRDDAndDeleteFileBeforeCompute(false).isEmpty)
+
+    assert(sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+      classOf[LongWritable], classOf[Text]).collect().isEmpty)
+
+    assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
+  }
 }

core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala

Lines changed: 5 additions & 0 deletions
@@ -574,6 +574,9 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
+    assert(blacklist.nodeToBlacklistedExecs.contains("hostA"))
+    assert(blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
+
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
     conf.set(config.BLACKLIST_KILL_ENABLED, true)
     blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
@@ -589,6 +592,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
       1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
     assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
     assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)
+    assert(blacklist.nodeToBlacklistedExecs.contains("hostA"))
+    assert(blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
 
     // Enable external shuffle service to see if all the executors on this node will be killed.
     conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
