
Commit 8a6b685

pan3793 authored and cloud-fan committed
[SPARK-54853][SQL] Always check hive.exec.max.dynamic.partitions on the spark side
### What changes were proposed in this pull request?

This PR makes the `hive.exec.max.dynamic.partitions` check always happen on the Spark side when performing dynamic partition writes to Hive SerDe tables, and skips the Hive-side check by setting `hive.exec.max.dynamic.partitions` to `Int.MaxValue` in the shared `Hive`'s conf. It also assigns the error condition `_LEGACY_ERROR_TEMP_2277` a proper name, `DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED`.

### Why are the changes needed?

SPARK-37217 partially handles `hive.exec.max.dynamic.partitions` on the Spark side, but only for the `INSERT OVERWRITE` case when performing dynamic partition overwrite to an external Hive SerDe table. This reduces the risk of data loss but does not eliminate it: when the user updates (especially increases) the session conf `hive.exec.max.dynamic.partitions`, the change only takes effect on the Spark side, while the shared `Hive` still uses a static Hadoop conf from `sc.newHadoopConf`. So if the user hits the error and increases the value as the error message suggests, the query can pass the Spark-side check but still fail on the Hive side later, and the data loss issue described in SPARK-37217 happens again.

Currently, the following three frequently used configs related to dynamic partition writes to Hive SerDe tables behave inconsistently:

```sql
-- this works
SET hive.exec.dynamic.partition=true;
-- this also works
SET hive.exec.dynamic.partition.mode=nonstrict;
-- this does not work, but the error message suggests the user do exactly that
SET hive.exec.max.dynamic.partitions=1001;
```

```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Number of dynamic partitions created is 3, which is more than 2. To solve this try to set hive.exec.max.dynamic.partitions to at least 3.
	at org.apache.hadoop.hive.ql.metadata.Hive.getValidPartitionsInPath(Hive.java:1862)
	at org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1902)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.apache.spark.sql.hive.client.Shim_v2_1.loadDynamicPartitions(HiveShim.scala:1110)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadDynamicPartitions$1(HiveClientImpl.scala:1013)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:237)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:236)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
	at org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:1004)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadDynamicPartitions$1(HiveExternalCatalog.scala:1051)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
	at org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:1031)
	...
```

### Does this PR introduce _any_ user-facing change?

Yes. With this change, users are allowed to set `hive.exec.max.dynamic.partitions` in the session conf, e.g. by executing `SET hive.exec.max.dynamic.partitions=1001`, and will see more consistent behavior for the `hive.exec.max.dynamic.partitions` check: it is always performed before calling the external catalog's `loadDynamicPartitions`, for both managed and external tables, and for both `INSERT INTO` and `INSERT OVERWRITE` dynamic partition writes to Hive SerDe tables.

### How was this patch tested?

New UT is added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53624 from pan3793/SPARK-54853.

Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
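To make the user-facing change concrete, here is a minimal, illustrative spark-shell sketch; the `sales` table, its schema, and the inserted data are hypothetical and not part of this patch:

```scala
// Illustrative only: assumes a Hive-enabled SparkSession and a hypothetical
// Hive SerDe table created as: CREATE TABLE sales (c1 INT) PARTITIONED BY (p1 INT).
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// With this patch, the session-level value below is the one actually enforced:
// the check runs on the Spark side before loadDynamicPartitions is called, for
// both INSERT INTO and INSERT OVERWRITE, and for managed and external tables.
spark.sql("SET hive.exec.max.dynamic.partitions=1001")

// If the write produces more dynamic partitions than the limit, it now fails
// fast with DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED (SQLSTATE 54054).
spark.sql(
  "INSERT OVERWRITE TABLE sales PARTITION (p1) SELECT id AS c1, id AS p1 FROM range(3)")
```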
1 parent eec092c · commit 8a6b685

5 files changed (+58 -18 lines changed)

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 5 deletions

@@ -1575,6 +1575,12 @@
     ],
     "sqlState" : "42734"
   },
+  "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED" : {
+    "message" : [
+      "Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
+    ],
+    "sqlState" : "54054"
+  },
   "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
     "message" : [
       "Previous node emitted a row with eventTime=<emittedRowEventTime> which is older than current_watermark_value=<currentWatermark>",
@@ -9409,11 +9415,6 @@
       "<message>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2277" : {
-    "message" : [
-      "Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2330" : {
     "message" : [
       "Cannot change nullable column to non-nullable: <fieldName>."

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 3 additions & 4 deletions

@@ -2484,12 +2484,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       maxDynamicPartitions: Int,
       maxDynamicPartitionsKey: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2277",
+      errorClass = "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED",
       messageParameters = Map(
-        "numWrittenParts" -> numWrittenParts.toString(),
+        "numWrittenParts" -> numWrittenParts.toString,
         "maxDynamicPartitionsKey" -> maxDynamicPartitionsKey,
-        "maxDynamicPartitions" -> maxDynamicPartitions.toString(),
-        "numWrittenParts" -> numWrittenParts.toString()),
+        "maxDynamicPartitions" -> maxDynamicPartitions.toString),
       cause = null)
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 2 additions & 0 deletions

@@ -1399,6 +1399,8 @@ private[hive] object HiveClientImpl extends Logging {
     if ("bonecp".equalsIgnoreCase(cpType)) {
       hiveConf.set("datanucleus.connectionPoolingType", "DBCP", SOURCE_SPARK)
     }
+    // SPARK-54853 handles this check on the Spark side
+    hiveConf.set("hive.exec.max.dynamic.partitions", Int.MaxValue.toString, SOURCE_SPARK)
     hiveConf
   }
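A small, hypothetical sketch of what this override buys (not part of the patch): with the limit pinned to `Int.MaxValue` in the conf handed to the shared `Hive` client, Hive's own count check in `getValidPartitionsInPath` (see the stack trace in the PR description) can never fire, so the earlier Spark-side check is the only one that matters.

```scala
import org.apache.hadoop.hive.conf.HiveConf

// Hypothetical illustration: pinning the limit to Int.MaxValue in a HiveConf
// makes Hive's "number of dynamic partitions > limit" comparison unsatisfiable.
val hiveConf = new HiveConf()
hiveConf.set("hive.exec.max.dynamic.partitions", Int.MaxValue.toString)
assert(hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS) == Int.MaxValue)
```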

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 8 additions & 8 deletions

@@ -144,15 +144,15 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        val numWrittenParts = writtenParts.size
+        val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+        val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
+          HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
+        if (numWrittenParts > maxDynamicPartitions) {
+          throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
+            numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
+        }
         if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
-          val numWrittenParts = writtenParts.size
-          val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-          val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
-            HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
-          if (numWrittenParts > maxDynamicPartitions) {
-            throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
-              numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
-          }
           // SPARK-29295: When insert overwrite to a Hive external table partition, if the
           // partition does not exist, Hive will not check if the external partition directory
           // exists or not before copying files. So if users drop the partition, and then do
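As a standalone illustration of the Spark-side check that this hunk hoists out of the external-table-overwrite branch, here is a minimal sketch; the function name, the `writtenParts` parameter type, and the thrown exception are simplified stand-ins, not the actual `InsertIntoHiveTable` code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

// Simplified stand-in for the hoisted check: count the dynamic partitions a
// write produced and compare against hive.exec.max.dynamic.partitions read
// from the (session-aware) Hadoop conf, failing before loadDynamicPartitions.
def checkDynamicPartitionLimit(writtenParts: Set[String], hadoopConf: Configuration): Unit = {
  val numWrittenParts = writtenParts.size
  val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
  val maxDynamicPartitions = hadoopConf.getInt(
    maxDynamicPartitionsKey, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
  if (numWrittenParts > maxDynamicPartitions) {
    // The real code throws QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError,
    // which carries the DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED condition.
    throw new IllegalStateException(
      s"Number of dynamic partitions created is $numWrittenParts, which is more than " +
        s"$maxDynamicPartitions. To solve this try to set $maxDynamicPartitionsKey " +
        s"to at least $numWrittenParts.")
  }
}
```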

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala

Lines changed: 39 additions & 1 deletion

@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.SparkThrowable
+import org.apache.hadoop.hive.conf.HiveConf
+
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.SQLInsertTestSuite
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -48,4 +50,40 @@ class HiveSQLInsertTestSuite extends SQLInsertTestSuite with TestHiveSingleton {
     checkError(exception = exception, sqlState = None, condition = v1ErrorClass,
       parameters = v1Parameters)
   }
+
+  test("SPARK-54853: SET hive.exec.max.dynamic.partitions takes effect in session conf") {
+    withSQLConf(
+      HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE.key -> "false") {
+      val cols = Seq("c1", "p1")
+      val df = sql("SELECT 1, * FROM range(3)")
+      Seq(true, false).foreach { overwrite =>
+        withTable("t1") {
+          createTable("t1", cols, Seq("int", "int"), cols.takeRight(1))
+          assert(spark.table("t1").count() === 0)
+
+          spark.conf.set("hive.exec.max.dynamic.partitions", "3")
+          processInsert("t1", df, overwrite = overwrite)
+          assert(spark.table("t1").count() === 3)
+
+          spark.conf.set("hive.exec.max.dynamic.partitions", "2")
+          checkError(
+            exception = intercept[SparkException] {
+              processInsert("t1", df, overwrite = overwrite)
+            },
+            condition = "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED",
+            sqlState = Some("54054"),
+            parameters = Map(
+              "numWrittenParts" -> "3",
+              "maxDynamicPartitionsKey" -> HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname,
+              "maxDynamicPartitions" -> "2"))
+          assert(spark.table("t1").count() === 3)
+
+          spark.conf.set("hive.exec.max.dynamic.partitions", "3")
+          processInsert("t1", df, overwrite = overwrite)
+          val expectedRowCount = if (overwrite) 3 else 6
+          assert(spark.table("t1").count() === expectedRowCount)
+        }
+      }
+    }
+  }
 }
