Commit 5c01b9a: cross compile
1 parent a84e8f4
8 files changed: +183 -97 lines changed
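
Summary: this commit makes the Delta Spark module cross-compile against Spark 3.5 and Spark master. build.sbt registers version-specific test source directories alongside the existing main ones; each version-specific main tree gains a VariantShim object with a single isTypeVariant(dt: DataType) method; shared code in SchemaUtils and PartitionUtils calls the shim instead of referencing VariantType directly, since that class only exists on Spark master; and the variant tests move into a DeltaVariantSparkOnlyTests trait, implemented in full in the Spark master test tree and as an empty trait in the Spark 3.5 test tree.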

build.sbt
Lines changed: 2 additions & 0 deletions

@@ -138,6 +138,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
     // For adding staged Spark RC versions, e.g.:
     // resolvers += "Apache Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
     Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
+    Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
     Antlr4 / antlr4Version := "4.9.3",

     // Java-/Scala-/Uni-Doc Settings
@@ -153,6 +154,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
     targetJvm := "17",
     resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
     Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master",
+    Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-master",
     Antlr4 / antlr4Version := "4.13.1",
     Test / javaOptions ++= Seq(
       // Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153)
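
Both hunks extend crossSparkSettings(), which, per the hunk headers, pattern-matches on getSparkVersion() and returns a different Seq[Setting[_]] per Spark line. A sketch of the surrounding structure follows; the case names LATEST_RELEASED_SPARK_VERSION and SPARK_MASTER_VERSION and their values are assumptions, while the source-directory and antlr4Version lines come from the diff above:

// Hypothetical reconstruction of the version-gated settings pattern;
// only the unmanagedSourceDirectories and antlr4Version lines are from the diff.
def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
  case LATEST_RELEASED_SPARK_VERSION => Seq(  // e.g. "3.5.x" (assumed)
    Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
    Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
    Antlr4 / antlr4Version := "4.9.3"
  )
  case SPARK_MASTER_VERSION => Seq(  // e.g. "4.0.0-SNAPSHOT" (assumed)
    targetJvm := "17",
    Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master",
    Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-master",
    Antlr4 / antlr4Version := "4.13.1"
  )
}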

VariantShim.scala (new file in the scala-spark-3.5 source tree)
Lines changed: 26 additions & 0 deletions

/*
 * Copyright (2024) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.types

object VariantShim {

  /**
   * Spark's variant type is implemented for Spark 4.0 and is not implemented in Spark 3.5. Thus,
   * any Spark 3.5 DataType cannot be a variant type.
   */
  def isTypeVariant(dt: DataType): Boolean = false
}

VariantShim.scala (new file in the scala-spark-master source tree)
Lines changed: 23 additions & 0 deletions

/*
 * Copyright (2024) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.types

object VariantShim {

  /** Spark's variant type is only implemented in Spark 4.0 and above. */
  def isTypeVariant(dt: DataType): Boolean = dt.isInstanceOf[VariantType]
}
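
The two files above declare the same fully qualified object, org.apache.spark.sql.types.VariantShim, with an identical signature. Because build.sbt puts exactly one scala-spark-* tree on the compile path, shared Delta code can call the shim unconditionally; writing dt.isInstanceOf[VariantType] directly would not compile against Spark 3.5, where VariantType does not exist. A small illustrative call site (not from the diff; the helper name is invented):

import org.apache.spark.sql.types.{DataType, VariantShim}

// Resolves against whichever shim the build selected: always false on a
// Spark 3.5 build, true exactly for VariantType on a Spark master build.
def columnNeedsVariantFeature(dt: DataType): Boolean = VariantShim.isTypeVariant(dt)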

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
Lines changed: 2 additions & 2 deletions

@@ -1267,7 +1267,7 @@ def normalizeColumnNamesInDataType(
    * Find VariantType columns in the table schema.
    */
   def checkForVariantTypeColumnsRecursively(schema: StructType): Boolean = {
-    SchemaUtils.typeExistsRecursively(schema)(_.isInstanceOf[VariantType])
+    SchemaUtils.typeExistsRecursively(schema)(VariantShim.isTypeVariant(_))
   }

   /**
@@ -1309,7 +1309,7 @@ def normalizeColumnNamesInDataType(
     case DateType =>
     case TimestampType =>
     case TimestampNTZType =>
-    case VariantType =>
+    case dt if VariantShim.isTypeVariant(dt) =>
     case BinaryType =>
     case _: DecimalType =>
     case a: ArrayType =>

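For context, SchemaUtils.typeExistsRecursively applies a predicate to every type nested inside a schema, which is why swapping only the predicate is enough to shim the variant check. A minimal illustrative re-implementation of such a walk (not Delta's actual helper, which may differ):

import org.apache.spark.sql.types._

// Returns true if `f` holds for `dt` or any type nested inside it.
def typeExistsRecursively(dt: DataType)(f: DataType => Boolean): Boolean = dt match {
  case s: StructType =>
    f(s) || s.fields.exists(field => typeExistsRecursively(field.dataType)(f))
  case a: ArrayType =>
    f(a) || typeExistsRecursively(a.elementType)(f)
  case m: MapType =>
    f(m) || typeExistsRecursively(m.keyType)(f) || typeExistsRecursively(m.valueType)(f)
  case other =>
    f(other)
}

// With the shim, the variant check compiles and behaves correctly on both Spark lines:
// typeExistsRecursively(schema)(VariantShim.isTypeVariant)
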
spark/src/main/scala/org/apache/spark/sql/delta/util/PartitionUtils.scala
Lines changed: 1 addition & 1 deletion

@@ -605,7 +605,7 @@ private[delta] object PartitionUtils {

   partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
     field => field.dataType match {
-      case a: AtomicType if !a.isInstanceOf[VariantType] => // OK
+      case a: AtomicType if !VariantShim.isTypeVariant(a) => // OK
       case _ => throw DeltaErrors.cannotUseDataTypeForPartitionColumnError(field)
     }
   }
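
The guard on the removed line only matters if a variant type can match AtomicType, which implies it does on Spark master. Routing the exclusion through VariantShim keeps variant columns banned as partition columns there, while letting the same file compile against Spark 3.5, where VariantType does not exist.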

DeltaVariantSparkOnlyTests.scala (new file in the scala-spark-3.5 test source tree)
Lines changed: 19 additions & 0 deletions

/*
 * Copyright (2024) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

trait DeltaVariantSparkOnlyTests { self: DeltaVariantSuite => }

DeltaVariantSparkOnlyTests.scala (new file in the scala-spark-master test source tree)
Lines changed: 109 additions & 0 deletions

/*
 * Copyright (2024) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.types.StructType

trait DeltaVariantSparkOnlyTests
  extends QueryTest
  with DeltaSQLCommandTest { self: DeltaVariantSuite =>
  private def getProtocolForTable(table: String): Protocol = {
    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
    deltaLog.unsafeVolatileSnapshot.protocol
  }

  test("create a new table with Variant, higher protocol and feature should be picked.") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA")
      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
      assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
      assert(
        getProtocolForTable("tbl") ==
        VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature)
      )
    }
  }

  test("creating a table without Variant should use the usual minimum protocol") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA")
      assert(getProtocolForTable("tbl") == Protocol(1, 2))

      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
      assert(
        !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature),
        s"Table tbl contains VariantTypeFeature descriptor when it's not supposed to"
      )
    }
  }

  test("add a new Variant column should upgrade to the correct protocol versions") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(s STRING) USING delta")
      assert(getProtocolForTable("tbl") == Protocol(1, 2))

      // Should throw an error: the variant table feature is not yet enabled.
      val e = intercept[SparkThrowable] {
        sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
      }
      // Capture the existing protocol here. We check the error message later in
      // this test because we need to compare the expected schema and protocol.
      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
      val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol
      val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures
        .map(_.name)
        .toSeq
        .sorted
        .mkString(", ")

      // Add the table feature.
      sql(
        s"ALTER TABLE tbl " +
        s"SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')"
      )

      sql("ALTER TABLE tbl ADD COLUMN v VARIANT")

      // Check the previously thrown error message.
      checkError(
        e,
        errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
        parameters = Map(
          "unsupportedFeatures" -> VariantTypeTableFeature.name,
          "supportedFeatures" -> currentFeatures
        )
      )

      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
      assert(spark.table("tbl").selectExpr("v::int").head == Row(99))

      assert(
        getProtocolForTable("tbl") ==
        VariantTypeTableFeature.minProtocolVersion
          .withFeature(VariantTypeTableFeature)
          .withFeature(InvariantsTableFeature)
          .withFeature(AppendOnlyTableFeature)
      )
    }
  }
}
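
This file is the Spark master counterpart of the empty Spark 3.5 trait above. Since sbt selects exactly one scala-spark-* test tree per build, the one-line suite in the next file picks up the full variant tests when compiled against Spark master and an empty body against Spark 3.5. Note that the two v::int assertions, commented out with a TODO in the old suite below, are enabled here now that variant casting works on Spark master.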

spark/src/test/scala/org/apache/spark/sql/delta/DeltaVariantSuite.scala
Lines changed: 1 addition & 94 deletions

@@ -16,97 +16,4 @@

 package org.apache.spark.sql.delta

-import org.apache.spark.SparkThrowable
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
-import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
-import org.apache.spark.sql.types.StructType
-
-class DeltaVariantSuite
-  extends QueryTest
-  with DeltaSQLCommandTest {
-
-  private def getProtocolForTable(table: String): Protocol = {
-    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
-    deltaLog.unsafeVolatileSnapshot.protocol
-  }
-
-  test("create a new table with Variant, higher protocol and feature should be picked.") {
-    withTable("tbl") {
-      sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA")
-      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
-      // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark.
-      // assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
-      assert(
-        getProtocolForTable("tbl") ==
-        VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature)
-      )
-    }
-  }
-
-  test("creating a table without Variant should use the usual minimum protocol") {
-    withTable("tbl") {
-      sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA")
-      assert(getProtocolForTable("tbl") == Protocol(1, 2))
-
-      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
-      assert(
-        !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature),
-        s"Table tbl contains VariantTypeFeature descriptor when its not supposed to"
-      )
-    }
-  }
-
-  test("add a new Variant column should upgrade to the correct protocol versions") {
-    withTable("tbl") {
-      sql("CREATE TABLE tbl(s STRING) USING delta")
-      assert(getProtocolForTable("tbl") == Protocol(1, 2))
-
-      // Should throw error
-      val e = intercept[SparkThrowable] {
-        sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
-      }
-      // capture the existing protocol here.
-      // we will check the error message later in this test as we need to compare the
-      // expected schema and protocol
-      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
-      val currentProtocol = deltaLog.unsafeVolatileSnapshot.protocol
-      val currentFeatures = currentProtocol.implicitlyAndExplicitlySupportedFeatures
-        .map(_.name)
-        .toSeq
-        .sorted
-        .mkString(", ")
-
-      // add table feature
-      sql(
-        s"ALTER TABLE tbl " +
-        s"SET TBLPROPERTIES('delta.feature.variantType-dev' = 'supported')"
-      )
-
-      sql("ALTER TABLE tbl ADD COLUMN v VARIANT")
-
-      // check previously thrown error message
-      checkError(
-        e,
-        errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
-        parameters = Map(
-          "unsupportedFeatures" -> VariantTypeTableFeature.name,
-          "supportedFeatures" -> currentFeatures
-        )
-      )
-
-      sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
-      // TODO(r.chen): Enable once variant casting is properly implemented in OSS Spark.
-      // assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
-
-      assert(
-        getProtocolForTable("tbl") ==
-        VariantTypeTableFeature.minProtocolVersion
-          .withFeature(VariantTypeTableFeature)
-          .withFeature(InvariantsTableFeature)
-          .withFeature(AppendOnlyTableFeature)
-      )
-    }
-  }
-}
+class DeltaVariantSuite extends DeltaVariantSparkOnlyTests
