
Commit 928d073

gengliangwang authored and HyukjinKwon committed
[SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
## What changes were proposed in this pull request?

With flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, which is consistent with the Parquet and ORC data sources.

## How was this patch tested?

Unit test.

Closes apache#22611 from gengliangwang/ignoreCorruptAvro.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
1 parent d6be46e commit 928d073
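To make the new behavior concrete, here is a minimal usage sketch (the `/data/events` path is illustrative, and it assumes the `spark-avro` module is on the classpath; `spark.sql.files.ignoreCorruptFiles` is the conf key behind `SQLConf.IGNORE_CORRUPT_FILES`):

```scala
// With the flag enabled, schema inference skips Avro files whose header
// cannot be read and infers the schema from the first readable file.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Illustrative directory mixing healthy and corrupt .avro files; only the
// readable files contribute to the inferred schema and the result.
val df = spark.read.format("avro").load("/data/events")
df.printSchema()
```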

File tree

2 files changed: +93 −28 lines changed


external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 50 additions & 28 deletions

```diff
@@ -32,14 +32,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[avro] class AvroFileFormat extends FileFormat
   with DataSourceRegister with Logging with Serializable {
@@ -59,36 +59,13 @@ private[avro] class AvroFileFormat extends FileFormat
     val conf = spark.sessionState.newHadoopConf()
     val parsedOptions = new AvroOptions(options, conf)
 
-    // Schema evolution is not supported yet. Here we only pick a single random sample file to
-    // figure out the schema of the whole dataset.
-    val sampleFile =
-      if (parsedOptions.ignoreExtension) {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("Files for schema inferring have been not found.")
-        }
-      } else {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
-        }
-      }
-
     // User can specify an optional avro json schema.
     val avroSchema = parsedOptions.schema
       .map(new Schema.Parser().parse)
       .getOrElse {
-        val in = new FsInput(sampleFile.getPath, conf)
-        try {
-          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
-          try {
-            reader.getSchema
-          } finally {
-            reader.close()
-          }
-        } finally {
-          in.close()
-        }
-      }
+        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
+          spark.sessionState.conf.ignoreCorruptFiles)
+      }
 
     SchemaConverters.toSqlType(avroSchema).dataType match {
       case t: StructType => Some(t)
@@ -100,6 +77,51 @@ private[avro] class AvroFileFormat extends FileFormat
     }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean,
+      ignoreCorruptFiles: Boolean): Schema = {
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FsInput(path, conf)
+        } { in =>
+          try {
+            Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+          } catch {
+            case e: IOException =>
+              if (ignoreCorruptFiles) {
+                logWarning(s"Skipped the footer in the corrupted file: $path", e)
+                None
+              } else {
+                throw new SparkException(s"Could not read file: $path", e)
+              }
+          }
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
+          reader.getSchema
+        } finally {
+          reader.close()
+        }
+      case None =>
+        throw new FileNotFoundException(
+          "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
+    }
+  }
+
   override def shortName(): String = "avro"
 
   override def isSplitable(
```
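The new helper relies on `Utils.tryWithResource` to close the `FsInput` whether or not `DataFileReader.openReader` throws. A minimal sketch of that loan pattern, simplified from what Spark ships (the real version also guards against exceptions thrown during `close()`):

```scala
import java.io.Closeable

// Loan pattern: build the resource, lend it to f, and always close it,
// whether f returns normally or throws.
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f(resource) finally resource.close()
}
```

Returning the open reader out of the block appears safe here because Avro parses the file header, including the schema, eagerly in `openReader`, so the later `getSchema` call works even after the underlying `FsInput` has been closed.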

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 43 additions & 0 deletions

```diff
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   import testImplicits._
@@ -342,6 +343,48 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  private def createDummyCorruptFile(dir: File): Unit = {
+    Utils.tryWithResource {
+      FileUtils.forceMkdir(dir)
+      val corruptFile = new File(dir, "corrupt.avro")
+      new BufferedWriter(new FileWriter(corruptFile))
+    } { writer =>
+      writer.write("corrupt")
+    }
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val result = spark.read.format("avro").load(srcFile.getAbsolutePath).collect()
+        checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result)
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath)
+        }.getMessage
+
+        assert(message.contains("Could not read file"))
+      }
+    }
+  }
+
   test("Date field type") {
     withTempPath { dir =>
       val schema = StructType(Seq(
```
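For intuition on why the dummy file above trips the corrupt-file path: an Avro container file must start with the 4-byte magic header (`Obj` followed by the version byte `0x01`), which Avro validates when a reader is opened, so opening a plain-text `corrupt.avro` fails immediately with an `IOException`. A standalone sketch (the local path is an assumption for illustration):

```scala
import java.io.File

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Opening a file that lacks the Avro magic bytes throws java.io.IOException,
// which AvroFileFormat either logs and skips (flag on) or wraps in a
// SparkException (flag off).
val reader = DataFileReader.openReader(
  new File("corrupt.avro"), new GenericDatumReader[GenericRecord]())
```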
