Skip to content

Commit f2ba01b

Browse files
committed
Support SupportsPartitionManagement for HiveTable
1 parent 2037881 commit f2ba01b

File tree

3 files changed

+238
-4
lines changed

3 files changed

+238
-4
lines changed

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.spark.connector.hive
1919

20+
import java.net.URI
2021
import java.util
2122
import java.util.Locale
2223

@@ -26,16 +27,19 @@ import scala.collection.mutable
2627
import org.apache.hadoop.fs.Path
2728
import org.apache.spark.internal.Logging
2829
import org.apache.spark.sql.SparkSession
29-
import org.apache.spark.sql.catalyst.catalog.CatalogTable
30-
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
30+
import org.apache.spark.sql.catalyst.InternalRow
31+
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
32+
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
33+
import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow, Literal}
34+
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
3135
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
3236
import org.apache.spark.sql.connector.expressions.Transform
3337
import org.apache.spark.sql.connector.read.ScanBuilder
3438
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
3539
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
3640
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
3741
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
38-
import org.apache.spark.sql.types.StructType
42+
import org.apache.spark.sql.types.{StringType, StructType}
3943
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4044

4145
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
@@ -46,7 +50,7 @@ case class HiveTable(
4650
sparkSession: SparkSession,
4751
catalogTable: CatalogTable,
4852
hiveTableCatalog: HiveTableCatalog)
49-
extends Table with SupportsRead with SupportsWrite with Logging {
53+
extends Table with SupportsRead with SupportsWrite with SupportsPartitionManagement with Logging {
5054

5155
  // Non-partition (data) columns of the underlying Hive catalog table.
  lazy val dataSchema: StructType = catalogTable.dataSchema
5256

@@ -112,4 +116,92 @@ case class HiveTable(
112116
override def capabilities(): util.Set[TableCapability] = {
113117
util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC)
114118
}
119+
120+
override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
121+
val spec = toPartitionSpec(ident)
122+
val location = Option(properties.get(HiveTableProperties.LOCATION)).map(new URI(_))
123+
val newPart = CatalogTablePartition(
124+
spec,
125+
catalogTable.storage.copy(locationUri = location),
126+
properties.asScala.toMap)
127+
hiveTableCatalog.externalCatalog.createPartitions(
128+
catalogTable.database,
129+
catalogTable.identifier.table,
130+
Seq(newPart),
131+
ignoreIfExists = false)
132+
}
133+
134+
override def dropPartition(ident: InternalRow): Boolean = {
135+
try {
136+
hiveTableCatalog.externalCatalog.dropPartitions(
137+
catalogTable.database,
138+
catalogTable.identifier.table,
139+
Seq(toPartitionSpec(ident)),
140+
ignoreIfNotExists = false,
141+
purge = false,
142+
retainData = false)
143+
true
144+
} catch {
145+
case _: NoSuchPartitionException => false
146+
}
147+
}
148+
149+
override def replacePartitionMetadata(
150+
ident: InternalRow,
151+
properties: util.Map[String, String]): Unit = {
152+
throw new UnsupportedOperationException("Replace partition is not supported")
153+
}
154+
155+
override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
156+
val spec = toPartitionSpec(ident)
157+
val partition = hiveTableCatalog.externalCatalog.getPartition(
158+
catalogTable.database,
159+
catalogTable.identifier.table,
160+
spec)
161+
val metadata = new util.HashMap[String, String](partition.parameters.asJava)
162+
partition.storage.locationUri.foreach { uri =>
163+
metadata.put(HiveTableProperties.LOCATION, uri.toString)
164+
}
165+
metadata
166+
}
167+
168+
override def listPartitionIdentifiers(
169+
names: Array[String],
170+
ident: InternalRow): Array[InternalRow] = {
171+
val partialSpec = if (names.isEmpty) {
172+
None
173+
} else {
174+
val fields = names.map(partitionSchema(_))
175+
val schema = StructType(fields)
176+
Some(toPartitionSpec(ident, schema))
177+
}
178+
hiveTableCatalog.externalCatalog.listPartitions(
179+
catalogTable.database,
180+
catalogTable.identifier.table,
181+
partialSpec).map { part =>
182+
val values = partitionSchema.map { field =>
183+
val strValue = part.spec(field.name)
184+
Cast(Literal(strValue), field.dataType).eval()
185+
}
186+
new GenericInternalRow(values.toArray)
187+
}.toArray
188+
}
189+
190+
private def toPartitionSpec(ident: InternalRow, schema: StructType): Map[String, String] = {
191+
require(
192+
schema.size == ident.numFields,
193+
s"Schema size (${schema.size}) does not match numFields (${ident.numFields})")
194+
schema.zipWithIndex.map { case (field, index) =>
195+
val value = ident.get(index, field.dataType)
196+
val filedValue = Cast(
197+
Literal(value, field.dataType),
198+
StringType,
199+
Some(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval().toString
200+
field.name -> filedValue
201+
}.toMap
202+
}
203+
204+
private def toPartitionSpec(ident: InternalRow): Map[String, String] = {
205+
toPartitionSpec(ident, partitionSchema)
206+
}
115207
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.hive
19+
20+
/**
 * Property keys understood by the Kyuubi Hive connector for table and
 * partition metadata.
 */
object HiveTableProperties {

  /** Key carrying a table's or partition's storage location URI. */
  val LOCATION: String = "location"
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.hive.command
19+
20+
import org.apache.spark.sql.Row
21+
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
23+
import org.apache.spark.unsafe.types.UTF8String
24+
import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
25+
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
26+
27+
/**
 * Shared DDL tests exercising partition management (ADD/DROP/SHOW PARTITIONS)
 * against a Hive-backed table. Concrete subclasses bind the catalog under test.
 */
trait PartitionManagementSuite extends DDLCommandTestUtils {
  override protected def command: String = "PARTITION MANAGEMENT"

  test("create partition") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      checkAnswer(sql(s"SHOW PARTITIONS $t"), Seq(Row("year=2023/month=01")))
      // Adding the same partition again must fail.
      intercept[PartitionsAlreadyExistException] {
        sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      }
    }
  }

  test("create partition with location") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      val loc = "file:///tmp/kyuubi/hive_catalog_part_loc"
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01') LOCATION '$loc'")
      checkAnswer(sql(s"SHOW PARTITIONS $t"), Seq(Row("year=2023/month=01")))
      // Resolve the table through the V2 catalog API to inspect partition metadata.
      val tableCatalog = spark.sessionState.catalogManager
        .catalog(catalogName).asInstanceOf[TableCatalog]
      val partTable = tableCatalog.loadTable(Identifier.of(Array("ns"), "tbl"))
        .asInstanceOf[SupportsPartitionManagement]
      val partIdent = InternalRow.fromSeq(
        Seq(UTF8String.fromString("2023"), UTF8String.fromString("01")))
      val metadata = partTable.loadPartitionMetadata(partIdent)
      // The custom location must round-trip through the partition metadata.
      assert(metadata.containsKey("location"))
      assert(metadata.get("location").contains("hive_catalog_part_loc"))
    }
  }

  test("drop partition") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $t DROP PARTITION (year='2023', month='01')")
      checkAnswer(sql(s"SHOW PARTITIONS $t"), Seq.empty)
      // Dropping a partition that never existed must fail.
      intercept[NoSuchPartitionsException] {
        sql(s"ALTER TABLE $t DROP PARTITION (year='9999', month='99')")
      }
    }
  }

  test("list partitions") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='02')")
      checkAnswer(
        sql(s"SHOW PARTITIONS $t"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02")))

      // A full partition spec narrows the listing to a single partition.
      checkAnswer(
        sql(s"SHOW PARTITIONS $t PARTITION (year='2023', month='01')"),
        Seq(Row("year=2023/month=01")))
    }
  }

  test("show partitions with multiple partition columns") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='02')")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2024', month='01')")

      checkAnswer(
        sql(s"SHOW PARTITIONS $t"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02"), Row("year=2024/month=01")))

      // A partial spec (leading column only) filters across the remaining columns.
      checkAnswer(
        sql(s"SHOW PARTITIONS $t PARTITION (year='2023')"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02")))
    }
  }
}
109+
110+
/** Runs the partition management tests against the Kyuubi Hive V2 catalog. */
class PartitionManagementV2Suite extends PartitionManagementSuite {
  override protected def catalogVersion: String = "Hive V2"
  override protected def commandVersion: String = V2_COMMAND_VERSION
}
114+
115+
/** Runs the partition management tests against the builtin V1 session catalog. */
class PartitionManagementV1Suite extends PartitionManagementSuite {
  val SESSION_CATALOG_NAME: String = "spark_catalog"

  override protected val catalogName: String = SESSION_CATALOG_NAME
  override protected def catalogVersion: String = "V1"
  override protected def commandVersion: String = V1_COMMAND_VERSION
}

0 commit comments

Comments
 (0)