
Commit 568db94

gengliangwang authored and cloud-fan committed
[SPARK-27356][SQL] File source V2: Fix the case that data columns overlap with partition schema
## What changes were proposed in this pull request?

In the current file source V2 framework, the schema of `FileScan` is not returned correctly if there are overlapping columns between `dataSchema` and `partitionSchema`. The actual schema should be `dataSchema - overlapSchema + partitionSchema`, which might have a different column order from the pushed-down `requiredSchema` in `SupportsPushDownRequiredColumns.pruneColumns`.

For example, if the data schema is `[a: String, b: String, c: String]` and the partition schema is `[b: Int, d: Int]`, the resulting schema is `[a: String, b: Int, c: String, d: Int]` in the current `FileTable` and `HadoopFsRelation`, while the actual scan schema is `[a: String, c: String, b: Int, d: Int]` in `FileScan`.

To fix this corner case, this PR proposes that the output schema of `FileTable` should be `dataSchema - overlapSchema + partitionSchema`, so that the column order is consistent with `FileScan`. Putting all the partition columns at the end of the table schema is also more reasonable.

## How was this patch tested?

Unit test.

Closes apache#24284 from gengliangwang/FixReadSchema.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 04e53d2 · commit 568db94
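Before the file-level diffs, here is a minimal, self-contained sketch of the schema rule the commit message describes (`tableSchema = dataSchema - overlapSchema + partitionSchema`), using only public Spark SQL types. The `normalize` helper and the `SchemaMergeSketch` name are illustrative stand-ins, not part of the change; the real patch uses Spark's internal `PartitioningUtils.getColName` for case handling.

    import org.apache.spark.sql.types._

    object SchemaMergeSketch {
      // Stand-in for PartitioningUtils.getColName: fold case unless case-sensitive.
      private def normalize(f: StructField, caseSensitive: Boolean): String =
        if (caseSensitive) f.name else f.name.toLowerCase

      // tableSchema = dataSchema - overlapSchema + partitionSchema
      def tableSchema(
          dataSchema: StructType,
          partitionSchema: StructType,
          caseSensitive: Boolean = false): StructType = {
        val partitionNames =
          partitionSchema.fields.map(normalize(_, caseSensitive)).toSet
        val fields = dataSchema.fields.filterNot { f =>
          partitionNames.contains(normalize(f, caseSensitive)) // drop shadowed data columns
        } ++ partitionSchema.fields                             // partition columns go last
        StructType(fields)
      }

      def main(args: Array[String]): Unit = {
        val data = StructType(Seq(
          StructField("a", StringType),
          StructField("b", StringType),
          StructField("c", StringType)))
        val partition = StructType(Seq(
          StructField("b", IntegerType),
          StructField("d", IntegerType)))
        // Prints: a, c, b, d -- the order FileScan produces, per the example above.
        println(tableSchema(data, partition).fieldNames.mkString(", "))
      }
    }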

File tree

3 files changed: +147 −44 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 10 additions & 2 deletions
@@ -70,8 +70,16 @@ abstract class FileTable(
     val partitionSchema = fileIndex.partitionSchema
     SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
       "in the partition schema", caseSensitive)
-    PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
-      partitionSchema, caseSensitive)._1
+    val partitionNameSet: Set[String] =
+      partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
+
+    // When data and partition schemas have overlapping columns,
+    // tableSchema = dataSchema - overlapSchema + partitionSchema
+    val fields = dataSchema.fields.filterNot { field =>
+      val colName = PartitioningUtils.getColName(field, caseSensitive)
+      partitionNameSet.contains(colName)
+    } ++ partitionSchema.fields
+    StructType(fields)
   }

   override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
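For intuition about the change above, a hedged end-to-end sketch of the effect. The `/tmp/overlap` path and the column names are made up, and `spark` is assumed to be an existing SparkSession reading through the V2 file source:

    // Data columns a, b, c written under the partition directory b=1/d=2,
    // so the data column `b` overlaps with a partition column.
    val df = spark.range(1).selectExpr("'x' AS a", "'y' AS b", "'z' AS c")
    df.write.parquet("/tmp/overlap/b=1/d=2")

    // With this patch the reported schema is [a, c, b, d]: the overlapping
    // data column `b` is dropped in favor of the partition column, and the
    // partition columns come last, matching what FileScan actually produces.
    spark.read.parquet("/tmp/overlap").printSchema()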

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -493,6 +493,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("Return correct results when data columns overlap with partition columns") {
+    Seq("parquet", "orc", "json").foreach { format =>
+      withTempPath { path =>
+        val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq((1, 2, 3, 4, 5)).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+          .write.format(format).save(tablePath.getCanonicalPath)
+
+        val df = spark.read.format(format).load(path.getCanonicalPath)
+          .select("CoL1", "Col2", "CoL5", "CoL3")
+        checkAnswer(df, Row("a", 2, "e", "c"))
+      }
+    }
+  }
 }

 object TestingUDT {
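A note on the expected answer above: the values of the overlapping columns cOl1, cOl3, and cOl5 come from the partition directory names (a, c, e), not from the data written into the files (the integers 1, 3, 5 are shadowed by the partition columns), and Spark's default case-insensitive resolution lets the `CoL1`-style selects match the `cOl1`-style partition names.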

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala

Lines changed: 123 additions & 42 deletions
@@ -107,6 +107,68 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
     }
   }

+  test("read partitioned table - with nulls") {
+    withTempDir { base =>
+      for {
+        // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+        pi <- Seq(1, null.asInstanceOf[Integer])
+        ps <- Seq("foo", null.asInstanceOf[String])
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParData(i, i.toString)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read
+        .option("hive.exec.default.partition.name", defaultPartitionName)
+        .orc(base.getCanonicalPath)
+        .createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi IS NULL"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, null, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+
+  test("SPARK-27162: handle pathfilter configuration correctly") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = spark.range(2)
+      df.write.orc(path + "/p=1")
+      df.write.orc(path + "/p=2")
+      assert(spark.read.orc(path).count() === 4)
+
+      val extraOptions = Map(
+        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+      )
+      assert(spark.read.options(extraOptions).orc(path).count() === 2)
+    }
+  }
+}
+
+class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
   test("read partitioned table - partition key included in orc file") {
     withTempDir { base =>
       for {
@@ -127,7 +189,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
           i <- 1 to 10
           pi <- Seq(1, 2)
           ps <- Seq("foo", "bar")
-        } yield Row(i, pi, i.toString, ps))
+        } yield Row(i, i.toString, pi, ps))

         checkAnswer(
           sql("SELECT intField, pi FROM t"),
@@ -142,28 +204,26 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
          for {
            i <- 1 to 10
            ps <- Seq("foo", "bar")
-          } yield Row(i, 1, i.toString, ps))
+          } yield Row(i, i.toString, 1, ps))

         checkAnswer(
           sql("SELECT * FROM t WHERE ps = 'foo'"),
           for {
             i <- 1 to 10
             pi <- Seq(1, 2)
-          } yield Row(i, pi, i.toString, "foo"))
+          } yield Row(i, i.toString, pi, "foo"))
       }
     }
   }

-
-  test("read partitioned table - with nulls") {
+  test("read partitioned table - with nulls and partition keys are included in Orc file") {
     withTempDir { base =>
       for {
-        // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
-        pi <- Seq(1, null.asInstanceOf[Integer])
+        pi <- Seq(1, 2)
         ps <- Seq("foo", null.asInstanceOf[String])
       } {
         makeOrcFile(
-          (1 to 10).map(i => OrcParData(i, i.toString)),
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
@@ -177,23 +237,71 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
           sql("SELECT * FROM t"),
           for {
             i <- 1 to 10
-            pi <- Seq(1, null.asInstanceOf[Integer])
+            pi <- Seq(1, 2)
             ps <- Seq("foo", null.asInstanceOf[String])
           } yield Row(i, i.toString, pi, ps))

         checkAnswer(
-          sql("SELECT * FROM t WHERE pi IS NULL"),
+          sql("SELECT * FROM t WHERE ps IS NULL"),
           for {
             i <- 1 to 10
-            ps <- Seq("foo", null.asInstanceOf[String])
-          } yield Row(i, i.toString, null, ps))
+            pi <- Seq(1, 2)
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+}

+class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+
+  test("read partitioned table - partition key included in orc file") {
+    withTempDir { base =>
+      for {
+        pi <- Seq(1, 2)
+        ps <- Seq("foo", "bar")
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
+
+      withTempTable("t") {
         checkAnswer(
-          sql("SELECT * FROM t WHERE ps IS NULL"),
+          sql("SELECT * FROM t"),
           for {
             i <- 1 to 10
-            pi <- Seq(1, null.asInstanceOf[Integer])
-          } yield Row(i, i.toString, pi, null))
+            pi <- Seq(1, 2)
+            ps <- Seq("foo", "bar")
+          } yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            _ <- Seq("foo", "bar")
+          } yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", "bar")
+          } yield Row(i, 1, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+          } yield Row(i, pi, i.toString, "foo"))
       }
     }
   }
@@ -232,31 +340,4 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
       }
     }
   }
-
-  test("SPARK-27162: handle pathfilter configuration correctly") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-
-      val df = spark.range(2)
-      df.write.orc(path + "/p=1")
-      df.write.orc(path + "/p=2")
-      assert(spark.read.orc(path).count() === 4)
-
-      val extraOptions = Map(
-        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
-        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
-      )
-      assert(spark.read.options(extraOptions).orc(path).count() === 2)
-    }
-  }
-}
-
-class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
-
-class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
-  override protected def sparkConf: SparkConf =
-    super
-      .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
 }
