
Commit e98b553

yuchenhuo authored and Robert Kruszewski committed
[SPARK-23822][SQL] Improve error message for Parquet schema mismatches
## What changes were proposed in this pull request?

This pull request improves the error message Spark emits when reading Parquet files with mismatched schemas, e.g. one file with a STRING column where another has an INT column. A new SchemaColumnConvertNotSupportedException is added to replace the old UnsupportedOperationException. The exception is then caught in FileScanRDD.scala and rethrown as a more general QueryExecutionException that names the Parquet file which triggered the exception.

## How was this patch tested?

Unit tests were added to check the new exception and verify the error messages. Also manually tested with two Parquet files with different schemas to check the error message.

![screen shot 2018-03-30 at 4 03 04 pm](https://user-images.githubusercontent.com/37087310/38156580-dd58a140-3433-11e8-973a-b816d859fbe1.png)

Author: Yuchen Huo <[email protected]>

Closes apache#20953 from yuchenhuo/SPARK-23822.
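A minimal reproduction sketch of the scenario described above, assuming a local `SparkSession`; the path is illustrative and the data mirrors the test added in this patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Column "a" is STRING in the first file and INT in the second.
Seq(("bcd", 2)).toDF("a", "b").write.mode("overwrite").parquet("/tmp/mismatch")
Seq((1, "abc")).toDF("a", "b").write.mode("append").parquet("/tmp/mismatch")

// Before this patch the scan failed with a bare UnsupportedOperationException;
// after it, the error names the offending file and column, e.g.
// "Parquet column cannot be converted in file ... Column: [a], ...".
spark.read.parquet("/tmp/mismatch").collect()
```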
1 parent 571ce7c commit e98b553

5 files changed: +166 additions, −13 deletions

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * Exception thrown when the parquet reader finds column type mismatches.
+ */
+@InterfaceStability.Unstable
+public class SchemaColumnConvertNotSupportedException extends RuntimeException {
+
+  /**
+   * Name of the column which cannot be converted.
+   */
+  private String column;
+  /**
+   * Physical column type in the actual parquet file.
+   */
+  private String physicalType;
+  /**
+   * Logical column type in the parquet schema that the parquet reader uses to parse all files.
+   */
+  private String logicalType;
+
+  public String getColumn() {
+    return column;
+  }
+
+  public String getPhysicalType() {
+    return physicalType;
+  }
+
+  public String getLogicalType() {
+    return logicalType;
+  }
+
+  public SchemaColumnConvertNotSupportedException(
+      String column,
+      String physicalType,
+      String logicalType) {
+    super();
+    this.column = column;
+    this.physicalType = physicalType;
+    this.logicalType = logicalType;
+  }
+}
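A sketch of how a caller can consume the new getters; `describeMismatch` is a hypothetical helper, not part of this patch:

```scala
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException

// Format the mismatch details the exception carries, mirroring the message
// that FileScanRDD builds below.
def describeMismatch(e: SchemaColumnConvertNotSupportedException): String =
  s"Column: ${e.getColumn}, Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
```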

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 27 additions & 11 deletions
@@ -22,6 +22,7 @@
 import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;

 import java.io.IOException;
+import java.util.Arrays;
 import java.util.TimeZone;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -37,6 +38,7 @@
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
@@ -233,6 +235,18 @@ private boolean shouldConvertTimestamps() {
     return convertTz != null && !convertTz.equals(UTC);
   }

+  /**
+   * Helper function to construct exception for parquet schema mismatch.
+   */
+  private SchemaColumnConvertNotSupportedException constructConvertNotSupportedException(
+      ColumnDescriptor descriptor,
+      WritableColumnVector column) {
+    return new SchemaColumnConvertNotSupportedException(
+      Arrays.toString(descriptor.getPath()),
+      descriptor.getType().toString(),
+      column.dataType().toString());
+  }
+
   /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
    */
@@ -263,7 +277,7 @@ private void decodeDictionaryIds(
           }
         }
       } else {
-        throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+        throw constructConvertNotSupportedException(descriptor, column);
       }
       break;

@@ -284,7 +298,7 @@ private void decodeDictionaryIds(
           }
         }
       } else {
-        throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+        throw constructConvertNotSupportedException(descriptor, column);
       }
       break;

@@ -323,7 +337,7 @@ private void decodeDictionaryIds(
           }
         }
       } else {
-        throw new UnsupportedOperationException();
+        throw constructConvertNotSupportedException(descriptor, column);
       }
       break;
     case BINARY:
@@ -362,7 +376,7 @@ private void decodeDictionaryIds(
           }
         }
       } else {
-        throw new UnsupportedOperationException();
+        throw constructConvertNotSupportedException(descriptor, column);
       }
       break;

@@ -377,7 +391,9 @@ private void decodeDictionaryIds(
   */

  private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
-    assert(column.dataType() == DataTypes.BooleanType);
+    if (column.dataType() != DataTypes.BooleanType) {
+      throw constructConvertNotSupportedException(descriptor, column);
+    }
     defColumn.readBooleans(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   }
@@ -396,7 +412,7 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }

@@ -416,7 +432,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }

@@ -427,7 +443,7 @@ private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
       defColumn.readFloats(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }

@@ -438,7 +454,7 @@ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
       defColumn.readDoubles(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }

@@ -473,7 +489,7 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }

@@ -512,7 +528,7 @@ private void readFixedLenByteArrayBatch(
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }

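For illustration, a sketch of the values the helper passes through for the mismatch exercised by the tests below; the literals are assumed examples (`[a]` is `Arrays.toString` of the column path, `BINARY` the Parquet physical type, `IntegerType` the Catalyst type the reader expected):

```scala
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException

val e = new SchemaColumnConvertNotSupportedException(
  "[a]",         // Arrays.toString(descriptor.getPath())
  "BINARY",      // descriptor.getType().toString()
  "IntegerType") // column.dataType().toString()
assert(e.getPhysicalType == "BINARY" && e.getLogicalType == "IntegerType")
```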
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala

Lines changed: 2 additions & 1 deletion
@@ -17,4 +17,5 @@

 package org.apache.spark.sql.execution

-class QueryExecutionException(message: String) extends Exception(message)
+class QueryExecutionException(message: String, cause: Throwable = null)
+  extends Exception(message, cause)
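Because `cause` defaults to `null`, existing single-argument call sites keep compiling unchanged; a quick sketch of both forms:

```scala
import org.apache.spark.sql.execution.QueryExecutionException

val withoutCause = new QueryExecutionException("scan failed") // old form still valid
val withCause = new QueryExecutionException(                  // new form chains the root cause
  "scan failed", new RuntimeException("root cause"))
assert(withCause.getCause.getMessage == "root cause")
```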

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 20 additions & 1 deletion
@@ -21,11 +21,14 @@ import java.io.{FileNotFoundException, IOException}

 import scala.collection.mutable

+import org.apache.parquet.io.ParquetDecodingException
+
 import org.apache.spark.{Partition => RDDPartition, TaskContext, TaskKilledException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
@@ -179,7 +182,23 @@ class FileScanRDD(
           currentIterator = readCurrentFile()
         }

-        hasNext
+        try {
+          hasNext
+        } catch {
+          case e: SchemaColumnConvertNotSupportedException =>
+            val message = "Parquet column cannot be converted in " +
+              s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+              s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+            throw new QueryExecutionException(message, e)
+          case e: ParquetDecodingException =>
+            if (e.getMessage.contains("Can not read value at")) {
+              val message = "Encounter error while reading parquet files. " +
+                "One possible cause: Parquet column cannot be converted in the " +
+                "corresponding files. Details: "
+              throw new QueryExecutionException(message, e)
+            }
+            throw e
+        }
       } else {
         currentFile = null
         InputFileBlockHolder.unset()
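How the wrapped error surfaces to a caller, sketched under an assumed mixed-schema directory and an in-scope `spark` session; the path is illustrative:

```scala
import org.apache.spark.SparkException

try {
  spark.read.parquet("/tmp/mismatch").collect()
} catch {
  case e: SparkException =>
    // e.getCause is the QueryExecutionException thrown above, naming the
    // Parquet file and column that could not be converted.
    println(e.getCause.getMessage)
}
```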

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 55 additions & 0 deletions
@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag

+import org.apache.parquet.io.ParquetDecodingException
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}

 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -382,6 +385,58 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }

+  // =======================================
+  // Tests for parquet schema mismatch error
+  // =======================================
+  def testSchemaMismatch(path: String, vectorizedReaderEnabled: Boolean): SparkException = {
+    import testImplicits._
+
+    var e: SparkException = null
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString) {
+      // Create two parquet files with different schemas in the same folder
+      Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(s"$path/parquet")
+      Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet(s"$path/parquet")
+
+      e = intercept[SparkException] {
+        spark.read.parquet(s"$path/parquet").collect()
+      }
+    }
+    e
+  }
+
+  test("schema mismatch failure error message for parquet reader") {
+    withTempPath { dir =>
+      val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
+      val expectedMessage = "Encounter error while reading parquet files. " +
+        "One possible cause: Parquet column cannot be converted in the corresponding " +
+        "files. Details:"
+      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.getCause.isInstanceOf[ParquetDecodingException])
+      assert(e.getCause.getMessage.startsWith(expectedMessage))
+    }
+  }
+
+  test("schema mismatch failure error message for parquet vectorized reader") {
+    withTempPath { dir =>
+      val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
+      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+
+      // Check if the physical type is reporting correctly
+      val errMsg = e.getCause.getMessage
+      assert(errMsg.startsWith("Parquet column cannot be converted in file"))
+      val file = errMsg.substring("Parquet column cannot be converted in file ".length,
+        errMsg.indexOf(". "))
+      val col = spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
+      assert(col.length == 1)
+      if (col(0).dataType == StringType) {
+        assert(errMsg.contains("Column: [a], Expected: IntegerType, Found: BINARY"))
+      } else {
+        assert(errMsg.endsWith("Column: [a], Expected: StringType, Found: INT32"))
+      }
+    }
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================
