
Commit ed4101d

jinxing authored and cloud-fan committed
[SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true
## What changes were proposed in this pull request?

With the current code, every partition path is scanned when `spark.sql.hive.verifyPartitionPath=true`. For example, given a table like:

```
CREATE TABLE `test`(
  `id` int,
  `age` int,
  `name` string)
PARTITIONED BY (
  `A` string,
  `B` string)

load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```

a query such as `select * from test where A='00' and B='01'` scans all of the partition paths: '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10', '/data/A=11/B=11'. This wastes both time and memory.

This PR avoids iterating over all partition paths. It adds a config, `spark.files.ignoreMissingFiles`, and ignores `file not found` errors in `getPartitions`/`compute` (for Hive table scans). This mirrors the logic of `spark.sql.files.ignoreMissingFiles`, which applies to datasource scans.

## How was this patch tested?

UT

Author: jinxing <[email protected]>

Closes apache#19868 from jinxing64/SPARK-22676.
1 parent 0a9172a commit ed4101d
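
For illustration only, a minimal sketch (not part of the patch) of the new core-level flag at work. The path `/tmp/no-such-dir` and the object name are hypothetical; with `spark.files.ignoreMissingFiles=true`, `getPartitions` logs a warning and returns no partitions instead of failing with an `InvalidInputException`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MissingPathDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("missing-path-demo")
      .set("spark.files.ignoreMissingFiles", "true") // flag introduced by this PR

    val sc = new SparkContext(conf)
    // "/tmp/no-such-dir" is a hypothetical path that does not exist.
    // Without the flag, planning the read throws InvalidInputException;
    // with it, the missing input is logged and an empty RDD is returned.
    val rdd = sc.textFile("/tmp/no-such-dir")
    println(rdd.collect().length) // expected: 0
    sc.stop()
  }
}
```

With the default (`false`), the same read fails at planning time, which is the behavior exercised by the FileSuite test below.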

File tree

6 files changed (+181 -25 lines)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -301,6 +301,12 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val IGNORE_MISSING_FILES = ConfigBuilder("spark.files.ignoreMissingFiles")
+    .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
+      "encountering missing files and the contents that have been read will still be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
     .stringConf
     .createOptional
```

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 32 additions & 11 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -134,6 +135,8 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
@@ -197,17 +200,24 @@
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
-    val inputSplits = if (ignoreEmptySplits) {
-      allInputSplits.filter(_.getLength > 0)
-    } else {
-      allInputSplits
-    }
-    val array = new Array[Partition](inputSplits.size)
-    for (i <- 0 until inputSplits.size) {
-      array(i) = new HadoopPartition(id, i, inputSplits(i))
+    try {
+      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+      val inputSplits = if (ignoreEmptySplits) {
+        allInputSplits.filter(_.getLength > 0)
+      } else {
+        allInputSplits
+      }
+      val array = new Array[Partition](inputSplits.size)
+      for (i <- 0 until inputSplits.size) {
+        array(i) = new HadoopPartition(id, i, inputSplits(i))
+      }
+      array
+    } catch {
+      case e: InvalidInputException if ignoreMissingFiles =>
+        logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+          s" partitions returned from this path.", e)
+        Array.empty[Partition]
     }
-    array
   }
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -256,6 +266,12 @@
       try {
         inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
       } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+          finished = true
+          null
+        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
         case e: IOException if ignoreCorruptFiles =>
           logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
           finished = true
@@ -276,6 +292,11 @@
       try {
         finished = !reader.next(key, value)
       } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+          finished = true
+        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
         case e: IOException if ignoreCorruptFiles =>
           logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
           finished = true
```
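
As a usage-level illustration of the `compute` changes above (a sketch, not part of the patch; the temp-file path and object name are hypothetical, and the scenario mirrors the FileSuite test added below): with the flag enabled, a file that disappears after split planning is logged and skipped rather than failing the job.

```scala
import java.io.{File, PrintWriter}

import org.apache.spark.{SparkConf, SparkContext}

object IgnoreMissingFilesDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical temp file written before the job starts.
    val dataFile = new File("/tmp/ignore-missing-demo.txt")
    val writer = new PrintWriter(dataFile)
    writer.write("hello\nworld\n")
    writer.close()

    val sc = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName("ignore-missing-files-demo")
        .set("spark.files.ignoreMissingFiles", "true"))

    val rdd = sc.textFile(dataFile.getAbsolutePath)
    rdd.partitions          // splits are planned while the file still exists
    dataFile.delete()       // the file disappears before compute() runs
    // With the flag on, the missing file is logged and skipped instead of
    // failing the task; nothing was read, so the result is empty.
    println(rdd.collect().length) // expected: 0
    sc.stop()
  }
}
```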

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 33 additions & 12 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileInputFormat, FileSplit, InvalidInputException}
 import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 
 import org.apache.spark._
@@ -90,6 +90,8 @@ class NewHadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   def getConf: Configuration = {
@@ -124,17 +126,25 @@ class NewHadoopRDD[K, V](
         configurable.setConf(_conf)
       case _ =>
     }
-    val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
-    val rawSplits = if (ignoreEmptySplits) {
-      allRowSplits.filter(_.getLength > 0)
-    } else {
-      allRowSplits
-    }
-    val result = new Array[Partition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    try {
+      val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
+      val rawSplits = if (ignoreEmptySplits) {
+        allRowSplits.filter(_.getLength > 0)
+      } else {
+        allRowSplits
+      }
+      val result = new Array[Partition](rawSplits.size)
+      for (i <- 0 until rawSplits.size) {
+        result(i) =
+          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+      }
+      result
+    } catch {
+      case e: InvalidInputException if ignoreMissingFiles =>
+        logWarning(s"${_conf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+          s" partitions returned from this path.", e)
+        Array.empty[Partition]
     }
-    result
   }
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -189,6 +199,12 @@
           _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
           _reader
         } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+            finished = true
+            null
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles => throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(
               s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
@@ -213,6 +229,11 @@
       try {
         finished = !reader.nextKeyValue
       } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+          finished = true
+        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
         case e: IOException if ignoreCorruptFiles =>
           logWarning(
             s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
```

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 68 additions & 1 deletion
```diff
@@ -23,6 +23,7 @@ import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
 import org.apache.spark.internal.config._
-import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
+import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -596,4 +597,70 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
+
+  test("spark.files.ignoreMissingFiles should work both HadoopRDD and NewHadoopRDD") {
+    // "file not found" can happen both when getPartitions or compute in HadoopRDD/NewHadoopRDD,
+    // We test both cases here.
+
+    val deletedPath = new Path(tempDir.getAbsolutePath, "test-data-1")
+    val fs = deletedPath.getFileSystem(new Configuration())
+    fs.delete(deletedPath, true)
+    intercept[FileNotFoundException](fs.open(deletedPath))
+
+    def collectRDDAndDeleteFileBeforeCompute(newApi: Boolean): Array[_] = {
+      val dataPath = new Path(tempDir.getAbsolutePath, "test-data-2")
+      val writer = new OutputStreamWriter(new FileOutputStream(new File(dataPath.toString)))
+      writer.write("hello\n")
+      writer.write("world\n")
+      writer.close()
+      val rdd = if (newApi) {
+        sc.newAPIHadoopFile(dataPath.toString, classOf[NewTextInputFormat],
+          classOf[LongWritable], classOf[Text])
+      } else {
+        sc.textFile(dataPath.toString)
+      }
+      rdd.partitions
+      fs.delete(dataPath, true)
+      // Exception happens when initialize record reader in HadoopRDD/NewHadoopRDD.compute
+      // because partitions' info already cached.
+      rdd.collect()
+    }
+
+    // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=false by default.
+    sc = new SparkContext("local", "test")
+    intercept[org.apache.hadoop.mapred.InvalidInputException] {
+      // Exception happens when HadoopRDD.getPartitions
+      sc.textFile(deletedPath.toString).collect()
+    }
+
+    var e = intercept[SparkException] {
+      collectRDDAndDeleteFileBeforeCompute(false)
+    }
+    assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+    intercept[org.apache.hadoop.mapreduce.lib.input.InvalidInputException] {
+      // Exception happens when NewHadoopRDD.getPartitions
+      sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+        classOf[LongWritable], classOf[Text]).collect
+    }
+
+    e = intercept[SparkException] {
+      collectRDDAndDeleteFileBeforeCompute(true)
+    }
+    assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+    sc.stop()
+
+    // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=true.
+    val conf = new SparkConf().set(IGNORE_MISSING_FILES, true)
+    sc = new SparkContext("local", "test", conf)
+    assert(sc.textFile(deletedPath.toString).collect().isEmpty)
+
+    assert(collectRDDAndDeleteFileBeforeCompute(false).isEmpty)
+
+    assert(sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+      classOf[LongWritable], classOf[Text]).collect().isEmpty)
+
+    assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
+  }
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -437,7 +437,8 @@ object SQLConf {
 
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +
-      "when reading data stored in HDFS.")
+      "when reading data stored in HDFS. This configuration will be deprecated in the future " +
+      "releases and replaced by spark.files.ignoreMissingFiles.")
     .booleanConf
    .createWithDefault(false)
 
```
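
A hedged sketch of the migration this doc change points toward, assuming a Hive-enabled session and the illustrative `table_with_partition` table used in the QueryPartitionSuite test below; `spark.files.ignoreMissingFiles` takes over the role of `spark.sql.hive.verifyPartitionPath` for partitions whose directories were deleted out of band:

```scala
import org.apache.spark.sql.SparkSession

object VerifyPartitionPathMigration {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("verify-partition-path-migration")
      .enableHiveSupport()
      // Old approach: verify every partition path under the table root at scan time.
      .config("spark.sql.hive.verifyPartitionPath", "false")
      // New approach: tolerate partition directories removed from the filesystem.
      .config("spark.files.ignoreMissingFiles", "true")
      .getOrCreate()

    // "table_with_partition" is the illustrative table name from the test below.
    // If one partition directory was deleted out of band, the query still
    // succeeds and returns the rows from the remaining partitions.
    spark.sql("SELECT key, value FROM table_with_partition").show()

    spark.stop()
  }
}
```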

sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala

Lines changed: 40 additions & 0 deletions
```diff
@@ -23,6 +23,7 @@ import java.sql.Timestamp
 import com.google.common.io.Files
 import org.apache.hadoop.fs.FileSystem
 
+import org.apache.spark.internal.config._
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -70,6 +71,45 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
     }
   }
 
+  test("Replace spark.sql.hive.verifyPartitionPath by spark.files.ignoreMissingFiles") {
+    withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "false")) {
+      sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
+      val testData = sparkContext.parallelize(
+        (1 to 10).map(i => TestData(i, i.toString))).toDF()
+      testData.createOrReplaceTempView("testData")
+
+      val tmpDir = Files.createTempDir()
+      // create the table for test
+      sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+        s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
+      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
+        "SELECT key,value FROM testData")
+      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
+        "SELECT key,value FROM testData")
+      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
+        "SELECT key,value FROM testData")
+      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
+        "SELECT key,value FROM testData")
+
+      // test for the exist path
+      checkAnswer(sql("select key,value from table_with_partition"),
+        testData.toDF.collect ++ testData.toDF.collect
+          ++ testData.toDF.collect ++ testData.toDF.collect)
+
+      // delete the path of one partition
+      tmpDir.listFiles
+        .find { f => f.isDirectory && f.getName().startsWith("ds=") }
+        .foreach { f => Utils.deleteRecursively(f) }
+
+      // test for after delete the path
+      checkAnswer(sql("select key,value from table_with_partition"),
+        testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
+
+      sql("DROP TABLE IF EXISTS table_with_partition")
+      sql("DROP TABLE IF EXISTS createAndInsertTest")
+    }
+  }
+
   test("SPARK-21739: Cast expression should initialize timezoneId") {
     withTable("table_with_timestamp_partition") {
       sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")
```
