Skip to content

Commit fe9e5c0

Browse files
uros-dbcloud-fan
authored and committed
[SPARK-55262][GEO][SQL] Block Geo types in all file based data sources except Parquet
### What changes were proposed in this pull request?

Disable geospatial types (Geometry and Geography) in all file-based data sources except Parquet: CSV, JSON, ORC, Avro, XML.

### Why are the changes needed?

Explicitly disallow geospatial data types in the unsupported formats.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests to verify that GEOMETRY and GEOGRAPHY are disallowed in CSV, JSON, ORC, Avro, and XML.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#54038 from uros-db/geo-data-sources.

Authored-by: Uros Bojanic <uros.bojanic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent c32aee1 commit fe9e5c0

File tree

10 files changed

+82
-1
lines changed

10 files changed

+82
-1
lines changed

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3482,4 +3482,29 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
34823482
checkAnswer(readDf, df)
34833483
}
34843484
}
3485+
3486+
test("Geospatial types are not supported in Avro") {
  withTempDir { dir =>
    // Write target inside the managed temp directory.
    val outputPath = new File(dir, "files").getCanonicalPath
    // WKB encoding of POINT(1 2), used to build the geospatial literals below.
    val pointWkb = "0101000000000000000000F03F0000000000000040"
    // Each case pairs a geo-producing SQL expression with the type name
    // expected inside the resulting error message.
    val geoCases = Seq(
      s"ST_GeomFromWKB(X'$pointWkb')" -> "\"GEOMETRY(0)\"",
      s"ST_GeogFromWKB(X'$pointWkb')" -> "\"GEOGRAPHY(4326)\"")
    for ((geoExpr, typeName) <- geoCases) {
      // Writing a GEOMETRY/GEOGRAPHY column to Avro must fail analysis.
      val err = intercept[AnalysisException] {
        sql(s"select $geoExpr as g").write.format("avro").mode("overwrite").save(outputPath)
      }
      checkError(
        exception = err,
        condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
        parameters = Map(
          "columnName" -> "`g`",
          "columnType" -> typeName,
          "format" -> "Avro"))
    }
  }
}
34853510
}

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ private[sql] object AvroUtils extends Logging {
8383
def supportsDataType(dataType: DataType): Boolean = dataType match {
8484
case _: VariantType => false
8585

86+
case _: GeometryType | _: GeographyType => false
87+
8688
case _: AtomicType => true
8789

8890
case st: StructType => st.forall { f => supportsDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
155155
private def supportDataType(dataType: DataType, allowVariant: Boolean): Boolean = dataType match {
156156
case _: VariantType => allowVariant
157157

158+
case _: GeometryType | _: GeographyType => false
159+
158160
case _: AtomicType => true
159161

160162
case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ case class JsonFileFormat() extends TextBasedFileFormat with DataSourceRegister
115115
override def supportDataType(dataType: DataType): Boolean = dataType match {
116116
case _: VariantType => true
117117

118+
case _: GeometryType | _: GeographyType => false
119+
118120
case _: AtomicType => true
119121

120122
case st: StructType => st.forall { f => supportDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ class OrcFileFormat
249249
override def supportDataType(dataType: DataType): Boolean = dataType match {
250250
case _: VariantType => false
251251

252+
case _: GeometryType | _: GeographyType => false
253+
252254
case _: AtomicType => true
253255

254256
case st: StructType => st.forall { f => supportDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuild
2626
import org.apache.spark.sql.execution.datasources.FileFormat
2727
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
2828
import org.apache.spark.sql.execution.datasources.v2.FileTable
29-
import org.apache.spark.sql.types.{AtomicType, DataType, StructType, UserDefinedType}
29+
import org.apache.spark.sql.types.{AtomicType, DataType, GeographyType, GeometryType, StructType, UserDefinedType}
3030
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3131

3232
case class CSVTable(
@@ -57,6 +57,8 @@ case class CSVTable(
5757
}
5858

5959
override def supportsDataType(dataType: DataType): Boolean = dataType match {
60+
case _: GeometryType | _: GeographyType => false
61+
6062
case _: AtomicType => true
6163

6264
case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ case class JsonTable(
5757
}
5858

5959
override def supportsDataType(dataType: DataType): Boolean = dataType match {
60+
case _: GeometryType | _: GeographyType => false
61+
6062
case _: AtomicType => true
6163

6264
case st: StructType => st.forall { f => supportsDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ case class OrcTable(
5151
}
5252

5353
override def supportsDataType(dataType: DataType): Boolean = dataType match {
54+
case _: GeometryType | _: GeographyType => false
55+
5456
case _: AtomicType => true
5557

5658
case st: StructType => st.forall { f => supportsDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ case class XmlFileFormat() extends TextBasedFileFormat with DataSourceRegister {
131131

132132
override def supportDataType(dataType: DataType): Boolean = dataType match {
133133
case _: VariantType => true
134+
135+
case _: GeometryType | _: GeographyType => false
136+
134137
case _: AtomicType => true
135138

136139
case st: StructType => st.forall { f => supportDataType(f.dataType) }

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,45 @@ class FileBasedDataSourceSuite extends QueryTest
12731273
}
12741274
}
12751275
}
1276+
1277+
test("Geospatial types are not supported in file data sources other than Parquet") {
  // Formats under test: none of these supports GEOMETRY/GEOGRAPHY columns.
  val geoUnsupportedFormats = Seq("csv", "json", "orc", "text", "xml")
  // Exercise both the v1 (file-format) and v2 (table) code paths by toggling
  // which sources are forced onto the v1 list.
  for (useV1 <- Seq(true, false)) {
    val v1SourceList = if (useV1) geoUnsupportedFormats.mkString(",") else ""
    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1SourceList) {
      withTempDir { dir =>
        // Write target inside the managed temp directory.
        val outputPath = new File(dir, "files").getCanonicalPath
        // WKB encoding of POINT(1 2).
        val pointWkb = "0101000000000000000000F03F0000000000000040"
        // Each case pairs a geo-producing SQL expression with the type name
        // expected inside the resulting error message.
        val geoCases = Seq(
          s"ST_GeomFromWKB(X'$pointWkb')" -> "\"GEOMETRY(0)\"",
          s"ST_GeogFromWKB(X'$pointWkb')" -> "\"GEOGRAPHY(4326)\"")
        for {
          format <- geoUnsupportedFormats
          (geoExpr, typeName) <- geoCases
        } {
          // Writing a geospatial column to any of these formats must fail analysis.
          val err = intercept[AnalysisException] {
            sql(s"select $geoExpr as g").write.format(format).mode("overwrite").save(outputPath)
          }
          checkError(
            exception = err,
            condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
            parameters = Map(
              "columnName" -> "`g`",
              "columnType" -> typeName,
              "format" -> formatMapping(format)))
        }
      }
    }
  }
}
12761315
}
12771316

12781317
object TestingUDT {

0 commit comments

Comments
 (0)