
Commit fac457d

[SPARK-21400] Don't overwrite output committers on append (apache-spark-on-k8s#227)
1 parent 68ef3f5 commit fac457d

2 files changed: +23 −77 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala

Lines changed: 23 additions & 31 deletions
@@ -33,40 +33,32 @@ class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Bo
   extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val clazz = context.getConfiguration
+      .getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+    if (clazz != null) {
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
 
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(path), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
       }
+    } else {
+      val committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+      committer
     }
-    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
-    committer
   }
 }
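For context, a minimal usage sketch (not part of this commit, and assuming a local Spark session with a placeholder app name and output path): it points the config key consulted by setupCommitter above at a committer class, which after this change is also honored when the write mode is append instead of being replaced by the output format's default committer.

```scala
import org.apache.spark.sql.SparkSession

object CommitterConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("committer-config-sketch")  // placeholder app name
      .master("local[*]")
      // Key backing SQLConf.OUTPUT_COMMITTER_CLASS, read by setupCommitter above.
      // With this commit it is honored for append writes as well.
      .config("spark.sql.sources.outputCommitterClass",
        "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter")
      .getOrCreate()

    import spark.implicits._
    val df = Seq(1, 2, 3).toDF("i")
    // Both the initial write and later appends now go through the committer
    // configured above.
    df.write.mode("append").parquet("/tmp/committer-config-sketch")  // placeholder path

    spark.stop()
  }
}
```

Since FileOutputCommitter is assignable from itself, the branch in setupCommitter that uses the (Path, TaskAttemptContext) constructor would be taken for this configuration.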

sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala

Lines changed: 0 additions & 46 deletions
@@ -783,52 +783,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("SPARK-8578 specified custom output committer will not be used to append data") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      val extraOptions = Map[String, String](
-        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-        // Since Parquet has its own output committer setting, also set it
-        // to AlwaysFailParquetOutputCommitter at here.
-        "spark.sql.parquet.output.committer.class" ->
-          classOf[AlwaysFailParquetOutputCommitter].getName
-      )
-
-      val df = spark.range(1, 10).toDF("i")
-      withTempPath { dir =>
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        // Because there data already exists,
-        // this append should succeed because we will use the output committer associated
-        // with file format and AlwaysFailOutputCommitter will not be used.
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        checkAnswer(
-          spark.read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .load(dir.getCanonicalPath),
-          df.union(df))
-
-        // This will fail because AlwaysFailOutputCommitter is used when we do append.
-        intercept[Exception] {
-          df.write.mode("overwrite")
-            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
-        }
-      }
-      withTempPath { dir =>
-        // Because there is no existing data,
-        // this append will fail because AlwaysFailOutputCommitter is used when we do append
-        // and there is no existing data.
-        intercept[Exception] {
-          df.write.mode("append")
-            .options(extraOptions)
-            .format(dataSourceName)
-            .save(dir.getCanonicalPath)
-        }
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
