Skip to content

Commit aff8f15

Browse files
MaxGekk authored and gatorsmile committed
[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS
## What changes were proposed in this pull request?

In the PR, I propose to not perform recursive parallel listing of files in the `scanPartitions` method because it can cause a deadlock. Instead of that I propose to do `scanPartitions` in parallel for top level partitions only.

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk <[email protected]>

Closes apache#22233 from MaxGekk/fix-recover-partitions.
1 parent 4e3f3ce commit aff8f15

File tree

3 files changed

+61
-47
lines changed
  • sql

3 files changed

+61
-47
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
2020
import java.util.Locale
2121

2222
import scala.collection.{GenMap, GenSeq}
23-
import scala.concurrent.ExecutionContext
23+
import scala.collection.parallel.ForkJoinTaskSupport
2424
import scala.util.control.NonFatal
2525

2626
import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
2929

3030
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
3131
import org.apache.spark.sql.catalyst.TableIdentifier
32-
import org.apache.spark.sql.catalyst.analysis.Resolver
32+
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
3333
import org.apache.spark.sql.catalyst.catalog._
3434
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
3535
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
4040
import org.apache.spark.sql.internal.HiveSerDe
4141
import org.apache.spark.sql.types._
4242
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
43-
import org.apache.spark.util.ThreadUtils.parmap
4443

4544
// Note: The definition of these commands are based on the ones described in
4645
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
622621
val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
623622
val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
624623
try {
625-
implicit val ec = ExecutionContext.fromExecutor(evalPool)
626624
scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
627-
spark.sessionState.conf.resolver)
625+
spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
628626
} finally {
629627
evalPool.shutdown()
630628
}
@@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
656654
spec: TablePartitionSpec,
657655
partitionNames: Seq[String],
658656
threshold: Int,
659-
resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
657+
resolver: Resolver,
658+
evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
660659
if (partitionNames.isEmpty) {
661660
return Seq(spec -> path)
662661
}
663662

664-
val statuses = fs.listStatus(path, filter).toSeq
665-
def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
663+
val statuses = fs.listStatus(path, filter)
664+
val statusPar: GenSeq[FileStatus] =
665+
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
666+
// parallelize the list of partitions here, then we can have better parallelism later.
667+
val parArray = statuses.par
668+
parArray.tasksupport = evalTaskSupport
669+
parArray
670+
} else {
671+
statuses
672+
}
673+
statusPar.flatMap { st =>
666674
val name = st.getPath.getName
667675
if (st.isDirectory && name.contains("=")) {
668676
val ps = name.split("=", 2)
@@ -671,7 +679,7 @@ case class AlterTableRecoverPartitionsCommand(
671679
val value = ExternalCatalogUtils.unescapePathName(ps(1))
672680
if (resolver(columnName, partitionNames.head)) {
673681
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
674-
partitionNames.drop(1), threshold, resolver)
682+
partitionNames.drop(1), threshold, resolver, evalTaskSupport)
675683
} else {
676684
logWarning(
677685
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
@@ -682,14 +690,6 @@ case class AlterTableRecoverPartitionsCommand(
682690
Seq.empty
683691
}
684692
}
685-
val result = if (partitionNames.length > 1 &&
686-
statuses.length > threshold || partitionNames.length > 2) {
687-
parmap(statuses)(handleStatus _)
688-
} else {
689-
statuses.map(handleStatus)
690-
}
691-
692-
result.flatten
693693
}
694694

695695
private def gatherPartitionStats(

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
5252
protected override def generateTable(
5353
catalog: SessionCatalog,
5454
name: TableIdentifier,
55-
isDataSource: Boolean = true): CatalogTable = {
55+
isDataSource: Boolean = true,
56+
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
5657
val storage =
5758
CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
5859
val metadata = new MetadataBuilder()
5960
.putString("key", "value")
6061
.build()
62+
val schema = new StructType()
63+
.add("col1", "int", nullable = true, metadata = metadata)
64+
.add("col2", "string")
6165
CatalogTable(
6266
identifier = name,
6367
tableType = CatalogTableType.EXTERNAL,
6468
storage = storage,
65-
schema = new StructType()
66-
.add("col1", "int", nullable = true, metadata = metadata)
67-
.add("col2", "string")
68-
.add("a", "int")
69-
.add("b", "int"),
69+
schema = schema.copy(
70+
fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
7071
provider = Some("parquet"),
71-
partitionColumnNames = Seq("a", "b"),
72+
partitionColumnNames = partitionCols,
7273
createTime = 0L,
7374
createVersion = org.apache.spark.SPARK_VERSION,
7475
tracksPartitionsInCatalog = true)
@@ -176,7 +177,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
176177
protected def generateTable(
177178
catalog: SessionCatalog,
178179
name: TableIdentifier,
179-
isDataSource: Boolean = true): CatalogTable
180+
isDataSource: Boolean = true,
181+
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable
180182

181183
private val escapedIdentifier = "`(.+)`".r
182184

@@ -228,8 +230,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
228230
private def createTable(
229231
catalog: SessionCatalog,
230232
name: TableIdentifier,
231-
isDataSource: Boolean = true): Unit = {
232-
catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
233+
isDataSource: Boolean = true,
234+
partitionCols: Seq[String] = Seq("a", "b")): Unit = {
235+
catalog.createTable(
236+
generateTable(catalog, name, isDataSource, partitionCols), ignoreIfExists = false)
233237
}
234238

235239
private def createTablePartition(
@@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
11311135
}
11321136

11331137
test("alter table: recover partition (parallel)") {
1134-
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
1138+
withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
11351139
testRecoverPartitions()
11361140
}
11371141
}
@@ -1144,23 +1148,32 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
11441148
}
11451149

11461150
val tableIdent = TableIdentifier("tab1")
1147-
createTable(catalog, tableIdent)
1148-
val part1 = Map("a" -> "1", "b" -> "5")
1151+
createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
1152+
val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
11491153
createTablePartition(catalog, part1, tableIdent)
11501154
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
11511155

1152-
val part2 = Map("a" -> "2", "b" -> "6")
1156+
val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
11531157
val root = new Path(catalog.getTableMetadata(tableIdent).location)
11541158
val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
11551159
// valid
1156-
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
1157-
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
1158-
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
1159-
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
1160-
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
1161-
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
1162-
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
1163-
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
1160+
fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
1161+
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv")) // file
1162+
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS")) // file
1163+
1164+
fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
1165+
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv")) // file
1166+
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv")) // file
1167+
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile")) // file
1168+
fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))
1169+
1170+
val parts = (10 to 100).map { a =>
1171+
val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
1172+
fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
1173+
fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv")) // file
1174+
createTablePartition(catalog, part, tableIdent)
1175+
part
1176+
}
11641177

11651178
// invalid
11661179
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
@@ -1174,7 +1187,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
11741187
try {
11751188
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
11761189
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
1177-
Set(part1, part2))
1190+
Set(part1, part2) ++ parts)
11781191
if (!isUsingHiveMetastore) {
11791192
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
11801193
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
6060
protected override def generateTable(
6161
catalog: SessionCatalog,
6262
name: TableIdentifier,
63-
isDataSource: Boolean): CatalogTable = {
63+
isDataSource: Boolean,
64+
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
6465
val storage =
6566
if (isDataSource) {
6667
val serde = HiveSerDe.sourceToSerDe("parquet")
@@ -84,17 +85,17 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
8485
val metadata = new MetadataBuilder()
8586
.putString("key", "value")
8687
.build()
88+
val schema = new StructType()
89+
.add("col1", "int", nullable = true, metadata = metadata)
90+
.add("col2", "string")
8791
CatalogTable(
8892
identifier = name,
8993
tableType = CatalogTableType.EXTERNAL,
9094
storage = storage,
91-
schema = new StructType()
92-
.add("col1", "int", nullable = true, metadata = metadata)
93-
.add("col2", "string")
94-
.add("a", "int")
95-
.add("b", "int"),
95+
schema = schema.copy(
96+
fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
9697
provider = if (isDataSource) Some("parquet") else Some("hive"),
97-
partitionColumnNames = Seq("a", "b"),
98+
partitionColumnNames = partitionCols,
9899
createTime = 0L,
99100
createVersion = org.apache.spark.SPARK_VERSION,
100101
tracksPartitionsInCatalog = true)

0 commit comments

Comments
 (0)