Skip to content

Commit 39596b9

Browse files
viiryadongjoon-hyun
authored andcommitted
[SPARK-29649][SQL] Stop task set if FileAlreadyExistsException was thrown when writing to output file
### What changes were proposed in this pull request? We already know task attempts that do not clean up output files in staging directory can cause job failure (SPARK-27194). There was proposals trying to fix it by changing output filename, or deleting existing output files. These proposals are not reliable completely. The difficulty is, as previous failed task attempt wrote the output file, at next task attempt the output file is still under same staging directory, even the output file name is different. If the job will go to fail eventually, there is no point to re-run the task until max attempts are reached. For the jobs running a lot of time, re-running the task can waste a lot of time. This patch proposes to let Spark detect such file already exist exception and stop the task set early. ### Why are the changes needed? For now, if FileAlreadyExistsException is thrown during data writing job in SQL, the job will continue re-running task attempts until max failure number is reached. It is no point for re-running tasks as task attempts will also fail because they can not write to the existing file too. We should stop the task set early. ### Does this PR introduce any user-facing change? Yes. If FileAlreadyExistsException is thrown during data writing job in SQL, no more task attempts are re-tried and the task set will be stoped early. ### How was this patch tested? Unit test. Closes apache#26312 from viirya/stop-taskset-if-outputfile-exists. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 32d44b1 commit 39596b9

File tree

5 files changed

+90
-1
lines changed

5 files changed

+90
-1
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark
19+
20+
/**
21+
* Exception thrown when a task cannot write to output file due to the file already exists.
22+
*/
23+
private[spark] class TaskOutputFileAlreadyExistException(error: Throwable) extends Exception(error)

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,15 @@ private[spark] class TaskSetManager(
799799
info.id, taskSet.id, tid, ef.description))
800800
return
801801
}
802+
if (ef.className == classOf[TaskOutputFileAlreadyExistException].getName) {
803+
// If we can not write to output file in the task, there's no point in trying to
804+
// re-execute it.
805+
logError("Task %s in stage %s (TID %d) can not write to output file: %s; not retrying"
806+
.format(info.id, taskSet.id, tid, ef.description))
807+
abort("Task %s in stage %s (TID %d) can not write to output file: %s".format(
808+
info.id, taskSet.id, tid, ef.description))
809+
return
810+
}
802811
val key = ef.description
803812
val now = clock.getTimeMillis()
804813
val (printFull, dupCount) = {

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.{Properties, Random}
2222
import scala.collection.mutable
2323
import scala.collection.mutable.ArrayBuffer
2424

25+
import org.apache.hadoop.fs.FileAlreadyExistsException
2526
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyString}
2627
import org.mockito.Mockito._
2728
import org.mockito.invocation.InvocationOnMock
@@ -1775,4 +1776,23 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
17751776
assert(!manager.checkSpeculatableTasks(0))
17761777
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
17771778
}
1779+
1780+
test("TaskOutputFileAlreadyExistException lead to task set abortion") {
1781+
sc = new SparkContext("local", "test")
1782+
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
1783+
val taskSet = FakeTask.createTaskSet(1)
1784+
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
1785+
assert(sched.taskSetsFailed.isEmpty)
1786+
1787+
val offerResult = manager.resourceOffer("exec1", "host1", ANY)
1788+
assert(offerResult.isDefined,
1789+
"Expect resource offer on iteration 0 to return a task")
1790+
assert(offerResult.get.index === 0)
1791+
val reason = new ExceptionFailure(
1792+
new TaskOutputFileAlreadyExistException(
1793+
new FileAlreadyExistsException("file already exists")),
1794+
Seq.empty[AccumulableInfo])
1795+
manager.handleFailedTask(offerResult.get.taskId, TaskState.FAILED, reason)
1796+
assert(sched.taskSetsFailed.contains(taskSet.id))
1797+
}
17781798
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
2020
import java.util.{Date, UUID}
2121

2222
import org.apache.hadoop.conf.Configuration
23-
import org.apache.hadoop.fs.Path
23+
import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
2424
import org.apache.hadoop.mapreduce._
2525
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
2626
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -281,6 +281,10 @@ object FileFormatWriter extends Logging {
281281
} catch {
282282
case e: FetchFailedException =>
283283
throw e
284+
case f: FileAlreadyExistsException =>
285+
// If any output file to write already exists, it does not make sense to re-run this task.
286+
// We throw the exception and let Executor throw ExceptionFailure to abort the job.
287+
throw new TaskOutputFileAlreadyExistException(f)
284288
case t: Throwable =>
285289
throw new SparkException("Task failed while writing rows.", t)
286290
}

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
2020
import java.io.File
2121
import java.sql.Date
2222

23+
import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
24+
2325
import org.apache.spark.SparkException
2426
import org.apache.spark.sql._
2527
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -735,4 +737,35 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
735737
assert(msg.contains("Cannot write nullable values to non-null column 's'"))
736738
}
737739
}
740+
741+
test("Stop task set if FileAlreadyExistsException was thrown") {
742+
withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
743+
"fs.file.impl.disable.cache" -> "true") {
744+
withTable("t") {
745+
sql(
746+
"""
747+
|CREATE TABLE t(i INT, part1 INT) USING PARQUET
748+
|PARTITIONED BY (part1)
749+
""".stripMargin)
750+
751+
val df = Seq((1, 1)).toDF("i", "part1")
752+
val err = intercept[SparkException] {
753+
df.write.mode("overwrite").format("parquet").insertInto("t")
754+
}
755+
assert(err.getCause.getMessage.contains("can not write to output file: " +
756+
"org.apache.hadoop.fs.FileAlreadyExistsException"))
757+
}
758+
}
759+
}
760+
}
761+
762+
class FileExistingTestFileSystem extends RawLocalFileSystem {
763+
override def create(
764+
f: Path,
765+
overwrite: Boolean,
766+
bufferSize: Int,
767+
replication: Short,
768+
blockSize: Long): FSDataOutputStream = {
769+
throw new FileAlreadyExistsException(s"${f.toString} already exists")
770+
}
738771
}

0 commit comments

Comments
 (0)