Commit cea383d

minimal support
1 parent a8ebf40 commit cea383d

4 files changed: +131 −2 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 10 additions & 1 deletion

@@ -358,7 +358,8 @@ object TableFeature {
       RowTrackingFeature,
       InCommitTimestampTableFeature,
       TypeWideningTableFeature,
-      VacuumProtocolCheckTableFeature)
+      VacuumProtocolCheckTableFeature,
+      VariantTypeTableFeature)
   }
   val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
   require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
@@ -483,6 +484,14 @@ object ColumnMappingTableFeature
   }
 }

+object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-dev")
+  with FeatureAutomaticallyEnabledByMetadata {
+  override def metadataRequiresFeatureToBeEnabled(
+      metadata: Metadata, spark: SparkSession): Boolean = {
+    SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema)
+  }
+}
+
 object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
   with FeatureAutomaticallyEnabledByMetadata {
   override def metadataRequiresFeatureToBeEnabled(
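For context, the new feature object follows Delta's FeatureAutomaticallyEnabledByMetadata pattern: the feature inspects the table's metadata and reports whether it must be enabled. A minimal standalone sketch of that pattern, using simplified stand-ins (SimpleMetadata, MetadataEnabledFeature) rather than the real Delta classes, and assuming a Spark build that ships org.apache.spark.sql.types.VariantType:

import org.apache.spark.sql.types.{StructType, VariantType}

// Simplified stand-in for Delta's Metadata action.
final case class SimpleMetadata(schema: StructType)

// Simplified stand-in for FeatureAutomaticallyEnabledByMetadata.
trait MetadataEnabledFeature {
  def name: String
  // True when the table's current metadata forces this feature on.
  def metadataRequiresFeatureToBeEnabled(metadata: SimpleMetadata): Boolean
}

object SketchVariantFeature extends MetadataEnabledFeature {
  val name = "variantType-dev"
  // Mirrors the real check but only looks at top-level columns; the real
  // SchemaUtils helper recurses into nested struct/array/map types.
  override def metadataRequiresFeatureToBeEnabled(metadata: SimpleMetadata): Boolean =
    metadata.schema.fields.exists(_.dataType.isInstanceOf[VariantType])
}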

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala

Lines changed: 8 additions & 0 deletions

@@ -1253,6 +1253,13 @@ def normalizeColumnNamesInDataType(
     unsupportedDataTypes.toSeq
   }

+  /**
+   * Find VariantType columns in the table schema.
+   */
+  def checkForVariantTypeColumnsRecursively(schema: StructType): Boolean = {
+    SchemaUtils.typeExistsRecursively(schema)(_.isInstanceOf[VariantType])
+  }
+
   /**
    * Find TimestampNTZ columns in the table schema.
    */
@@ -1292,6 +1299,7 @@ def normalizeColumnNamesInDataType(
       case DateType =>
       case TimestampType =>
       case TimestampNTZType =>
+      case VariantType =>
       case BinaryType =>
       case _: DecimalType =>
       case a: ArrayType =>
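checkForVariantTypeColumnsRecursively delegates to SchemaUtils.typeExistsRecursively. A self-contained sketch of that kind of recursive predicate over a Spark schema (not Delta's actual implementation) could look like:

import org.apache.spark.sql.types._

// Returns true if `pred` holds for `dt` itself or for any type nested
// inside it (struct fields, array elements, map keys/values).
def typeExists(dt: DataType)(pred: DataType => Boolean): Boolean = dt match {
  case t if pred(t) => true
  case s: StructType => s.fields.exists(f => typeExists(f.dataType)(pred))
  case a: ArrayType => typeExists(a.elementType)(pred)
  case m: MapType => typeExists(m.keyType)(pred) || typeExists(m.valueType)(pred)
  case _ => false
}

// Usage: detect a VARIANT column at any nesting depth.
val schema = StructType(Seq(
  StructField("s", StringType),
  StructField("arr", ArrayType(VariantType))))
assert(typeExists(schema)(_.isInstanceOf[VariantType]))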

spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -605,7 +605,7 @@ private[delta] object PartitionUtils {

     partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
       field => field.dataType match {
-        case _: AtomicType => // OK
+        case a: AtomicType if !a.isInstanceOf[VariantType] => // OK
        case _ => throw DeltaErrors.cannotUseDataTypeForPartitionColumnError(field)
       }
     }
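This guard is needed because VariantType is modeled as an AtomicType in Spark, so the old `case _: AtomicType` arm would have accepted VARIANT partition columns. A standalone illustration of the refined match (not Delta's actual code path):

import org.apache.spark.sql.types._

// Partitioning is allowed for atomic types, except VARIANT.
def isAllowedPartitionType(dt: DataType): Boolean = dt match {
  case a: AtomicType if !a.isInstanceOf[VariantType] => true
  case _ => false
}

assert(isAllowedPartitionType(StringType))
assert(!isAllowedPartitionType(VariantType))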
spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala

Lines changed: 112 additions & 0 deletions

@@ -0,0 +1,112 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.types.StructType
+
+class DeltaVariantSuite
+    extends QueryTest
+    with DeltaSQLCommandTest {
+
+  private def getProtocolForTable(table: String): Protocol = {
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+    deltaLog.unsafeVolatileSnapshot.protocol
+  }
+
+  test("create a new table with Variant, higher protocol and feature should be picked.") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA")
+      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
+      // TODO(r.chen): Enable once `parse_json` is properly implemented in OSS Spark.
+      // assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
+      assert(
+        getProtocolForTable("tbl") ==
+        VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature)
+      )
+    }
+  }
+
+  test("creating a table without Variant should use the usual minimum protocol") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA")
+      assert(getProtocolForTable("tbl") == Protocol(1, 2))
+
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
+      assert(
+        !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature),
+        "Table tbl contains the VariantTypeTableFeature descriptor when it's not supposed to"
+      )
+    }
+  }
+
+  test("add a new Variant column should upgrade to the correct protocol versions") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(s STRING) USING delta")
+      assert(getProtocolForTable("tbl") == Protocol(1, 2))
+
+      // Should throw an error.
+      val e = intercept[SparkThrowable] {
+        sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
+      }
+      // Capture the existing protocol here; we will check the error message
+      // later in this test, as we need to compare the expected schema and protocol.
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
+      val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol
+      val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures
+        .map(_.name)
+        .toSeq
+        .sorted
+        .mkString(", ")
+
+      // Add the table feature.
+      sql(
+        "ALTER TABLE tbl " +
+        "SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')"
+      )
+
+      sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
+
+      // Check the previously thrown error message.
+      checkError(
+        e,
+        errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
+        parameters = Map(
+          "unsupportedFeatures" -> VariantTypeTableFeature.name,
+          "supportedFeatures" -> currentFeatures
+        )
+      )
+
+      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
+      // TODO(r.chen): Enable once `parse_json` is properly implemented in OSS Spark.
+      // assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
+
+      assert(
+        getProtocolForTable("tbl") ==
+        VariantTypeTableFeature.minProtocolVersion
+          .withFeature(VariantTypeTableFeature)
+          .withFeature(InvariantsTableFeature)
+          .withFeature(AppendOnlyTableFeature)
+      )
+    }
+  }
+}
