
Commit 8f225e0

viirya authored and cloud-fan committed
[SPARK-24548][SQL] Fix incorrect schema of Dataset with tuple encoders
## What changes were proposed in this pull request?

When creating tuple expression encoders, we should give the serializer expressions of tuple items correct names, so that we get a correct output schema when we use such tuple encoders.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#21576 from viirya/SPARK-24548.
1 parent: bce1775 · commit: 8f225e0
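For context, here is a minimal sketch of the reported issue, assuming a local SparkSession (the object and app names are illustrative, not part of the patch). Before this fix, a Dataset built with an explicitly composed tuple encoder could report wrong nested field names in its schema, while the implicitly derived encoder reported the expected _1/_2 names:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object Spark24548Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-24548-demo")
      .getOrCreate()
    import spark.implicits._

    // An explicitly composed tuple encoder, mirroring the new tests below:
    val encoder: Encoder[(String, (String, String))] =
      Encoders.tuple(Encoders.STRING, Encoders.tuple(Encoders.STRING, Encoders.STRING))

    val rdd = spark.sparkContext.parallelize(Seq(("a", ("1", "2")), ("b", ("3", "4"))))

    val ds1 = spark.createDataset(rdd)          // implicitly derived tuple encoder
    val ds2 = spark.createDataset(rdd)(encoder) // explicitly composed tuple encoder

    // Before the fix these two schemas could disagree on nested field names;
    // after it, both should print the same _1/_2-style schema.
    ds1.printSchema()
    ds2.printSchema()

    spark.stop()
  }
}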

3 files changed: +33 −1 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala

Lines changed: 2 additions & 1 deletion

@@ -128,7 +128,7 @@ object ExpressionEncoder {
         case b: BoundReference if b == originalInputObject => newInputObject
       })
 
-      if (enc.flat) {
+      val serializerExpr = if (enc.flat) {
         newSerializer.head
       } else {
         // For non-flat encoder, the input object is not top level anymore after being combined to
@@ -146,6 +146,7 @@ object ExpressionEncoder {
           Invoke(Literal.fromObject(None), "equals", BooleanType, newInputObject :: Nil))
         If(nullCheck, Literal.create(null, struct.dataType), struct)
       }
+      Alias(serializerExpr, s"_${index + 1}")()
     }
 
     val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index) =>
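The core of the fix: each tuple item's serializer expression is wrapped in an Alias named s"_${index + 1}", matching Scala's tuple accessor convention, so the combined encoder produces the expected output field names. A hedged, self-contained sketch of just the naming pattern, using Catalyst's internal Alias expression (illustrative, not the full encoder logic):

import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression}

// Illustrative only: aliasing each child expression forces the output
// field names to follow the tuple convention _1, _2, ... regardless of
// what the child expressions were called before.
object AliasNamingSketch {
  def main(args: Array[String]): Unit = {
    val childSerializers = Seq(Literal("left"), Literal("right"))
    val named: Seq[NamedExpression] = childSerializers.zipWithIndex.map {
      case (expr, index) => Alias(expr, s"_${index + 1}")()
    }
    named.foreach(ne => println(ne.name)) // prints _1 then _2
  }
}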

sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java

Lines changed: 18 additions & 0 deletions

@@ -34,6 +34,7 @@
 import org.junit.*;
 import org.junit.rules.ExpectedException;
 
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.sql.*;
@@ -336,6 +337,23 @@ public void testTupleEncoder() {
     Assert.assertEquals(data5, ds5.collectAsList());
   }
 
+  @Test
+  public void testTupleEncoderSchema() {
+    Encoder<Tuple2<String, Tuple2<String, String>>> encoder =
+      Encoders.tuple(Encoders.STRING(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+    List<Tuple2<String, Tuple2<String, String>>> data = Arrays.asList(tuple2("1", tuple2("a", "b")),
+      tuple2("2", tuple2("c", "d")));
+    Dataset<Row> ds1 = spark.createDataset(data, encoder).toDF("value1", "value2");
+
+    JavaPairRDD<String, Tuple2<String, String>> pairRDD = jsc.parallelizePairs(data);
+    Dataset<Row> ds2 = spark.createDataset(JavaPairRDD.toRDD(pairRDD), encoder)
+      .toDF("value1", "value2");
+
+    Assert.assertEquals(ds1.schema(), ds2.schema());
+    Assert.assertEquals(ds1.select(expr("value2._1")).collectAsList(),
+      ds2.select(expr("value2._1")).collectAsList());
+  }
+
   @Test
   public void testNestedTupleEncoder() {
     // test ((int, string), string)

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 13 additions & 0 deletions

@@ -1466,6 +1466,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds = Seq[(Option[Int], Option[Int])]((Some(1), None)).toDS()
     intercept[NullPointerException](ds.as[(Int, Int)].collect())
   }
+
+  test("SPARK-24548: Dataset with tuple encoders should have correct schema") {
+    val encoder = Encoders.tuple(newStringEncoder,
+      Encoders.tuple(newStringEncoder, newStringEncoder))
+
+    val data = Seq(("a", ("1", "2")), ("b", ("3", "4")))
+    val rdd = sparkContext.parallelize(data)
+
+    val ds1 = spark.createDataset(rdd)
+    val ds2 = spark.createDataset(rdd)(encoder)
+    assert(ds1.schema == ds2.schema)
+    checkDataset(ds1.select("_2._2"), ds2.select("_2._2").collect(): _*)
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)
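With the fix in place, both Datasets in the test above should report the same tuple-style schema. An illustrative sketch of the expected printSchema() output (not captured from an actual run):

root
 |-- _1: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)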
